[HUDI-2943] Complete pending clustering before deltastreamer sync (#4572)
This commit is contained in:
@@ -190,6 +190,9 @@ public class DeltaSync implements Serializable {
|
||||
*/
|
||||
private transient Option<HoodieTimeline> commitTimelineOpt;
|
||||
|
||||
// all commits timeline
|
||||
private transient Option<HoodieTimeline> allCommitsTimelineOpt;
|
||||
|
||||
/**
|
||||
* Tracks whether new schema is being seen and creates client accordingly.
|
||||
*/
|
||||
@@ -245,15 +248,18 @@ public class DeltaSync implements Serializable {
|
||||
switch (meta.getTableType()) {
|
||||
case COPY_ON_WRITE:
|
||||
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
|
||||
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
|
||||
break;
|
||||
case MERGE_ON_READ:
|
||||
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
|
||||
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
|
||||
break;
|
||||
default:
|
||||
throw new HoodieException("Unsupported table type :" + meta.getTableType());
|
||||
}
|
||||
} else {
|
||||
this.commitTimelineOpt = Option.empty();
|
||||
this.allCommitsTimelineOpt = Option.empty();
|
||||
String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
|
||||
HoodieTableMetaClient.withPropertyBuilder()
|
||||
.setTableType(cfg.tableType)
|
||||
@@ -306,6 +312,14 @@ public class DeltaSync implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
// complete the pending clustering before writing to sink
|
||||
if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
|
||||
Option<String> pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt);
|
||||
if (pendingClusteringInstant.isPresent()) {
|
||||
writeClient.cluster(pendingClusteringInstant.get(), true);
|
||||
}
|
||||
}
|
||||
|
||||
result = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
|
||||
srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
|
||||
}
|
||||
@@ -317,6 +331,14 @@ public class DeltaSync implements Serializable {
|
||||
return result;
|
||||
}
|
||||
|
||||
private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> commitTimelineOpt) {
|
||||
if (commitTimelineOpt.isPresent()) {
|
||||
Option<HoodieInstant> pendingClusteringInstant = commitTimelineOpt.get().filterPendingReplaceTimeline().lastInstant();
|
||||
return pendingClusteringInstant.isPresent() ? Option.of(pendingClusteringInstant.get().getTimestamp()) : Option.empty();
|
||||
}
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from Upstream Source and apply transformation if needed.
|
||||
*
|
||||
|
||||
@@ -372,6 +372,9 @@ public class HoodieDeltaStreamer implements Serializable {
|
||||
@Parameter(names = {"--help", "-h"}, help = true)
|
||||
public Boolean help = false;
|
||||
|
||||
@Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, description = "Retry last pending inline clustering plan before writing to sink.")
|
||||
public Boolean retryLastPendingInlineClusteringJob = false;
|
||||
|
||||
public boolean isAsyncCompactionEnabled() {
|
||||
return continuousMode && !forceDisableCompaction
|
||||
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
|
||||
|
||||
@@ -754,6 +754,38 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Verifies that when a clustering plan is left inflight (e.g. a previous inline clustering
 * was interrupted), a subsequent delta sync with {@code retryLastPendingInlineClusteringJob}
 * enabled completes that pending plan before writing to the sink.
 */
@Test
public void testDeltaSyncWithPendingClustering() throws Exception {
  String tableBasePath = dfsBasePath + "/inlineClusteringPending";
  // ingest data
  int totalRecords = 2000;
  HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
  cfg.continuousMode = false;
  cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
  HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
  ds.sync();
  // assert ingest successful
  TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);

  // schedule a clustering job to build a clustering plan and transition to inflight
  HoodieClusteringJob clusteringJob = initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
  clusteringJob.cluster(0);
  HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
  List<HoodieInstant> hoodieClusteringInstants = meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
  HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
  // Manually move the requested replacecommit to inflight to simulate a clustering job
  // that started but never completed.
  meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty());

  // do another ingestion with inline clustering enabled
  cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
  cfg.retryLastPendingInlineClusteringJob = true;
  HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
  ds2.sync();
  // The completed replacecommit must be the very instant that was left pending above,
  // proving the retry path finished the old plan rather than scheduling a new one.
  String completeClusteringTimeStamp = meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
  assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp);
  // Two ingestions landed, and exactly the retried clustering produced a replacecommit.
  TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
  TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {
|
||||
|
||||
Reference in New Issue
Block a user