diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index f1e930b12..c93907c4a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -133,7 +133,7 @@ public class HoodieClusteringConfig extends HoodieConfig { public static final ConfigProperty PRESERVE_COMMIT_METADATA = ConfigProperty .key("hoodie.clustering.preserve.commit.metadata") - .defaultValue(false) + .defaultValue(true) .sinceVersion("0.9.0") .withDocumentation("When rewriting data, preserves existing hoodie_commit_time"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 53d68c323..6632dce86 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -323,9 +323,7 @@ public class DeltaSync implements Serializable { // Retrieve the previous round checkpoints, if any Option resumeCheckpointStr = Option.empty(); if (commitTimelineOpt.isPresent()) { - // TODO: now not support replace action HUDI-1500 - Option lastCommit = commitTimelineOpt.get() - .filter(instant -> !instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant(); + Option lastCommit = commitTimelineOpt.get().lastInstant(); if (lastCommit.isPresent()) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 86c92f240..014a0c140 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -711,8 +711,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { deltaStreamerTestRunner(ds, null, condition); } - @Test - public void testInlineClustering() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + public void testInlineClustering(String preserveCommitMetadata) throws Exception { String tableBasePath = dfsBasePath + "/inlineClustering"; // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; @@ -721,7 +722,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); - cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "")); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "", preserveCommitMetadata)); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); @@ -812,6 +813,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } } + private List getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, + String asyncCluster, String asyncClusterMaxCommit, String preserveCommitMetadata) { + List configs = getAsyncServicesConfigs(totalRecords, autoClean, inlineCluster, inlineClusterMaxCommit, asyncCluster, asyncClusterMaxCommit); + configs.add(String.format("%s=%s", HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), preserveCommitMetadata)); + return configs; + } + private List getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) { List configs = new ArrayList<>(); @@ -904,8 +912,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { }); } - @Test - public void testAsyncClusteringServiceWithCompaction() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception { String tableBasePath = dfsBasePath + "/asyncClusteringCompaction"; // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; @@ -914,7 +923,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); - cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2")); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2", preserveCommitMetadata)); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);