[HUDI-2794] Guarding table service commits within a single lock to commit to both data table and metadata table (#4037)
* Fixing a single lock to commit table services across metadata table and data table
* Addressing comments
* Rebasing with master
commit 7bb90e8caf (parent b972aa5bf2)
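Every hunk below applies the same pattern: acquire the transaction lock once, write to the metadata table and complete the data-table commit while holding it, and release it in a finally block. A minimal self-contained sketch of that pattern follows; TxnManager, MetadataWriter, and Timeline are simplified stand-ins for illustration, not Hudi's actual classes.

import java.util.Optional;

// Minimal sketch of the single-lock commit pattern this patch applies.
// All types here are illustrative stand-ins, not Hudi's real classes.
public class SingleLockCommitSketch {
  interface TxnManager { void begin(); void end(); }
  interface MetadataWriter { void update(String instantTime); }
  interface Timeline { void transitionToComplete(String instantTime); }

  private final TxnManager txnManager;
  private final Optional<MetadataWriter> metadataWriter;
  private final Timeline activeTimeline;

  SingleLockCommitSketch(TxnManager txn, Optional<MetadataWriter> writer, Timeline timeline) {
    this.txnManager = txn;
    this.metadataWriter = writer;
    this.activeTimeline = timeline;
  }

  void commitTableService(String instantTime) {
    try {
      txnManager.begin();                                    // one lock for both writes
      metadataWriter.ifPresent(w -> w.update(instantTime));  // metadata table first
      activeTimeline.transitionToComplete(instantTime);      // then the data table
    } finally {
      txnManager.end();                                      // always release the lock
    }
  }
}

The ordering matters: as the inline comments in the diff put it, the write client commits to the data table only after committing to the metadata table, so a reader never observes a completed data-table commit whose metadata-table entry is missing.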
@@ -205,31 +205,19 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
           Option.of(timer.endTimer()),
           cleanStats
       );
-      writeMetadata(metadata);
+      if (!skipLocking) {
+        this.txnManager.beginTransaction(Option.empty(), Option.empty());
+      }
+      writeTableMetadata(metadata);
       table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeCleanMetadata(metadata));
       LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
       return metadata;
     } catch (IOException e) {
       throw new HoodieIOException("Failed to clean up after commit", e);
+    } finally {
+      if (!skipLocking) {
+        this.txnManager.endTransaction();
+      }
     }
   }
-
-  /**
-   * Update metadata table if available. Any update to metadata table happens within data table lock.
-   * @param cleanMetadata instance of {@link HoodieCleanMetadata} to be applied to metadata.
-   */
-  private void writeMetadata(HoodieCleanMetadata cleanMetadata) {
-    if (config.isMetadataTableEnabled()) {
-      try {
-        if (!skipLocking) {
-          this.txnManager.beginTransaction(Option.empty(), Option.empty());
-        }
-        writeTableMetadata(cleanMetadata);
-      } finally {
-        if (!skipLocking) {
-          this.txnManager.endTransaction();
-        }
-      }
-    }
-  }
@@ -255,30 +255,18 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
 
   protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
     try {
-      writeToMetadata(rollbackMetadata);
+      if (!skipLocking) {
+        this.txnManager.beginTransaction(Option.empty(), Option.empty());
+      }
+      writeTableMetadata(rollbackMetadata);
       table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
       LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
     } catch (IOException e) {
       throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
+    } finally {
+      if (!skipLocking) {
+        this.txnManager.endTransaction();
+      }
     }
   }
-
-  /**
-   * Update metadata table if available. Any update to metadata table happens within data table lock.
-   * @param rollbackMetadata instance of {@link HoodieRollbackMetadata} to be applied to metadata.
-   */
-  private void writeToMetadata(HoodieRollbackMetadata rollbackMetadata) {
-    if (config.isMetadataTableEnabled()) {
-      try {
-        if (!skipLocking) {
-          this.txnManager.beginTransaction(Option.empty(), Option.empty());
-        }
-        writeTableMetadata(rollbackMetadata);
-      } finally {
-        if (!skipLocking) {
-          this.txnManager.endTransaction();
-        }
-      }
-    }
-  }
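The clean and rollback executors guard the same pattern with skipLocking, since they can be invoked by a caller that already holds the table lock. A sketch of that guard, again with an illustrative stand-in for the transaction manager:

// Sketch of the skipLocking guard in the clean/rollback executors: the lock is
// acquired and released only when no outer transaction already holds it.
// TxnManager and the Runnable argument are illustrative stand-ins.
public class OptionalLockSketch {
  interface TxnManager { void begin(); void end(); }

  static void finishUnderOptionalLock(boolean skipLocking, TxnManager txnManager,
                                      Runnable writeMetadataAndComplete) {
    try {
      if (!skipLocking) {
        txnManager.begin();           // acquire only if the caller does not hold the lock
      }
      writeMetadataAndComplete.run(); // metadata-table write + timeline transition
    } finally {
      if (!skipLocking) {
        txnManager.end();             // release only what this method acquired
      }
    }
  }
}

Note how the patch moves this guard out of the deleted private writeMetadata/writeToMetadata helpers and inlines it around the whole commit sequence, so the timeline transition now happens inside the lock as well.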
@@ -362,12 +362,19 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
                                    String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-    finalizeWrite(table, compactionCommitTime, writeStats);
-    // commit to data table after committing to metadata table.
-    writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
-    LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
-    CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
-
+    try {
+      HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
+      this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
+      finalizeWrite(table, compactionCommitTime, writeStats);
+      // commit to data table after committing to metadata table.
+      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
+      // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
+      table.getMetadataWriter().ifPresent(w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
+      LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
+      CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
+    } finally {
+      this.txnManager.endTransaction();
+    }
     if (compactionTimer != null) {
       long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
       try {
@@ -399,19 +406,6 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     throw new HoodieNotSupportedException("Clustering is not supported yet");
   }
 
-  private void writeTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                  HoodieCommitMetadata commitMetadata,
-                                  HoodieInstant hoodieInstant) {
-    try {
-      this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
-      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
-      // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), table.isTableServiceAction(hoodieInstant.getAction())));
-    } finally {
-      this.txnManager.endTransaction();
-    }
-  }
-
   @Override
   protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
@@ -313,11 +313,17 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
                                    String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    finalizeWrite(table, compactionCommitTime, writeStats);
-    writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
-    // commit to data table after committing to metadata table.
-    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
-    CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
+    try {
+      HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
+      this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
+      finalizeWrite(table, compactionCommitTime, writeStats);
+      // commit to data table after committing to metadata table.
+      writeTableMetadataForTableServices(table, metadata, compactionInstant);
+      LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
+      CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
+    } finally {
+      this.txnManager.endTransaction();
+    }
     WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
         .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
     if (compactionTimer != null) {
@@ -385,9 +391,11 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
       throw new HoodieClusteringException("Clustering failed to write to files:"
           + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
     }
-    finalizeWrite(table, clusteringCommitTime, writeStats);
-    writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
     try {
+      HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+      this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+      finalizeWrite(table, clusteringCommitTime, writeStats);
+      writeTableMetadataForTableServices(table, metadata, clusteringInstant);
       // try to save statistics info to hudi
       if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) {
         table.updateStatistics(context, writeStats, clusteringCommitTime, true);
@@ -398,6 +406,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     } catch (IOException e) {
       throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
+    } finally {
+      this.txnManager.endTransaction();
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
         .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -415,16 +425,11 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
-                                                  HoodieInstant hoodieInstant) {
-    try {
-      this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
-      boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
-      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
-      // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
-    } finally {
-      this.txnManager.endTransaction();
-    }
+                                                  HoodieInstant hoodieInstant) {
+    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
+    // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
+    // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
+    table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
   }
 
   @Override
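The last hunk inverts the locking responsibility: writeTableMetadataForTableServices no longer begins or ends a transaction, and the compaction and clustering commit paths acquire the lock before calling it. A self-contained sketch of that shape, with simplified stand-in types as in the earlier sketches:

import java.util.Optional;
import java.util.function.Consumer;

// Sketch of the caller-holds-the-lock convention from the last hunk. The
// metadata-writing helper assumes it runs inside a transaction; the commit
// path owns the begin/end. Types are illustrative stand-ins, not Hudi's.
public class CallerHoldsLockSketch {
  interface TxnManager { void begin(); void end(); }

  private final TxnManager txnManager;
  private final Optional<Consumer<String>> metadataWriter;

  CallerHoldsLockSketch(TxnManager txn, Optional<Consumer<String>> writer) {
    this.txnManager = txn;
    this.metadataWriter = writer;
  }

  // After the patch: no begin/end here; the caller must already hold the lock.
  void writeTableMetadataForTableServices(String instantTime) {
    metadataWriter.ifPresent(w -> w.accept(instantTime));
  }

  // Commit paths (compaction, clustering) own the transaction boundaries.
  void commitTableService(String instantTime) {
    try {
      txnManager.begin();
      writeTableMetadataForTableServices(instantTime);
      // ... then transition the data-table instant to complete ...
    } finally {
      txnManager.end();
    }
  }
}

This keeps every metadata-table write single-writer: as the inline comment notes, concurrent writers would conflict because they all update the same metadata-table partition.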