1
0

[HUDI-3844] Update props in indexer based on table config (#5293)

This commit is contained in:
Sagar Sumit
2022-04-12 03:46:06 +05:30
committed by GitHub
parent 458fdd5611
commit 3d8fc78c66
4 changed files with 118 additions and 20 deletions

View File

@@ -379,21 +379,24 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
} }
// if metadata table exists, then check if any of the enabled partition types needs to be initialized // if metadata table exists, then check if any of the enabled partition types needs to be initialized
Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler.
List<MetadataPartitionType> partitionsToInit = this.enabledPartitionTypes.stream() if (!dataWriteConfig.isMetadataAsyncIndex()) {
.filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
.collect(Collectors.toList()); LOG.info("Async metadata indexing enabled and following partitions already initialized: " + inflightAndCompletedPartitions);
List<MetadataPartitionType> partitionsToInit = this.enabledPartitionTypes.stream()
.filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p))
.collect(Collectors.toList());
// if there are no partitions to initialize or there is a pending operation, then don't initialize in this round
if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
return;
}
// if there are no partitions to initialize or there is a pending operation, then don't initialize in this round String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { initTableMetadata(); // re-init certain flags in BaseTableMetadata
return; initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit);
initialCommit(createInstantTime, partitionsToInit);
updateInitializedPartitionsInTableConfig(partitionsToInit);
} }
String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
initTableMetadata(); // re-init certain flags in BaseTableMetadata
initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit);
initialCommit(createInstantTime, partitionsToInit);
updateInitializedPartitionsInTableConfig(partitionsToInit);
} }
private <T extends SpecificRecordBase> boolean metadataTableExists(HoodieTableMetaClient dataMetaClient, private <T extends SpecificRecordBase> boolean metadataTableExists(HoodieTableMetaClient dataMetaClient,
@@ -557,6 +560,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline() List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline()
.getInstants().filter(i -> !i.isCompleted()) .getInstants().filter(i -> !i.isCompleted())
.filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get())) .filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get()))
// regular writers should not be blocked due to pending indexing action
.filter(i -> !HoodieTimeline.INDEXING_ACTION.equals(i.getAction()))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (!pendingDataInstant.isEmpty()) { if (!pendingDataInstant.isEmpty()) {

View File

@@ -87,7 +87,7 @@ public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
Set<String> requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()); Set<String> requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
requestedPartitions.removeAll(indexesInflightOrCompleted); requestedPartitions.removeAll(indexesInflightOrCompleted);
if (!requestedPartitions.isEmpty()) { if (!requestedPartitions.isEmpty()) {
LOG.warn(String.format("Following partitions already exist or inflight: %s. Going to index only these partitions: %s", LOG.warn(String.format("Following partitions already exist or inflight: %s. Going to schedule indexing of only these partitions: %s",
indexesInflightOrCompleted, requestedPartitions)); indexesInflightOrCompleted, requestedPartitions));
} else { } else {
LOG.error("All requested index types are inflight or completed: " + partitionIndexTypes); LOG.error("All requested index types are inflight or completed: " + partitionIndexTypes);

View File

@@ -46,9 +46,14 @@ import java.util.Locale;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument; import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE; import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE; import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE; import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
@@ -164,6 +169,19 @@ public class HoodieIndexer {
return -1; return -1;
} }
// all inflight or completed metadata partitions have already been initialized
// so enable corresponding indexes in the props so that they're not deleted
Set<String> initializedMetadataPartitions = getInflightAndCompletedMetadataPartitions(metaClient.getTableConfig());
LOG.info("Setting props for: " + initializedMetadataPartitions);
initializedMetadataPartitions.forEach(p -> {
if (PARTITION_NAME_COLUMN_STATS.equals(p)) {
props.setProperty(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
}
if (PARTITION_NAME_BLOOM_FILTERS.equals(p)) {
props.setProperty(ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), "true");
}
});
return UtilHelpers.retry(retry, () -> { return UtilHelpers.retry(retry, () -> {
switch (cfg.runningMode.toLowerCase()) { switch (cfg.runningMode.toLowerCase()) {
case SCHEDULE: { case SCHEDULE: {

View File

@@ -23,14 +23,21 @@ import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo; import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.testutils.providers.SparkProvider; import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@@ -40,7 +47,14 @@ import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Objects;
import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -64,7 +78,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
context = new HoodieSparkEngineContext(jsc); context = new HoodieSparkEngineContext(jsc);
} }
initPath(); initPath();
metaClient = HoodieTestUtils.init(basePath, getTableType()); initMetaClient();
} }
@Test @Test
@@ -75,9 +89,9 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS"; config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS";
HoodieIndexer indexer = new HoodieIndexer(jsc, config); HoodieIndexer indexer = new HoodieIndexer(jsc, config);
List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes); List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes);
assertFalse(partitionTypes.contains(MetadataPartitionType.FILES)); assertFalse(partitionTypes.contains(FILES));
assertTrue(partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)); assertTrue(partitionTypes.contains(BLOOM_FILTERS));
assertTrue(partitionTypes.contains(MetadataPartitionType.COLUMN_STATS)); assertTrue(partitionTypes.contains(COLUMN_STATS));
} }
@Test @Test
@@ -90,7 +104,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
HoodieIndexCommitMetadata commitMetadata = HoodieIndexCommitMetadata.newBuilder() HoodieIndexCommitMetadata commitMetadata = HoodieIndexCommitMetadata.newBuilder()
.setIndexPartitionInfos(Arrays.asList(new HoodieIndexPartitionInfo( .setIndexPartitionInfos(Arrays.asList(new HoodieIndexPartitionInfo(
1, 1,
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), COLUMN_STATS.getPartitionPath(),
"0000"))) "0000")))
.build(); .build();
assertFalse(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos())); assertFalse(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos()));
@@ -100,6 +114,67 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
assertTrue(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos())); assertTrue(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos()));
} }
@Test
public void testIndexerWithNotAllIndexesEnabled() {
initTestDataGenerator();
String tableName = "indexer_test";
HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
// enable files and bloom_filters on the regular write client
HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
// do one upsert with synchronous metadata update
SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
String instant = "0001";
writeClient.startCommitWithTime(instant);
List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
List<WriteStatus> statuses = result.collect();
assertNoWriteErrors(statuses);
// validate table config
assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
// build indexer config which has only column_stats enabled (files is enabled by default)
HoodieIndexer.Config config = new HoodieIndexer.Config();
String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
config.basePath = basePath;
config.tableName = tableName;
config.indexTypes = "COLUMN_STATS";
config.runningMode = "scheduleAndExecute";
config.propsFilePath = propsPath;
// start the indexer and validate column_stats index is also complete
HoodieIndexer indexer = new HoodieIndexer(jsc, config);
assertEquals(0, indexer.start(0));
// validate table config
assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
// validate metadata partitions actually exist
assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, FILES));
assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, COLUMN_STATS));
assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, BLOOM_FILTERS));
}
private static HoodieWriteConfig.Builder getWriteConfigBuilder(String basePath, String tableName) {
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withBulkInsertParallelism(2)
.withFinalizeWriteParallelism(2)
.withDeleteParallelism(2)
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.forTable(tableName);
}
private static HoodieMetadataConfig.Builder getMetadataConfigBuilder(boolean enable, boolean asyncIndex) {
return HoodieMetadataConfig.newBuilder()
.enable(enable)
.withAsyncIndex(asyncIndex);
}
@Override @Override
public HoodieEngineContext context() { public HoodieEngineContext context() {
return context; return context;