[HUDI-3839] Fixing incorrect selection of MT partitions to be updated (#5274)
* Fixing incorrect selection of MT partitions to be updated * Ensure that metadata partitions table config is inherited correctly Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
This commit is contained in:
@@ -779,11 +779,15 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
// fetch partitions to update from table config
|
// fetch partitions to update from table config
|
||||||
Set<String> partitionsToUpdate = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
|
Set<String> partitionsToUpdate = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
|
||||||
// add inflight indexes as well because the file groups have already been initialized, so writers can log updates
|
// add inflight indexes as well because the file groups have already been initialized, so writers can log updates
|
||||||
|
// NOTE: Async HoodieIndexer can move some partition to inflight. While that partition is still being built,
|
||||||
|
// the regular ingestion writers should not be blocked. They can go ahead and log updates to the metadata partition.
|
||||||
|
// Instead of depending on enabledPartitionTypes, the table config becomes the source of truth for which partitions to update.
|
||||||
partitionsToUpdate.addAll(getInflightMetadataPartitions(dataMetaClient.getTableConfig()));
|
partitionsToUpdate.addAll(getInflightMetadataPartitions(dataMetaClient.getTableConfig()));
|
||||||
if (!partitionsToUpdate.isEmpty()) {
|
if (!partitionsToUpdate.isEmpty()) {
|
||||||
return partitionsToUpdate;
|
return partitionsToUpdate;
|
||||||
}
|
}
|
||||||
// fallback to all enabled partitions if table config returned no partitions
|
// fallback to all enabled partitions if table config returned no partitions
|
||||||
|
LOG.warn("There are no partitions to update according to table config. Falling back to enabled partition types in the write config.");
|
||||||
return getEnabledPartitionTypes().stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
|
return getEnabledPartitionTypes().stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -699,6 +699,8 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
private HoodieTimelineTimeZone commitTimeZone;
|
private HoodieTimelineTimeZone commitTimeZone;
|
||||||
private Boolean partitionMetafileUseBaseFormat;
|
private Boolean partitionMetafileUseBaseFormat;
|
||||||
private Boolean dropPartitionColumnsWhenWrite;
|
private Boolean dropPartitionColumnsWhenWrite;
|
||||||
|
private String metadataPartitions;
|
||||||
|
private String inflightMetadataPartitions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Persist the configs that is written at the first time, and should not be changed.
|
* Persist the configs that is written at the first time, and should not be changed.
|
||||||
@@ -823,6 +825,16 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public PropertyBuilder setMetadataPartitions(String partitions) {
|
||||||
|
this.metadataPartitions = partitions;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PropertyBuilder setInflightMetadataPartitions(String partitions) {
|
||||||
|
this.inflightMetadataPartitions = partitions;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public PropertyBuilder set(String key, Object value) {
|
public PropertyBuilder set(String key, Object value) {
|
||||||
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
|
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
|
||||||
this.others.put(key, value);
|
this.others.put(key, value);
|
||||||
@@ -925,6 +937,14 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) {
|
if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) {
|
||||||
setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
|
setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)) {
|
||||||
|
setMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
|
||||||
|
setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1010,6 +1030,14 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
if (null != dropPartitionColumnsWhenWrite) {
|
if (null != dropPartitionColumnsWhenWrite) {
|
||||||
tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite));
|
tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (null != metadataPartitions) {
|
||||||
|
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS, metadataPartitions);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (null != inflightMetadataPartitions) {
|
||||||
|
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
|
||||||
|
}
|
||||||
return tableConfig.getProps();
|
return tableConfig.getProps();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user