[HUDI-4138] Fix the concurrency modification of hoodie table config for flink (#5660)
* Remove the metadata cleaning strategy for Flink, which means the multi-modal index may be affected * Improve HoodieTable#clearMetadataTablePartitionsConfig to only update the table config when necessary * Remove the modification of the read code path in HoodieTableConfig
This commit is contained in:
@@ -885,24 +885,22 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
|||||||
// partitions are ready to use
|
// partitions are ready to use
|
||||||
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
|
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
|
||||||
&& !config.isMetadataTableEnabled()
|
&& !config.isMetadataTableEnabled()
|
||||||
&& (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
|
&& !metaClient.getTableConfig().getMetadataPartitions().isEmpty();
|
||||||
|| !metaClient.getTableConfig().getMetadataPartitions().isEmpty());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clears hoodie.table.metadata.partitions in hoodie.properties
|
* Clears hoodie.table.metadata.partitions in hoodie.properties
|
||||||
*/
|
*/
|
||||||
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
|
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
|
||||||
if (clearAll) {
|
Set<String> partitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
|
||||||
|
if (clearAll && partitions.size() > 0) {
|
||||||
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
|
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
|
||||||
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
|
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
|
||||||
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
|
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
|
||||||
return;
|
} else if (partitions.remove(partitionType.get().getPartitionPath())) {
|
||||||
|
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
|
||||||
|
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
|
||||||
}
|
}
|
||||||
Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
|
|
||||||
completedPartitions.remove(partitionType.get().getPartitionPath());
|
|
||||||
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
|
|
||||||
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public HoodieTableMetadata getMetadataTable() {
|
public HoodieTableMetadata getMetadataTable() {
|
||||||
|
|||||||
@@ -53,7 +53,6 @@ import org.apache.hudi.io.HoodieWriteHandle;
|
|||||||
import org.apache.hudi.io.MiniBatchHandle;
|
import org.apache.hudi.io.MiniBatchHandle;
|
||||||
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
|
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
|
||||||
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
|
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
|
||||||
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
|
||||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||||
import org.apache.hudi.table.HoodieFlinkTable;
|
import org.apache.hudi.table.HoodieFlinkTable;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
@@ -365,8 +364,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
// commit to data table after committing to metadata table.
|
// commit to data table after committing to metadata table.
|
||||||
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
|
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
|
||||||
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
|
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
|
||||||
table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
|
writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
|
||||||
w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
|
|
||||||
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
|
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
|
||||||
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
|
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
|
||||||
} finally {
|
} finally {
|
||||||
|
|||||||
@@ -105,13 +105,9 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
|
|||||||
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
|
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
|
||||||
Option<T> actionMetadata) {
|
Option<T> actionMetadata) {
|
||||||
if (config.isMetadataTableEnabled()) {
|
if (config.isMetadataTableEnabled()) {
|
||||||
// even with metadata enabled, some index could have been disabled
|
|
||||||
// delete metadata partitions corresponding to such indexes
|
|
||||||
deleteMetadataIndexIfNecessary();
|
|
||||||
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
|
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
|
||||||
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
|
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
|
||||||
} else {
|
} else {
|
||||||
maybeDeleteMetadataTable();
|
|
||||||
return Option.empty();
|
return Option.empty();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -272,8 +272,8 @@ public class HoodieTableConfig extends HoodieConfig {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException {
|
private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException {
|
||||||
String checksum;
|
final String checksum;
|
||||||
if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
|
if (isValidChecksum(props)) {
|
||||||
checksum = props.getProperty(TABLE_CHECKSUM.key());
|
checksum = props.getProperty(TABLE_CHECKSUM.key());
|
||||||
props.store(outputStream, "Updated at " + Instant.now());
|
props.store(outputStream, "Updated at " + Instant.now());
|
||||||
} else {
|
} else {
|
||||||
@@ -285,8 +285,8 @@ public class HoodieTableConfig extends HoodieConfig {
|
|||||||
return checksum;
|
return checksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isValidChecksum() {
|
private static boolean isValidChecksum(Properties props) {
|
||||||
return contains(TABLE_CHECKSUM) && validateChecksum(props);
|
return props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -298,20 +298,13 @@ public class HoodieTableConfig extends HoodieConfig {
|
|||||||
|
|
||||||
private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
|
private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
|
||||||
Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
|
Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
|
||||||
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
|
|
||||||
try (FSDataInputStream is = fs.open(cfgPath)) {
|
try (FSDataInputStream is = fs.open(cfgPath)) {
|
||||||
props.load(is);
|
props.load(is);
|
||||||
// validate checksum for latest table version
|
|
||||||
if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) {
|
|
||||||
LOG.warn("Checksum validation failed. Falling back to backed up configs.");
|
|
||||||
try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) {
|
|
||||||
props.load(fsDataInputStream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (!fs.exists(cfgPath)) {
|
if (!fs.exists(cfgPath)) {
|
||||||
LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
|
LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
|
||||||
// try the backup. this way no query ever fails if update fails midway.
|
// try the backup. this way no query ever fails if update fails midway.
|
||||||
|
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
|
||||||
try (FSDataInputStream is = fs.open(backupCfgPath)) {
|
try (FSDataInputStream is = fs.open(backupCfgPath)) {
|
||||||
props.load(is);
|
props.load(is);
|
||||||
}
|
}
|
||||||
@@ -631,7 +624,7 @@ public class HoodieTableConfig extends HoodieConfig {
|
|||||||
CONFIG_VALUES_DELIMITER
|
CONFIG_VALUES_DELIMITER
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the format to use for partition meta files.
|
* Returns the format to use for partition meta files.
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user