diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 822fff32e..5a3cbe93d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1221,9 +1221,16 @@ public class HoodieTableMetadataUtil { if (isBootstrapCompleted) { final List latestFileSlices = HoodieTableMetadataUtil .getPartitionLatestFileSlices(metaClient.get(), fsView, partitionType.getPartitionPath()); + if (latestFileSlices.size() == 0 && !partitionType.getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) { + return getFileGroupCount(partitionType, metadataConfig); + } return Math.max(latestFileSlices.size(), 1); } + return getFileGroupCount(partitionType, metadataConfig); + } + + private static int getFileGroupCount(MetadataPartitionType partitionType, final HoodieMetadataConfig metadataConfig) { switch (partitionType) { case BLOOM_FILTERS: return metadataConfig.getBloomFilterIndexFileGroupCount(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java index 85505c025..ddb76ca25 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java @@ -48,7 +48,7 @@ public enum MetadataPartitionType { return fileIdPrefix; } - void setFileGroupCount(final int fileGroupCount) { + public void setFileGroupCount(final int fileGroupCount) { this.fileGroupCount = fileGroupCount; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java index 1b1cbca45..45f4f2717 100644 --- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.metadata.MetadataPartitionType; @@ -226,7 +227,8 @@ public class HoodieIndexer { } private Option doSchedule(SparkRDDWriteClient client) { - List partitionTypes = getRequestedPartitionTypes(cfg.indexTypes); + HoodieMetadataConfig metadataConfig = getHoodieMetadataConfig(); + List partitionTypes = getRequestedPartitionTypes(cfg.indexTypes, Option.of(metadataConfig)); checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time."); if (!isMetadataInitialized() && !partitionTypes.contains(MetadataPartitionType.FILES)) { throw new HoodieException("Metadata table is not yet initialized. 
Initialize FILES partition before any other partition " + Arrays.toString(partitionTypes.toArray())); @@ -241,6 +243,12 @@ public class HoodieIndexer { return indexingInstant; } + private HoodieMetadataConfig getHoodieMetadataConfig() { + props.setProperty(HoodieWriteConfig.BASE_PATH.key(), cfg.basePath); + HoodieWriteConfig dataTableWriteConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); + return dataTableWriteConfig.getMetadataConfig(); + } + private boolean indexExists(List partitionTypes) { Set indexedMetadataPartitions = metaClient.getTableConfig().getMetadataPartitions(); Set requestedIndexPartitionPaths = partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()); @@ -291,7 +299,7 @@ public class HoodieIndexer { } private int dropIndex(JavaSparkContext jsc) throws Exception { - List partitionTypes = getRequestedPartitionTypes(cfg.indexTypes); + List partitionTypes = getRequestedPartitionTypes(cfg.indexTypes, Option.empty()); String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient); try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) { client.dropIndex(partitionTypes); @@ -316,16 +324,27 @@ public class HoodieIndexer { boolean isIndexBuiltForAllRequestedTypes(List indexPartitionInfos) { Set indexedPartitions = indexPartitionInfos.stream() .map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()); - Set requestedPartitions = getRequestedPartitionTypes(cfg.indexTypes).stream() + Set requestedPartitions = getRequestedPartitionTypes(cfg.indexTypes, Option.empty()).stream() .map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()); requestedPartitions.removeAll(indexedPartitions); return requestedPartitions.isEmpty(); } - List getRequestedPartitionTypes(String indexTypes) { + List getRequestedPartitionTypes(String indexTypes, Option metadataConfig) { List requestedIndexTypes 
= Arrays.asList(indexTypes.split(",")); return requestedIndexTypes.stream() - .map(p -> MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT))) - .collect(Collectors.toList()); + .map(p -> { + MetadataPartitionType metadataPartitionType = MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)); + if (metadataConfig.isPresent()) { // this is expected to be non-null during scheduling where file groups for a given partition are instantiated for the first time. + if (!metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) { + if (metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) { + metadataPartitionType.setFileGroupCount(metadataConfig.get().getColumnStatsIndexFileGroupCount()); + } else if (metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath())) { + metadataPartitionType.setFileGroupCount(metadataConfig.get().getBloomFilterIndexFileGroupCount()); + } + } + } + return metadataPartitionType; + }).collect(Collectors.toList()); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java index 7117041ad..9c4fc0766 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java @@ -27,7 +27,9 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import 
org.apache.hudi.common.testutils.HoodieCommonTestHarness; @@ -35,6 +37,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.testutils.providers.SparkProvider; @@ -46,6 +49,9 @@ import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.util.Arrays; @@ -53,6 +59,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.stream.Stream; import static org.apache.hudi.common.table.HoodieTableMetaClient.reload; import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED; @@ -74,6 +81,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP private static transient SQLContext sqlContext; private static transient JavaSparkContext jsc; private static transient HoodieSparkEngineContext context; + private static int colStatsFileGroupCount; @BeforeEach public void init() throws IOException { @@ -86,6 +94,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP sqlContext = spark.sqlContext(); jsc = new JavaSparkContext(spark.sparkContext()); context = new HoodieSparkEngineContext(jsc); + colStatsFileGroupCount = HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue(); } initPath(); initMetaClient(); @@ -106,7 +115,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP config.tableName = 
"indexer_test"; config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS"; HoodieIndexer indexer = new HoodieIndexer(jsc, config); - List partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes); + List partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes, Option.empty()); assertTrue(partitionTypes.contains(FILES)); assertTrue(partitionTypes.contains(BLOOM_FILTERS)); assertTrue(partitionTypes.contains(COLUMN_STATS)); @@ -145,7 +154,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); // build indexer config which has only column_stats enabled (files and bloom filter is already enabled) - indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[]{FILES, BLOOM_FILTERS}), Collections.emptyList()); + indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[] {FILES, BLOOM_FILTERS}), Collections.emptyList()); } @Test @@ -160,7 +169,41 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); // build indexer config which has only files enabled - indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS})); + indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS})); + } + + private static Stream colStatsFileGroupCountParams() { + return Stream.of( + Arguments.of(1), + Arguments.of(2), + Arguments.of(4), + Arguments.of(8) + ); + } + + @ParameterizedTest + @MethodSource("colStatsFileGroupCountParams") + public void testColStatsFileGroupCount(int colStatsFileGroupCount) { + TestHoodieIndexer.colStatsFileGroupCount = colStatsFileGroupCount; + 
initTestDataGenerator(); + tableName = "indexer_test"; + // enable files and bloom_filters on the regular write client + HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true); + initializeWriteClient(metadataConfigBuilder.build()); + + // validate table config + assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); + + // build indexer config which has only files enabled + indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS})); + + // build indexer config which has only col stats enabled + indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS})); + + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getMetaPath() + "/metadata").build(); + List partitionFileSlices = + HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, COLUMN_STATS.getPartitionPath()); + assertEquals(colStatsFileGroupCount, partitionFileSlices.size()); } /** @@ -198,7 +241,15 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP assertFalse(metadataPartitionExists(basePath, context, FILES)); // trigger FILES partition and indexing should succeed. 
- indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS})); + indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS})); + + // build indexer config which has only col stats enabled + indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS})); + + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getMetaPath() + "/metadata").build(); + List partitionFileSlices = + HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, COLUMN_STATS.getPartitionPath()); + assertEquals(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue(), partitionFileSlices.size()); } private void initializeWriteClient(HoodieMetadataConfig metadataConfig) { @@ -222,6 +273,9 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP config.indexTypes = partitionTypeToIndex.name(); config.runningMode = SCHEDULE_AND_EXECUTE; config.propsFilePath = propsPath; + if (partitionTypeToIndex.getPartitionPath().equals(COLUMN_STATS.getPartitionPath())) { + config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount); + } // start the indexer and validate files index is completely built out HoodieIndexer indexer = new HoodieIndexer(jsc, config); assertEquals(0, indexer.start(0));