diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index c964c919e..7ec730394 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -173,9 +173,6 @@ public class DeltaSync implements Serializable { UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider)); this.hiveConf = hiveConf; - if (cfg.filterDupes) { - cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; - } // If schemaRegistry already resolved, setup write-client setupWriteClient(); @@ -348,8 +345,6 @@ public class DeltaSync implements Serializable { Option scheduledCompactionInstant = Option.empty(); // filter dupes if needed if (cfg.filterDupes) { - // turn upserts to insert - cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig()); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 836847815..0325eaf48 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -388,17 +388,15 @@ public class HoodieDeltaStreamer implements Serializable { tableType = HoodieTableType.valueOf(cfg.tableType); } + ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT, + "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."); + this.props = properties != null ? 
properties : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); LOG.info("Creating delta streamer with configs : " + props.toString()); this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); - if (cfg.filterDupes) { - cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; - } - deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf, this::onInitializingWriteClient); - } public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index 74455f28c..d9c8f83ea 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.SchemaRegistryProvider; @@ -66,6 +67,8 @@ public class HoodieMultiTableDeltaStreamer { this.jssc = jssc; String commonPropsFile = config.propsFilePath; String configFolder = config.configFolder; + ValidationUtils.checkArgument(!config.filterDupes || config.operation != HoodieDeltaStreamer.Operation.UPSERT, + "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."); FileSystem fs = FSUtils.getFs(commonPropsFile, 
jssc.hadoopConfiguration()); configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder; checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index 7edf5345c..8b661e729 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -661,7 +661,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Generate the same 1000 records + 1000 new ones for upsert cfg.filterDupes = true; cfg.sourceLimit = 2000; - cfg.operation = Operation.UPSERT; + cfg.operation = Operation.INSERT; new HoodieDeltaStreamer(cfg, jsc).sync(); TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2); @@ -674,7 +674,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true); HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT); - cfg2.filterDupes = true; + cfg2.filterDupes = false; cfg2.sourceLimit = 2000; cfg2.operation = Operation.UPSERT; cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); @@ -690,6 +690,16 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { .fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class); System.out.println("New Commit Metadata=" + commitMetadata); 
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
+
+    // UPSERT with filterDupes enabled must be rejected up front; fail explicitly if nothing is thrown
+    cfg2.filterDupes = true;
+    cfg2.operation = Operation.UPSERT;
+    try {
+      new HoodieDeltaStreamer(cfg2, jsc).sync();
+      throw new AssertionError("Expected IllegalArgumentException for UPSERT with '--filter-dupes' enabled");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
+    }
   }
 
   @Test