[HUDI-4197] Fix Async indexer to support building FILES partition (#5766)
- When the async indexer is invoked only with the "FILES" partition, it fails. This change fixes it to work with the async indexer. Also, if the metadata table itself is not initialized and someone wants to build indexes via AsyncIndexer, they are expected to index the "FILES" partition first, followed by other partitions. In general, we have a limitation of building only one index at a time with AsyncIndexer, and hence guards have been added to ensure these conditions are met.
This commit is contained in:
committed by
GitHub
parent
4f6fc726d0
commit
21b903fddb
@@ -51,6 +51,7 @@ import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
@@ -120,45 +121,64 @@ public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> exte
|
||||
if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
|
||||
throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
|
||||
}
|
||||
boolean firstTimeInitializingMetadataTable = false;
|
||||
HoodieIndexPartitionInfo fileIndexPartitionInfo = null;
|
||||
if (indexPartitionInfos.size() == 1 && indexPartitionInfos.get(0).getMetadataPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
|
||||
firstTimeInitializingMetadataTable = true;
|
||||
fileIndexPartitionInfo = indexPartitionInfos.get(0);
|
||||
}
|
||||
// ensure the metadata partitions for the requested indexes are not already available (or inflight)
|
||||
Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
|
||||
Set<String> requestedPartitions = indexPartitionInfos.stream()
|
||||
.map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
|
||||
requestedPartitions.retainAll(indexesInflightOrCompleted);
|
||||
if (!requestedPartitions.isEmpty()) {
|
||||
if (!firstTimeInitializingMetadataTable && !requestedPartitions.isEmpty()) {
|
||||
throw new HoodieIndexException(String.format("Following partitions already exist or inflight: %s", requestedPartitions));
|
||||
}
|
||||
|
||||
// transition requested indexInstant to inflight
|
||||
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
|
||||
// start indexing for each partition
|
||||
HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
|
||||
.orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
|
||||
// this will only build index upto base instant as generated by the plan, we will be doing catchup later
|
||||
String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
|
||||
LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
|
||||
metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
|
||||
List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = null;
|
||||
if (!firstTimeInitializingMetadataTable) {
|
||||
// start indexing for each partition
|
||||
HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
|
||||
.orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
|
||||
// this will only build index upto base instant as generated by the plan, we will be doing catchup later
|
||||
String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
|
||||
LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
|
||||
metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
|
||||
|
||||
// get remaining instants to catchup
|
||||
List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
|
||||
LOG.info("Total remaining instants to index: " + instantsToCatchup.size());
|
||||
// get remaining instants to catchup
|
||||
List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
|
||||
LOG.info("Total remaining instants to index: " + instantsToCatchup.size());
|
||||
|
||||
// reconcile with metadata table timeline
|
||||
String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
|
||||
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
|
||||
Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
|
||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
|
||||
// reconcile with metadata table timeline
|
||||
String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
|
||||
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
|
||||
Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
|
||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
|
||||
|
||||
// index catchup for all remaining instants with a timeout
|
||||
currentCaughtupInstant = indexUptoInstant;
|
||||
catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
|
||||
// save index commit metadata and update table config
|
||||
finalIndexPartitionInfos = indexPartitionInfos.stream()
|
||||
.map(info -> new HoodieIndexPartitionInfo(
|
||||
info.getVersion(),
|
||||
info.getMetadataPartitionPath(),
|
||||
currentCaughtupInstant))
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
|
||||
// save index commit metadata and update table config
|
||||
finalIndexPartitionInfos = Collections.singletonList(fileIndexPartitionInfo).stream()
|
||||
.map(info -> new HoodieIndexPartitionInfo(
|
||||
info.getVersion(),
|
||||
info.getMetadataPartitionPath(),
|
||||
indexUptoInstant))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
// index catchup for all remaining instants with a timeout
|
||||
currentCaughtupInstant = indexUptoInstant;
|
||||
catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
|
||||
// save index commit metadata and update table config
|
||||
List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = indexPartitionInfos.stream()
|
||||
.map(info -> new HoodieIndexPartitionInfo(
|
||||
info.getVersion(),
|
||||
info.getMetadataPartitionPath(),
|
||||
currentCaughtupInstant))
|
||||
.collect(Collectors.toList());
|
||||
HoodieIndexCommitMetadata indexCommitMetadata = HoodieIndexCommitMetadata.newBuilder()
|
||||
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
|
||||
updateTableConfigAndTimeline(indexInstant, finalIndexPartitionInfos, indexCommitMetadata);
|
||||
|
||||
@@ -105,7 +105,10 @@ public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
|
||||
// in case FILES partition itself was not initialized before (i.e. metadata was never enabled), this will initialize synchronously
|
||||
HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
|
||||
.orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to initialize filegroups for indexing for instant: %s", instantTime)));
|
||||
metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
|
||||
if (!finalPartitionsToIndex.get(0).getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
|
||||
// initialize metadata partition only if not for FILES partition.
|
||||
metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
|
||||
}
|
||||
|
||||
// for each partitionToIndex add that time to the plan
|
||||
List<HoodieIndexPartitionInfo> indexPartitionInfos = finalPartitionsToIndex.stream()
|
||||
|
||||
Reference in New Issue
Block a user