Fix file group count issue with metadata partitions (#5892)

Author: Sivabalan Narayanan
Date: 2022-07-17 18:49:29 -07:00 (committed by GitHub)
Parent: ded197800a
Commit: 3964c476e0
4 changed files with 91 additions and 11 deletions
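
In short: when an index build was scheduled for a metadata partition whose file groups did not exist yet, the estimator in HoodieTableMetadataUtil saw an empty file-slice list and returned Math.max(0, 1) = 1, silently ignoring the configured file group counts for COLUMN_STATS and BLOOM_FILTERS. This commit falls back to the config-driven count for non-FILES partitions and has HoodieIndexer pass the data table's HoodieMetadataConfig through during scheduling. A minimal usage sketch, modeled on the new test at the bottom of this diff (the Config field names come from the diff itself; the jsc/basePath setup, the Config constructor, and the "scheduleAndExecute" literal are assumptions):

    // Schedule + execute a COLUMN_STATS index build with a non-default file group count.
    HoodieIndexer.Config config = new HoodieIndexer.Config();
    config.basePath = basePath;                 // data table base path (setup assumed)
    config.tableName = "indexer_test";
    config.indexTypes = "COLUMN_STATS";
    config.runningMode = "scheduleAndExecute";  // SCHEDULE_AND_EXECUTE constant in the test
    config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=4");
    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
    indexer.start(0);  // returns 0 on success; COLUMN_STATS should now get 4 file groups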

File: HoodieTableMetadataUtil.java

@@ -1221,9 +1221,16 @@ public class HoodieTableMetadataUtil {
     if (isBootstrapCompleted) {
       final List<FileSlice> latestFileSlices = HoodieTableMetadataUtil
           .getPartitionLatestFileSlices(metaClient.get(), fsView, partitionType.getPartitionPath());
+      if (latestFileSlices.size() == 0 && !partitionType.getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
+        return getFileGroupCount(partitionType, metadataConfig);
+      }
       return Math.max(latestFileSlices.size(), 1);
     }
+    return getFileGroupCount(partitionType, metadataConfig);
+  }
+
+  private static int getFileGroupCount(MetadataPartitionType partitionType, final HoodieMetadataConfig metadataConfig) {
     switch (partitionType) {
       case BLOOM_FILTERS:
         return metadataConfig.getBloomFilterIndexFileGroupCount();
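
The hunk above cuts the extracted helper off at the BLOOM_FILTERS case. A hedged sketch of what the full method presumably looks like: the COLUMN_STATS branch mirrors the getColumnStatsIndexFileGroupCount() call this commit adds to HoodieIndexer below, while the default of 1 for the FILES partition is an assumption:

    private static int getFileGroupCount(MetadataPartitionType partitionType, final HoodieMetadataConfig metadataConfig) {
      switch (partitionType) {
        case BLOOM_FILTERS:
          return metadataConfig.getBloomFilterIndexFileGroupCount();
        case COLUMN_STATS:
          return metadataConfig.getColumnStatsIndexFileGroupCount();
        default:
          // FILES partition: a single file group (assumed)
          return 1;
      }
    }

The behavioral fix is the new guard above it: a partition with no file slices yet used to collapse to Math.max(0, 1) = 1 file group no matter what was configured; now the configured count wins for non-FILES partitions.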

File: MetadataPartitionType.java

@@ -48,7 +48,7 @@ public enum MetadataPartitionType {
     return fileIdPrefix;
   }

-  void setFileGroupCount(final int fileGroupCount) {
+  public void setFileGroupCount(final int fileGroupCount) {
     this.fileGroupCount = fileGroupCount;
   }
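
Widening setFileGroupCount from package-private to public is what lets HoodieIndexer, which lives outside the org.apache.hudi.metadata package, seed each enum constant's file group count from the metadata config during scheduling (next file).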

File: HoodieIndexer.java

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -226,7 +227,8 @@ public class HoodieIndexer {
   }

   private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
-    List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes);
+    HoodieMetadataConfig metadataConfig = getHoodieMetadataConfig();
+    List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes, Option.of(metadataConfig));
     checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time.");
     if (!isMetadataInitialized() && !partitionTypes.contains(MetadataPartitionType.FILES)) {
       throw new HoodieException("Metadata table is not yet initialized. Initialize FILES partition before any other partition " + Arrays.toString(partitionTypes.toArray()));
@@ -241,6 +243,12 @@ public class HoodieIndexer {
     return indexingInstant;
   }

+  private HoodieMetadataConfig getHoodieMetadataConfig() {
+    props.setProperty(HoodieWriteConfig.BASE_PATH.key(), cfg.basePath);
+    HoodieWriteConfig dataTableWriteConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
+    return dataTableWriteConfig.getMetadataConfig();
+  }
+
   private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
     Set<String> indexedMetadataPartitions = metaClient.getTableConfig().getMetadataPartitions();
     Set<String> requestedIndexPartitionPaths = partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
@@ -291,7 +299,7 @@ public class HoodieIndexer {
   }

   private int dropIndex(JavaSparkContext jsc) throws Exception {
-    List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes);
+    List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes, Option.empty());
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
       client.dropIndex(partitionTypes);
@@ -316,16 +324,27 @@ public class HoodieIndexer {
   boolean isIndexBuiltForAllRequestedTypes(List<HoodieIndexPartitionInfo> indexPartitionInfos) {
     Set<String> indexedPartitions = indexPartitionInfos.stream()
         .map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
-    Set<String> requestedPartitions = getRequestedPartitionTypes(cfg.indexTypes).stream()
+    Set<String> requestedPartitions = getRequestedPartitionTypes(cfg.indexTypes, Option.empty()).stream()
         .map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
     requestedPartitions.removeAll(indexedPartitions);
     return requestedPartitions.isEmpty();
   }

-  List<MetadataPartitionType> getRequestedPartitionTypes(String indexTypes) {
+  List<MetadataPartitionType> getRequestedPartitionTypes(String indexTypes, Option<HoodieMetadataConfig> metadataConfig) {
     List<String> requestedIndexTypes = Arrays.asList(indexTypes.split(","));
     return requestedIndexTypes.stream()
-        .map(p -> MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)))
-        .collect(Collectors.toList());
+        .map(p -> {
+          MetadataPartitionType metadataPartitionType = MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT));
+          if (metadataConfig.isPresent()) { // this is expected to be non-null during scheduling where file groups for a given partition are instantiated for the first time.
+            if (!metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.FILES.toString())) {
+              if (metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
+                metadataPartitionType.setFileGroupCount(metadataConfig.get().getColumnStatsIndexFileGroupCount());
+              } else if (metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath())) {
+                metadataPartitionType.setFileGroupCount(metadataConfig.get().getBloomFilterIndexFileGroupCount());
+              }
+            }
+          }
+          return metadataPartitionType;
+        }).collect(Collectors.toList());
   }
 }
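
Putting the HoodieIndexer changes together: only the scheduling path (doSchedule) passes a real config, while dropIndex and isIndexBuiltForAllRequestedTypes pass Option.empty(), since on those paths file groups either already exist or are being removed. A hedged sketch of the resulting flow inside doSchedule, assuming the enum exposes a getFileGroupCount() getter matching the setter made public above:

    // The data table's write config is rebuilt from the indexer props, so user-set
    // file group counts reach the partition types before any file groups are created.
    HoodieMetadataConfig metadataConfig = getHoodieMetadataConfig();
    List<MetadataPartitionType> partitionTypes =
        getRequestedPartitionTypes("COLUMN_STATS", Option.of(metadataConfig));
    // COLUMN_STATS now carries the configured count instead of the enum default:
    // partitionTypes.get(0).getFileGroupCount() == metadataConfig.getColumnStatsIndexFileGroupCount()

Note that MetadataPartitionType is an enum, so setFileGroupCount mutates JVM-wide state; passing Option.empty() on the non-scheduling paths avoids clobbering a count that scheduling has already set.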

File: TestHoodieIndexer.java

@@ -27,7 +27,9 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -35,6 +37,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.testutils.providers.SparkProvider;
@@ -46,6 +49,9 @@ import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;

 import java.io.IOException;
 import java.util.Arrays;
@@ -53,6 +59,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Stream;

 import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -74,6 +81,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {
   private static transient SQLContext sqlContext;
   private static transient JavaSparkContext jsc;
   private static transient HoodieSparkEngineContext context;
+  private static int colStatsFileGroupCount;

   @BeforeEach
   public void init() throws IOException {
@@ -86,6 +94,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {
       sqlContext = spark.sqlContext();
       jsc = new JavaSparkContext(spark.sparkContext());
       context = new HoodieSparkEngineContext(jsc);
+      colStatsFileGroupCount = HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue();
     }
     initPath();
     initMetaClient();
@@ -106,7 +115,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {
     config.tableName = "indexer_test";
     config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS";
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
-    List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes);
+    List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes, Option.empty());
     assertTrue(partitionTypes.contains(FILES));
     assertTrue(partitionTypes.contains(BLOOM_FILTERS));
     assertTrue(partitionTypes.contains(COLUMN_STATS));
@@ -145,7 +154,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));

     // build indexer config which has only column_stats enabled (files and bloom filter is already enabled)
-    indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[]{FILES, BLOOM_FILTERS}), Collections.emptyList());
+    indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[] {FILES, BLOOM_FILTERS}), Collections.emptyList());
   }

   @Test
@@ -160,7 +169,41 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));

     // build indexer config which has only files enabled
-    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS}));
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}));
   }

+  private static Stream<Arguments> colStatsFileGroupCountParams() {
+    return Stream.of(
+        Arguments.of(1),
+        Arguments.of(2),
+        Arguments.of(4),
+        Arguments.of(8)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("colStatsFileGroupCountParams")
+  public void testColStatsFileGroupCount(int colStatsFileGroupCount) {
+    TestHoodieIndexer.colStatsFileGroupCount = colStatsFileGroupCount;
+    initTestDataGenerator();
+    tableName = "indexer_test";
+
+    // enable files and bloom_filters on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
+    initializeWriteClient(metadataConfigBuilder.build());
+    // validate table config
+    assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
+
+    // build indexer config which has only files enabled
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}));
+
+    // build indexer config which has only col stats enabled
+    indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS}));
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getMetaPath() + "/metadata").build();
+    List<FileSlice> partitionFileSlices =
+        HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, COLUMN_STATS.getPartitionPath());
+    assertEquals(partitionFileSlices.size(), colStatsFileGroupCount);
+  }

   /**
@@ -198,7 +241,15 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {
     assertFalse(metadataPartitionExists(basePath, context, FILES));

     // trigger FILES partition and indexing should succeed.
-    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS}));
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}));
+
+    // build indexer config which has only col stats enabled
+    indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS}));
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getMetaPath() + "/metadata").build();
+    List<FileSlice> partitionFileSlices =
+        HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, COLUMN_STATS.getPartitionPath());
+    assertEquals(partitionFileSlices.size(), HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue());
   }

   private void initializeWriteClient(HoodieMetadataConfig metadataConfig) {
@@ -222,6 +273,9 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {
     config.indexTypes = partitionTypeToIndex.name();
     config.runningMode = SCHEDULE_AND_EXECUTE;
     config.propsFilePath = propsPath;
+    if (partitionTypeToIndex.getPartitionPath().equals(COLUMN_STATS.getPartitionPath())) {
+      config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount);
+    }
     // start the indexer and validate files index is completely built out
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
     assertEquals(0, indexer.start(0));
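
Taken together, the new tests close the loop on the reported issue: the parameterized test builds the COLUMN_STATS partition from scratch for each configured count (1, 2, 4, 8) and asserts that the metadata table ends up with exactly that many latest file slices, i.e. one per file group, while the other test asserts the documented default when nothing is configured. Before this fix, a freshly scheduled partition would have been sized to a single file group regardless of configuration.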