1
0

[HUDI-2595] Fixing metadata table updates such that only regular writes from data table can trigger table services in metadata table (#3900)

This commit is contained in:
Sivabalan Narayanan
2021-11-08 22:12:32 -05:00
committed by GitHub
parent 7aaf47e716
commit 6d109c6de5
22 changed files with 170 additions and 59 deletions

View File

@@ -410,9 +410,10 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieInstant hoodieInstant) {
try {
this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata table happen within a
// single lock (single writer), because more than one write to the metadata table would result in conflicts since all of them update the same partition.
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp()));
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
} finally {
this.txnManager.endTransaction();
}
@@ -478,13 +479,14 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
/**
 * Resolves any write conflicts through {@code TransactionUtils} and then forwards the commit metadata
 * to the table's metadata writer when one is present.
 */
@Override
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
// Create the Hoodie table after the transaction has started, so that it encapsulates the commits and files visible at that point.
// It is important to create this after the lock to ensure the latest commits show up in the timeline without the need for a reload.
HoodieTable table = createTable(config, hadoopConf);
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime));
// Besides the metadata and the instant timestamp, pass along whether the table classifies this instant's action as a table service.
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(),
table.isTableServiceAction(inflightInstant.getAction())));
}
@Override

View File

@@ -103,7 +103,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
}
@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
@@ -132,8 +132,10 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
// reload timeline
metadataMetaClient.reloadActiveTimeline();
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
if (canTriggerTableService) {
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
}
}
// Update total size of the metadata and count of base/log files

View File

@@ -101,6 +101,11 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
super(config, context, metaClient);
}
/**
 * Classifies a timeline action for this copy-on-write table.
 *
 * <p>NOTE(review): every action other than {@link HoodieTimeline#COMMIT_ACTION} is reported as a table
 * service; confirm that no regular write on this table type completes under a different action name.
 *
 * @param actionType timeline action name to classify; must not be {@code null}
 * @return {@code false} when {@code actionType} equals {@link HoodieTimeline#COMMIT_ACTION}, {@code true} otherwise
 */
@Override
public boolean isTableServiceAction(String actionType) {
  final boolean isCommitAction = actionType.equals(HoodieTimeline.COMMIT_ACTION);
  return !isCommitAction;
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();

View File

@@ -80,6 +80,11 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
super(config, context, metaClient);
}
/**
 * Classifies a timeline action for this merge-on-read table.
 *
 * <p>NOTE(review): every action other than {@link HoodieTimeline#DELTA_COMMIT_ACTION} is reported as a table
 * service; confirm that no regular write on this table type completes under a different action name.
 *
 * @param actionType timeline action name to classify; must not be {@code null}
 * @return {@code false} when {@code actionType} equals {@link HoodieTimeline#DELTA_COMMIT_ACTION}, {@code true} otherwise
 */
@Override
public boolean isTableServiceAction(String actionType) {
  final boolean isDeltaCommitAction = actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
  return !isDeltaCommitAction;
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();

View File

@@ -248,7 +248,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit());
metadata.setOperationType(operationType);
writeTableMetadata(metadata);
writeTableMetadata(metadata, actionType);
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),

View File

@@ -267,7 +267,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeTableMetadata(metadata);
writeTableMetadata(metadata, actionType);
activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
LOG.info("Committed " + instantTime);