[HUDI-3836] Improve the way of fetching metadata partitions from table (#5286)
Co-authored-by: xicm <xicm@asiainfo.com>
This commit is contained in:
@@ -56,7 +56,6 @@ import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
|
||||
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
|
||||
import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
|
||||
- import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
|
||||
|
||||
/**
|
||||
@@ -143,7 +142,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
|
||||
if (config.getBloomIndexPruneByRanges()) {
|
||||
// load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
|
||||
if (config.getBloomIndexUseMetadata()
|
||||
- && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) {
|
||||
+ && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
|
||||
fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
|
||||
}
|
||||
// fallback to loading column ranges from files
|
||||
|
||||
@@ -37,7 +37,6 @@ import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
- import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
|
||||
|
||||
/**
|
||||
@@ -64,7 +63,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> exten
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
try {
|
||||
if (config.getBloomIndexUseMetadata()
|
||||
- && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
|
||||
+ && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
|
||||
.contains(BLOOM_FILTERS.getPartitionPath())) {
|
||||
bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
|
||||
.orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
|
||||
|
||||
@@ -96,7 +96,6 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deseri
|
||||
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
|
||||
- import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
|
||||
|
||||
@@ -579,7 +578,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
}
|
||||
|
||||
private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
|
||||
- Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
|
||||
+ Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
|
||||
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
|
||||
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
|
||||
HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
|
||||
@@ -716,7 +715,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
}
|
||||
|
||||
public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
|
||||
- Set<String> completedIndexes = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
|
||||
+ Set<String> completedIndexes = dataMetaClient.getTableConfig().getMetadataPartitions();
|
||||
Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());
|
||||
|
||||
for (MetadataPartitionType partitionType : metadataPartitions) {
|
||||
@@ -806,7 +805,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
|
||||
private Set<String> getMetadataPartitionsToUpdate() {
|
||||
// fetch partitions to update from table config
|
||||
- Set<String> partitionsToUpdate = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
|
||||
+ Set<String> partitionsToUpdate = dataMetaClient.getTableConfig().getMetadataPartitions();
|
||||
// add inflight indexes as well because the file groups have already been initialized, so writers can log updates
|
||||
// NOTE: Async HoodieIndexer can move some partition to inflight. While that partition is still being built,
|
||||
// the regular ingestion writers should not be blocked. They can go ahead and log updates to the metadata partition.
|
||||
|
||||
@@ -102,7 +102,6 @@ import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PART
|
||||
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
|
||||
- import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
|
||||
|
||||
/**
|
||||
@@ -900,7 +899,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
||||
return false;
|
||||
}
|
||||
return metadataIndexDisabled
|
||||
- && getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(partitionType.getPartitionPath());
|
||||
+ && metaClient.getTableConfig().getMetadataPartitions().contains(partitionType.getPartitionPath());
|
||||
}
|
||||
|
||||
private boolean shouldExecuteMetadataTableDeletion() {
|
||||
@@ -919,7 +918,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
||||
* Clears hoodie.table.metadata.partitions in hoodie.properties
|
||||
*/
|
||||
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
|
||||
- Set<String> partitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
|
||||
+ Set<String> partitions = metaClient.getTableConfig().getMetadataPartitions();
|
||||
if (clearAll && partitions.size() > 0) {
|
||||
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
|
||||
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
|
||||
|
||||
@@ -74,7 +74,6 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTI
|
||||
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
|
||||
- import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
|
||||
@@ -192,7 +191,7 @@ public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> exte
|
||||
|
||||
private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions) {
|
||||
Set<String> inflightPartitions = getInflightMetadataPartitions(table.getMetaClient().getTableConfig());
|
||||
- Set<String> completedPartitions = getCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
|
||||
+ Set<String> completedPartitions = table.getMetaClient().getTableConfig().getMetadataPartitions();
|
||||
// update table config
|
||||
requestedPartitions.forEach(partition -> {
|
||||
inflightPartitions.remove(partition);
|
||||
@@ -302,7 +301,7 @@ public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> exte
|
||||
private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
|
||||
// remove from inflight and update completed indexes
|
||||
Set<String> inflightPartitions = getInflightMetadataPartitions(metaClient.getTableConfig());
|
||||
- Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
|
||||
+ Set<String> completedPartitions = metaClient.getTableConfig().getMetadataPartitions();
|
||||
inflightPartitions.removeAll(metadataPartitions);
|
||||
completedPartitions.addAll(metadataPartitions);
|
||||
// update table config
|
||||
|
||||
@@ -43,7 +43,6 @@ import java.util.stream.Collectors;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
- import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
|
||||
|
||||
/**
|
||||
@@ -81,7 +80,7 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
|
||||
|
||||
JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
|
||||
if (config.getBloomIndexUseMetadata()
|
||||
- && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
|
||||
+ && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
|
||||
.contains(BLOOM_FILTERS.getPartitionPath())) {
|
||||
// Step 1: Sort by file id
|
||||
JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
|
||||
|
||||
@@ -149,7 +149,6 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
|
||||
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
|
||||
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||
- import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
|
||||
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
|
||||
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
|
||||
@@ -240,9 +239,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTableConfig tableConfig = metaClient.getTableConfig();
|
||||
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
|
||||
- assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
|
||||
- assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
|
||||
- assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
|
||||
+ assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
|
||||
+ assertFalse(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
|
||||
+ assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
|
||||
|
||||
// enable column stats and run 1 upserts
|
||||
HoodieWriteConfig cfgWithColStatsEnabled = HoodieWriteConfig.newBuilder()
|
||||
@@ -265,9 +264,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
HoodieTableMetaClient.reload(metaClient);
|
||||
tableConfig = metaClient.getTableConfig();
|
||||
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
|
||||
- assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
|
||||
- assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
|
||||
- assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
|
||||
+ assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
|
||||
+ assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
|
||||
+ assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
|
||||
|
||||
// disable column stats and run 1 upsert
|
||||
HoodieWriteConfig cfgWithColStatsDisabled = HoodieWriteConfig.newBuilder()
|
||||
@@ -291,9 +290,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
HoodieTableMetaClient.reload(metaClient);
|
||||
tableConfig = metaClient.getTableConfig();
|
||||
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
|
||||
- assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
|
||||
- assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
|
||||
- assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
|
||||
+ assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
|
||||
+ assertFalse(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
|
||||
+ assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
|
||||
|
||||
// enable bloom filter as well as column stats and run 1 upsert
|
||||
HoodieWriteConfig cfgWithBloomFilterEnabled = HoodieWriteConfig.newBuilder()
|
||||
@@ -317,9 +316,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
HoodieTableMetaClient.reload(metaClient);
|
||||
tableConfig = metaClient.getTableConfig();
|
||||
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
|
||||
- assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
|
||||
- assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
|
||||
- assertTrue(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
|
||||
+ assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
|
||||
+ assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
|
||||
+ assertTrue(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -360,7 +359,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
|
||||
HoodieTableConfig hoodieTableConfig2 =
|
||||
new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
|
||||
- assertEquals(Collections.emptyList(), hoodieTableConfig2.getMetadataPartitions());
|
||||
+ assertEquals(Collections.emptySet(), hoodieTableConfig2.getMetadataPartitions());
|
||||
// Assert metadata table folder is deleted
|
||||
assertFalse(metaClient.getFs().exists(
|
||||
new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));
|
||||
|
||||
@@ -79,7 +79,6 @@ import scala.Tuple2;
|
||||
|
||||
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
|
||||
- import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
|
||||
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
@@ -234,7 +233,7 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
|
||||
// check column_stats partition exists
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS));
|
||||
- assertTrue(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
|
||||
+ assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
|
||||
|
||||
// delete the column_stats partition
|
||||
deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS);
|
||||
|
||||
Reference in New Issue
Block a user