diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java index 28811bbfd..3c56a0d02 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java @@ -128,12 +128,16 @@ public class DataSourceUtils { String basePath, String tblName, Map parameters) throws Exception { // inline compaction is on by default for MOR - boolean inlineCompact = parameters.containsKey(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()) - && parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()).equals(DataSourceWriteOptions - .MOR_STORAGE_TYPE_OPT_VAL()); + boolean inlineCompact = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL() + .equals(parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())); - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().combineInput(true, true) + // insert/bulk-insert combining should be true if we are filtering for duplicates + boolean combineInserts = Boolean.parseBoolean(parameters.get( + DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY())); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() .withPath(basePath).withAutoCommit(false) + .combineInput(combineInserts, true) .withSchema(schemaStr).forTable(tblName).withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala index e7ae8e068..a3e50df7c 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala @@ -138,10 +138,7 @@ private[hoodie] object HoodieSparkSqlWriter { } // Create a HoodieWriteClient & issue the write. 
- val client = DataSourceUtils.createHoodieClient(jsc, - schema.toString, - path.get, - tblName.get, + val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get, mapAsJavaMap(parameters) ) val commitTime = client.startCommit() @@ -257,4 +254,4 @@ private[hoodie] object HoodieSparkSqlWriter { hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY) hiveSyncConfig } -} \ No newline at end of file +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 0ad10def1..68621c44b 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -326,8 +326,8 @@ public class HoodieDeltaStreamer implements Serializable { private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) { HoodieWriteConfig.Builder builder = - HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath) - .withAutoCommit(false) + HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath) + .withAutoCommit(false).combineInput(cfg.filterDupes, true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withPayloadClass(cfg.payloadClassName) // turn on inline compaction by default, for MOR tables