[HUDI-2016] Fixed bootstrap of Metadata Table when some actions are in progress. (#3083)
Metadata Table cannot be bootstrapped when any action is in progress. This is detected by the presence of inflight or requested instants. The bootstrapping is initiated in preWrite and postWrite of each commit. So bootstrapping will be retried again until it succeeds. Also added metrics for when the bootstrapping fails or a table is re-bootstrapped. This will help detect tables which are not getting bootstrapped.
This commit is contained in:
@@ -250,13 +250,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
rebootstrap = true;
|
||||
} else if (datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) {
|
||||
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
|
||||
+ "latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
|
||||
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
|
||||
+ ", latestDatasetInstant=" + datasetMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
|
||||
rebootstrap = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (rebootstrap) {
|
||||
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1));
|
||||
LOG.info("Deleting Metadata Table directory so that it can be re-bootstrapped");
|
||||
datasetMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath()), true);
|
||||
exists = false;
|
||||
@@ -264,8 +265,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
|
||||
if (!exists) {
|
||||
// Initialize for the first time by listing partitions and files directly from the file system
|
||||
bootstrapFromFilesystem(engineContext, datasetMetaClient);
|
||||
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
|
||||
if (bootstrapFromFilesystem(engineContext, datasetMetaClient)) {
|
||||
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -274,22 +276,22 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
*
|
||||
* @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
|
||||
*/
|
||||
private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
|
||||
private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
|
||||
ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
|
||||
|
||||
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
|
||||
// Otherwise, we use the timestamp of the instant which does not have any non-completed instants before it.
|
||||
Option<HoodieInstant> latestInstant = Option.empty();
|
||||
boolean foundNonComplete = false;
|
||||
for (HoodieInstant instant : datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList())) {
|
||||
if (!instant.isCompleted()) {
|
||||
foundNonComplete = true;
|
||||
} else if (!foundNonComplete) {
|
||||
latestInstant = Option.of(instant);
|
||||
}
|
||||
// We can only bootstrap if there are no pending operations on the dataset
|
||||
Option<HoodieInstant> pendingInstantOption = Option.fromJavaOptional(datasetMetaClient.getActiveTimeline()
|
||||
.getReverseOrderedInstants().filter(i -> !i.isCompleted()).findFirst());
|
||||
if (pendingInstantOption.isPresent()) {
|
||||
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
|
||||
LOG.warn("Cannot bootstrap metadata table as operation is in progress: " + pendingInstantOption.get());
|
||||
return false;
|
||||
}
|
||||
|
||||
String createInstantTime = latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
|
||||
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
|
||||
// Otherwise, we use the latest commit timestamp.
|
||||
String createInstantTime = datasetMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst()
|
||||
.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
|
||||
LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
|
||||
|
||||
HoodieTableMetaClient.withPropertyBuilder()
|
||||
@@ -335,6 +337,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
|
||||
LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
|
||||
update(commitMetadata, createInstantTime);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user