[HUDI - 738] Add validation to DeltaStreamer to fail fast when filterDupes is enabled on UPSERT mode. (#1505)
Summary: This fix ensures for UPSERT operation, '--filter-dupes' is disabled and fails fast if not. Otherwise it would drop all updates silently and only take in new records.
This commit is contained in:
committed by
GitHub
parent
f5f34bb1c1
commit
8c7cef3e50
@@ -661,7 +661,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
// Generate the same 1000 records + 1000 new ones for upsert
|
||||
cfg.filterDupes = true;
|
||||
cfg.sourceLimit = 2000;
|
||||
cfg.operation = Operation.UPSERT;
|
||||
cfg.operation = Operation.INSERT;
|
||||
new HoodieDeltaStreamer(cfg, jsc).sync();
|
||||
TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
|
||||
@@ -674,7 +674,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
|
||||
HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
|
||||
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
|
||||
cfg2.filterDupes = true;
|
||||
cfg2.filterDupes = false;
|
||||
cfg2.sourceLimit = 2000;
|
||||
cfg2.operation = Operation.UPSERT;
|
||||
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
|
||||
@@ -690,6 +690,16 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
.fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
|
||||
System.out.println("New Commit Metadata=" + commitMetadata);
|
||||
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
|
||||
|
||||
// Try UPSERT with filterDupes true. Expect exception
|
||||
cfg2.filterDupes = true;
|
||||
cfg2.operation = Operation.UPSERT;
|
||||
try {
|
||||
new HoodieDeltaStreamer(cfg2, jsc).sync();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user