[HUDI-1500] Support replace commit in DeltaSync with commit metadata preserved (#3802)
This commit is contained in:
@@ -133,7 +133,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
|
|||||||
|
|
||||||
public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
|
public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
|
||||||
.key("hoodie.clustering.preserve.commit.metadata")
|
.key("hoodie.clustering.preserve.commit.metadata")
|
||||||
.defaultValue(false)
|
.defaultValue(true)
|
||||||
.sinceVersion("0.9.0")
|
.sinceVersion("0.9.0")
|
||||||
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
|
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
|
||||||
|
|
||||||
|
|||||||
@@ -323,9 +323,7 @@ public class DeltaSync implements Serializable {
|
|||||||
// Retrieve the previous round checkpoints, if any
|
// Retrieve the previous round checkpoints, if any
|
||||||
Option<String> resumeCheckpointStr = Option.empty();
|
Option<String> resumeCheckpointStr = Option.empty();
|
||||||
if (commitTimelineOpt.isPresent()) {
|
if (commitTimelineOpt.isPresent()) {
|
||||||
// TODO: now not support replace action HUDI-1500
|
Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
|
||||||
Option<HoodieInstant> lastCommit = commitTimelineOpt.get()
|
|
||||||
.filter(instant -> !instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant();
|
|
||||||
if (lastCommit.isPresent()) {
|
if (lastCommit.isPresent()) {
|
||||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||||
.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
|
.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
|
||||||
|
|||||||
@@ -711,8 +711,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
deltaStreamerTestRunner(ds, null, condition);
|
deltaStreamerTestRunner(ds, null, condition);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testInlineClustering() throws Exception {
|
@ValueSource(strings = {"true", "false"})
|
||||||
|
public void testInlineClustering(String preserveCommitMetadata) throws Exception {
|
||||||
String tableBasePath = dfsBasePath + "/inlineClustering";
|
String tableBasePath = dfsBasePath + "/inlineClustering";
|
||||||
// Keep it higher than batch-size to test continuous mode
|
// Keep it higher than batch-size to test continuous mode
|
||||||
int totalRecords = 3000;
|
int totalRecords = 3000;
|
||||||
@@ -721,7 +722,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
|
||||||
cfg.continuousMode = true;
|
cfg.continuousMode = true;
|
||||||
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
|
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
|
||||||
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
|
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "", preserveCommitMetadata));
|
||||||
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
||||||
deltaStreamerTestRunner(ds, cfg, (r) -> {
|
deltaStreamerTestRunner(ds, cfg, (r) -> {
|
||||||
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
|
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
|
||||||
@@ -812,6 +813,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit,
|
||||||
|
String asyncCluster, String asyncClusterMaxCommit, String preserveCommitMetadata) {
|
||||||
|
List<String> configs = getAsyncServicesConfigs(totalRecords, autoClean, inlineCluster, inlineClusterMaxCommit, asyncCluster, asyncClusterMaxCommit);
|
||||||
|
configs.add(String.format("%s=%s", HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), preserveCommitMetadata));
|
||||||
|
return configs;
|
||||||
|
}
|
||||||
|
|
||||||
private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
|
private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
|
||||||
String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
|
String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
|
||||||
List<String> configs = new ArrayList<>();
|
List<String> configs = new ArrayList<>();
|
||||||
@@ -904,8 +912,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testAsyncClusteringServiceWithCompaction() throws Exception {
|
@ValueSource(strings = {"true", "false"})
|
||||||
|
public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception {
|
||||||
String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
|
String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
|
||||||
// Keep it higher than batch-size to test continuous mode
|
// Keep it higher than batch-size to test continuous mode
|
||||||
int totalRecords = 3000;
|
int totalRecords = 3000;
|
||||||
@@ -914,7 +923,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
|
||||||
cfg.continuousMode = true;
|
cfg.continuousMode = true;
|
||||||
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
|
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
|
||||||
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
|
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2", preserveCommitMetadata));
|
||||||
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
||||||
deltaStreamerTestRunner(ds, cfg, (r) -> {
|
deltaStreamerTestRunner(ds, cfg, (r) -> {
|
||||||
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
|
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
|
||||||
|
|||||||
Reference in New Issue
Block a user