From e78b2f1b5572fd1c0b5a549bf5cd4772531399ec Mon Sep 17 00:00:00 2001
From: Sagar Sumit <sagarsumit09@gmail.com>
Date: Sat, 29 Jan 2022 07:58:04 +0530
Subject: [PATCH] [HUDI-2943] Complete pending clustering before deltastreamer
 sync (#4572)

---
 .../utilities/deltastreamer/DeltaSync.java    | 22 +++++++++++++
 .../deltastreamer/HoodieDeltaStreamer.java    |  3 ++
 .../functional/TestHoodieDeltaStreamer.java   | 32 +++++++++++++++++++
 3 files changed, 57 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c7b29c9f0..5c2b69293 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -190,6 +190,9 @@ public class DeltaSync implements Serializable {
    */
   private transient Option<HoodieTimeline> commitTimelineOpt;
 
+  // all commits timeline
+  private transient Option<HoodieTimeline> allCommitsTimelineOpt;
+
   /**
    * Tracks whether new schema is being seen and creates client accordingly.
    */
@@ -245,15 +248,18 @@ public class DeltaSync implements Serializable {
       switch (meta.getTableType()) {
         case COPY_ON_WRITE:
           this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
+          this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
           break;
         case MERGE_ON_READ:
           this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
+          this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
           break;
         default:
           throw new HoodieException("Unsupported table type :" + meta.getTableType());
       }
     } else {
       this.commitTimelineOpt = Option.empty();
+      this.allCommitsTimelineOpt = Option.empty();
       String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(cfg.tableType)
@@ -306,6 +312,14 @@ public class DeltaSync implements Serializable {
       }
     }
 
+    // complete the pending clustering before writing to sink
+    if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
+      Option<String> pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt);
+      if (pendingClusteringInstant.isPresent()) {
+        writeClient.cluster(pendingClusteringInstant.get(), true);
+      }
+    }
+
     result = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
         srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
   }
@@ -317,6 +331,14 @@ public class DeltaSync implements Serializable {
     return result;
   }
 
+  private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> commitTimelineOpt) {
+    if (commitTimelineOpt.isPresent()) {
+      Option<HoodieInstant> pendingClusteringInstant = commitTimelineOpt.get().filterPendingReplaceTimeline().lastInstant();
+      return pendingClusteringInstant.isPresent() ? Option.of(pendingClusteringInstant.get().getTimestamp()) : Option.empty();
+    }
+    return Option.empty();
+  }
+
   /**
    * Read from Upstream Source and apply transformation if needed.
    *
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 3ceb00287..3e4f00930 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -372,6 +372,9 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
 
+    @Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, description = "Retry last pending inline clustering plan before writing to sink.")
+    public Boolean retryLastPendingInlineClusteringJob = false;
+
     public boolean isAsyncCompactionEnabled() {
       return continuousMode && !forceDisableCompaction
           && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 187499188..d63bce658 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -754,6 +754,38 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     });
   }
 
+  @Test
+  public void testDeltaSyncWithPendingClustering() throws Exception {
+    String tableBasePath = dfsBasePath + "/inlineClusteringPending";
+    // ingest data
+    int totalRecords = 2000;
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+    cfg.continuousMode = false;
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    // assert ingest successful
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
+
+    // schedule a clustering job to build a clustering plan and transition to inflight
+    HoodieClusteringJob clusteringJob = initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
+    clusteringJob.cluster(0);
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+    List<HoodieInstant> hoodieClusteringInstants = meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
+    HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
+    meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty());
+
+    // do another ingestion with inline clustering enabled
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
+    cfg.retryLastPendingInlineClusteringJob = true;
+    HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
+    ds2.sync();
+    String completeClusteringTimeStamp = meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
+    assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp);
+    TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {