Making DataSource/DeltaStreamer use defaults for combining

- Addresses issue where insert would combine and remove duplicates within a batch
- Sets the default insert combining to false (the write client default)
- Combining is set to true only if filtering duplicates on insert/bulk_insert
commit 57a8b9cc8c
parent ea20d47248
committed by Balaji Varadarajan
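
Background on the change: HoodieWriteConfig's combineInput(onInsert, onUpsert) flags control whether incoming records are de-duplicated by key before an insert and before an upsert, respectively. Both the DataSource writer and the DeltaStreamer previously hardcoded combineInput(true, true), so plain inserts silently dropped duplicates within a batch. After this patch the insert-side flag follows the write client default (off) unless the user asks for duplicate filtering. A minimal sketch of the resulting default behavior; the literal key name is an assumption standing in for DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY():

import java.util.HashMap;
import java.util.Map;

public class CombineDefaultSketch {
  // Assumed key name for INSERT_DROP_DUPS_OPT_KEY() in this era of the codebase.
  static final String INSERT_DROP_DUPS = "hoodie.datasource.write.insert.drop.dups";

  public static void main(String[] args) {
    Map<String, String> parameters = new HashMap<>();

    // Option absent: Boolean.parseBoolean(null) is false, so inserts keep
    // duplicates, matching the write client default this commit restores.
    System.out.println(Boolean.parseBoolean(parameters.get(INSERT_DROP_DUPS))); // false

    // User opts in to de-duplication on insert/bulk_insert.
    parameters.put(INSERT_DROP_DUPS, "true");
    System.out.println(Boolean.parseBoolean(parameters.get(INSERT_DROP_DUPS))); // true
  }
}

Boolean.parseBoolean is documented to return false for null input, which is what lets the option safely default to "keep duplicates" when it is not set at all.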
@@ -128,12 +128,16 @@ public class DataSourceUtils {
       String basePath, String tblName, Map<String, String> parameters) throws Exception {
 
     // inline compaction is on by default for MOR
-    boolean inlineCompact = parameters.containsKey(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
-        && parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()).equals(DataSourceWriteOptions
-        .MOR_STORAGE_TYPE_OPT_VAL());
+    boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
+        .equals(DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL());
 
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().combineInput(true, true)
+    // insert/bulk-insert combining to be true, if filtering for duplicates
+    boolean combineInserts = Boolean.parseBoolean(parameters.get(
+        DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY()));
+
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withPath(basePath).withAutoCommit(false)
+        .combineInput(combineInserts, true)
         .withSchema(schemaStr).forTable(tblName).withIndexConfig(
             HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
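
One subtlety in the hunk above: the new inlineCompact check drops the containsKey guard, so it relies on the storage type option always being present in parameters, presumably because a default is filled in upstream. A standalone sketch of the difference; the key and value strings are assumptions standing in for STORAGE_TYPE_OPT_KEY() and MOR_STORAGE_TYPE_OPT_VAL():

import java.util.HashMap;
import java.util.Map;

public class StorageTypeCheckSketch {
  public static void main(String[] args) {
    // Assumed key name for this era of the codebase.
    String storageTypeKey = "hoodie.datasource.write.storage.type";
    Map<String, String> parameters = new HashMap<>();

    // Patch style: parameters.get(storageTypeKey).equals("MERGE_ON_READ")
    // throws NullPointerException when the key is absent, so the caller
    // must have defaulted the storage type before reaching this point.

    // A constant-first comparison tolerates a missing key and behaves like
    // the old containsKey guard in a single expression.
    boolean inlineCompact = "MERGE_ON_READ".equals(parameters.get(storageTypeKey));
    System.out.println(inlineCompact); // false: key absent, no NPE
  }
}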
@@ -138,10 +138,7 @@ private[hoodie] object HoodieSparkSqlWriter {
       }
 
       // Create a HoodieWriteClient & issue the write.
-      val client = DataSourceUtils.createHoodieClient(jsc,
-        schema.toString,
-        path.get,
-        tblName.get,
+      val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
         mapAsJavaMap(parameters)
       )
       val commitTime = client.startCommit()
@@ -257,4 +254,4 @@ private[hoodie] object HoodieSparkSqlWriter {
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig
   }
 }
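
For callers of this DataSource path, opting back into insert-time de-duplication is just a matter of setting the option the DataSourceUtils hunk reads. A hypothetical usage sketch; the format name and option keys are assumptions based on the com.uber.hoodie packaging of this era, and the required table options (record key, precombine field, and so on) are left to the caller:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class DropDupsUsageSketch {
  // Hypothetical helper: writes a batch as an insert while filtering
  // duplicates within the batch. Option keys are assumed, not confirmed.
  public static void insertDroppingDups(Dataset<Row> df, String basePath) {
    df.write()
        .format("com.uber.hoodie")
        .option("hoodie.datasource.write.operation", "insert")
        .option("hoodie.datasource.write.insert.drop.dups", "true") // re-enables combining
        .mode(SaveMode.Append)
        .save(basePath);
  }
}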
@@ -326,8 +326,8 @@ public class HoodieDeltaStreamer implements Serializable {
 
   private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
-            .withAutoCommit(false)
+        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
+            .withAutoCommit(false).combineInput(cfg.filterDupes, true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withPayloadClass(cfg.payloadClassName)
             // turn on inline compaction by default, for MOR tables
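
The DeltaStreamer side wires the same decision through its filterDupes flag while leaving upsert-side combining hardcoded to true, presumably because records sharing a key must be merged before an upsert for correctness, whereas de-duplicating before an insert is a user choice. A self-contained sketch of the two-flag builder pattern; all class and field names here are hypothetical, mirroring combineInput(onInsert, onUpsert):

public class CombineFlagsSketch {
  static final class WriteConfig {
    final boolean combineBeforeInsert;
    final boolean combineBeforeUpsert;
    WriteConfig(boolean insert, boolean upsert) {
      combineBeforeInsert = insert;
      combineBeforeUpsert = upsert;
    }
  }

  static final class Builder {
    // Write client defaults assumed by this commit: insert combining off, upsert on.
    private boolean combineBeforeInsert = false;
    private boolean combineBeforeUpsert = true;

    Builder combineInput(boolean onInsert, boolean onUpsert) {
      combineBeforeInsert = onInsert;
      combineBeforeUpsert = onUpsert;
      return this;
    }

    WriteConfig build() {
      return new WriteConfig(combineBeforeInsert, combineBeforeUpsert);
    }
  }

  public static void main(String[] args) {
    boolean filterDupes = false; // e.g. a DeltaStreamer run without duplicate filtering
    WriteConfig config = new Builder().combineInput(filterDupes, true).build();
    System.out.println(config.combineBeforeInsert + " " + config.combineBeforeUpsert); // false true
  }
}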