From 3d8fc78c6675cf2df79bd359276bc1ba3440befd Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 12 Apr 2022 03:46:06 +0530 Subject: [PATCH] [HUDI-3844] Update props in indexer based on table config (#5293) --- .../HoodieBackedTableMetadataWriter.java | 31 ++++--- .../index/ScheduleIndexActionExecutor.java | 2 +- .../apache/hudi/utilities/HoodieIndexer.java | 18 ++++ .../hudi/utilities/TestHoodieIndexer.java | 87 +++++++++++++++++-- 4 files changed, 118 insertions(+), 20 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 4faac22a8..eaae1ad6c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -379,21 +379,24 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta } // if metadata table exists, then check if any of the enabled partition types needs to be initialized - Set inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); - List partitionsToInit = this.enabledPartitionTypes.stream() - .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) - .collect(Collectors.toList()); + // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. 
+ if (!dataWriteConfig.isMetadataAsyncIndex()) { + Set inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); + LOG.info("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions); + List partitionsToInit = this.enabledPartitionTypes.stream() + .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) + .collect(Collectors.toList()); + // if there are no partitions to initialize or there is a pending operation, then don't initialize in this round + if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { + return; + } - // if there are no partitions to initialize or there is a pending operation, then don't initialize in this round - if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { - return; + String createInstantTime = getInitialCommitInstantTime(dataMetaClient); + initTableMetadata(); // re-init certain flags in BaseTableMetadata + initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit); + initialCommit(createInstantTime, partitionsToInit); + updateInitializedPartitionsInTableConfig(partitionsToInit); } - - String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - initTableMetadata(); // re-init certain flags in BaseTableMetadata - initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit); - initialCommit(createInstantTime, partitionsToInit); - updateInitializedPartitionsInTableConfig(partitionsToInit); } private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient, @@ -557,6 +560,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta List pendingDataInstant = dataMetaClient.getActiveTimeline() .getInstants().filter(i -> !i.isCompleted()) .filter(i -> !inflightInstantTimestamp.isPresent() || 
!i.getTimestamp().equals(inflightInstantTimestamp.get())) + // regular writers should not be blocked due to pending indexing action + .filter(i -> !HoodieTimeline.INDEXING_ACTION.equals(i.getAction())) .collect(Collectors.toList()); if (!pendingDataInstant.isEmpty()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java index 5afebee8a..cfd975c50 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java @@ -87,7 +87,7 @@ public class ScheduleIndexActionExecutor Set requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()); requestedPartitions.removeAll(indexesInflightOrCompleted); if (!requestedPartitions.isEmpty()) { - LOG.warn(String.format("Following partitions already exist or inflight: %s. Going to index only these partitions: %s", + LOG.warn(String.format("Following partitions already exist or inflight: %s. 
Going to schedule indexing of only these partitions: %s", indexesInflightOrCompleted, requestedPartitions)); } else { LOG.error("All requested index types are inflight or completed: " + partitionIndexTypes); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java index 2741e2b98..501f9296b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java @@ -46,9 +46,14 @@ import java.util.Locale; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER; +import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions; import static org.apache.hudi.utilities.UtilHelpers.EXECUTE; import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE; import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE; @@ -164,6 +169,19 @@ public class HoodieIndexer { return -1; } + // all inflight or completed metadata partitions have already been initialized + // so enable corresponding indexes in the props so that they're not deleted + Set initializedMetadataPartitions = getInflightAndCompletedMetadataPartitions(metaClient.getTableConfig()); + LOG.info("Setting props for: " + initializedMetadataPartitions); + 
initializedMetadataPartitions.forEach(p -> { + if (PARTITION_NAME_COLUMN_STATS.equals(p)) { + props.setProperty(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true"); + } + if (PARTITION_NAME_BLOOM_FILTERS.equals(p)) { + props.setProperty(ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), "true"); + } + }); + return UtilHelpers.retry(retry, () -> { switch (cfg.runningMode.toLowerCase()) { case SCHEDULE: { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java index 9ce8eef31..695ca8841 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java @@ -23,14 +23,21 @@ import org.apache.hudi.avro.model.HoodieIndexCommitMetadata; import org.apache.hudi.avro.model.HoodieIndexPartitionInfo; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; -import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.testutils.providers.SparkProvider; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; @@ -40,7 
+47,14 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Objects; +import static org.apache.hudi.common.table.HoodieTableMetaClient.reload; +import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; +import static org.apache.hudi.metadata.MetadataPartitionType.FILES; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -64,7 +78,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP context = new HoodieSparkEngineContext(jsc); } initPath(); - metaClient = HoodieTestUtils.init(basePath, getTableType()); + initMetaClient(); } @Test @@ -75,9 +89,9 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS"; HoodieIndexer indexer = new HoodieIndexer(jsc, config); List partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes); - assertFalse(partitionTypes.contains(MetadataPartitionType.FILES)); - assertTrue(partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)); - assertTrue(partitionTypes.contains(MetadataPartitionType.COLUMN_STATS)); + assertFalse(partitionTypes.contains(FILES)); + assertTrue(partitionTypes.contains(BLOOM_FILTERS)); + assertTrue(partitionTypes.contains(COLUMN_STATS)); } @Test @@ -90,7 +104,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP HoodieIndexCommitMetadata commitMetadata = HoodieIndexCommitMetadata.newBuilder() .setIndexPartitionInfos(Arrays.asList(new HoodieIndexPartitionInfo( 1, - MetadataPartitionType.COLUMN_STATS.getPartitionPath(), + COLUMN_STATS.getPartitionPath(), "0000"))) .build(); 
assertFalse(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos())); @@ -100,6 +114,67 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP assertTrue(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos())); } + @Test + public void testIndexerWithNotAllIndexesEnabled() { + initTestDataGenerator(); + String tableName = "indexer_test"; + HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName); + // enable files and bloom_filters on the regular write client + HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true); + HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build(); + // do one upsert with synchronous metadata update + SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig); + String instant = "0001"; + writeClient.startCommitWithTime(instant); + List records = dataGen.generateInserts(instant, 100); + JavaRDD result = writeClient.upsert(jsc.parallelize(records, 1), instant); + List statuses = result.collect(); + assertNoWriteErrors(statuses); + + // validate table config + assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath())); + assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath())); + + // build indexer config which has only column_stats enabled (files is enabled by default) + HoodieIndexer.Config config = new HoodieIndexer.Config(); + String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath(); + config.basePath = basePath; + config.tableName = tableName; + config.indexTypes = "COLUMN_STATS"; + config.runningMode = "scheduleAndExecute"; + 
config.propsFilePath = propsPath; + // start the indexer and validate column_stats index is also complete + HoodieIndexer indexer = new HoodieIndexer(jsc, config); + assertEquals(0, indexer.start(0)); + + // validate table config + assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath())); + assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath())); + assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath())); + // validate metadata partitions actually exist + assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, FILES)); + assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, COLUMN_STATS)); + assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, BLOOM_FILTERS)); + } + + private static HoodieWriteConfig.Builder getWriteConfigBuilder(String basePath, String tableName) { + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withBulkInsertParallelism(2) + .withFinalizeWriteParallelism(2) + .withDeleteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .forTable(tableName); + } + + private static HoodieMetadataConfig.Builder getMetadataConfigBuilder(boolean enable, boolean asyncIndex) { + return HoodieMetadataConfig.newBuilder() + .enable(enable) + .withAsyncIndex(asyncIndex); + } + @Override public HoodieEngineContext context() { return context;