From 7bb90e8cafaf1dfd5000ff740441161dd43a7dd5 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Thu, 25 Nov 2021 14:19:30 -0500 Subject: [PATCH] [HUDI-2794] Guarding table service commits within a single lock to commit to both data table and metadata table (#4037) * Fixing a single lock to commit table services across metadata table and data table * Addressing comments * rebasing with master --- .../action/clean/CleanActionExecutor.java | 26 ++++--------- .../rollback/BaseRollbackActionExecutor.java | 26 ++++--------- .../hudi/client/HoodieFlinkWriteClient.java | 32 +++++++-------- .../hudi/client/SparkRDDWriteClient.java | 39 +++++++++++-------- 4 files changed, 49 insertions(+), 74 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index a445fd3cc..173010e86 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -205,31 +205,19 @@ public class CleanActionExecutor extends Option.of(timer.endTimer()), cleanStats ); - writeMetadata(metadata); + if (!skipLocking) { + this.txnManager.beginTransaction(Option.empty(), Option.empty()); + } + writeTableMetadata(metadata); table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant, TimelineMetadataUtils.serializeCleanMetadata(metadata)); LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete"); return metadata; } catch (IOException e) { throw new HoodieIOException("Failed to clean up after commit", e); - } - } - - /** - * Update metadata table if available. Any update to metadata table happens within data table lock. - * @param cleanMetadata instance of {@link HoodieCleanMetadata} to be applied to metadata. - */ - private void writeMetadata(HoodieCleanMetadata cleanMetadata) { - if (config.isMetadataTableEnabled()) { - try { - if (!skipLocking) { - this.txnManager.beginTransaction(Option.empty(), Option.empty()); - } - writeTableMetadata(cleanMetadata); - } finally { - if (!skipLocking) { - this.txnManager.endTransaction(); - } + } finally { + if (!skipLocking) { + this.txnManager.endTransaction(); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index ff50a2961..54cb51f03 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -255,30 +255,18 @@ public abstract class BaseRollbackActionExecutor extends String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); - finalizeWrite(table, compactionCommitTime, writeStats); - // commit to data table after committing to metadata table. - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); - LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); - CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); - + try { + HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); + this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); + finalizeWrite(table, compactionCommitTime, writeStats); + // commit to data table after committing to metadata table. + // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a + // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. + table.getMetadataWriter().ifPresent(w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction()))); + LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); + CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + } finally { + this.txnManager.endTransaction(); + } if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); try { @@ -399,19 +406,6 @@ public class HoodieFlinkWriteClient extends throw new HoodieNotSupportedException("Clustering is not supported yet"); } - private void writeTableMetadata(HoodieTable>, List, List> table, - HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant) { - try { - this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); - // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a - // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), table.isTableServiceAction(hoodieInstant.getAction()))); - } finally { - this.txnManager.endTransaction(); - } - } - @Override protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 81da1fbed..6672028a6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -313,11 +313,17 @@ public class SparkRDDWriteClient extends String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.map(WriteStatus::getStat).collect(); - finalizeWrite(table, compactionCommitTime, writeStats); - writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); - // commit to data table after committing to metadata table. - LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); - CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + try { + HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); + this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); + finalizeWrite(table, compactionCommitTime, writeStats); + // commit to data table after committing to metadata table. + writeTableMetadataForTableServices(table, metadata, compactionInstant); + LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); + CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + } finally { + this.txnManager.endTransaction(); + } WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); if (compactionTimer != null) { @@ -385,9 +391,11 @@ public class SparkRDDWriteClient extends throw new HoodieClusteringException("Clustering failed to write to files:" + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } - finalizeWrite(table, clusteringCommitTime, writeStats); - writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime)); try { + HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); + this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); + finalizeWrite(table, clusteringCommitTime, writeStats); + writeTableMetadataForTableServices(table, metadata,clusteringInstant); // try to save statistics info to hudi if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) { table.updateStatistics(context, writeStats, clusteringCommitTime, true); @@ -398,6 +406,8 @@ public class SparkRDDWriteClient extends Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } catch (IOException e) { throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); + } finally { + this.txnManager.endTransaction(); } WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); @@ -415,16 +425,11 @@ public class SparkRDDWriteClient extends } private void writeTableMetadataForTableServices(HoodieTable>, JavaRDD, JavaRDD> table, HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant) { - try { - this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); - boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); - // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a - // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); - } finally { - this.txnManager.endTransaction(); - } + HoodieInstant hoodieInstant) { + boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); + // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a + // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); } @Override