[HUDI-2556] Tweak some default config options for flink (#3800)

* rename write.insert.drop.duplicates to write.precombine and enable it by default for COW tables
* set the index.global.enabled default to true
* set the compaction.target_io default to 500GB

(a sketch for restoring the previous defaults follows the commit metadata below)
Author: Danny Chan
Date: 2021-10-14 19:42:56 +08:00
Committed by: GitHub
parent f897e6d73e
commit 2c370cbae0

10 changed files with 57 additions and 47 deletions
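Since all three changes are default flips, a job that depended on the previous behavior can pin the old values explicitly. A minimal sketch, assuming the option constants and old defaults shown in the diffs below; note that for the SQL path the table factory may still turn write.precombine on for COW tables when write.operation is left at its default (see HoodieTableFactory further down):

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.configuration.FlinkOptions;

public class RestoreOldDefaults {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The option default stays false, but the table factory now enables it for COW tables.
    conf.setBoolean(FlinkOptions.PRE_COMBINE, false);
    // New default is true: update the index for the old partition path on key moves.
    conf.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
    // New default 0 stores the index state permanently; the old default was 1.5 days.
    conf.setDouble(FlinkOptions.INDEX_STATE_TTL, 1.5D);
    // Target IO per compaction in MB: new default is 500 * 1024 (500GB), old was 5120 (5GB).
    conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 5120L);
  }
}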

FlinkOptions.java

@@ -115,15 +115,15 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
       .key("index.state.ttl")
       .doubleType()
-      .defaultValue(1.5D)
-      .withDescription("Index state ttl in days, default 1.5 day");
+      .defaultValue(0D)
+      .withDescription("Index state ttl in days, default stores the index permanently");
 
   public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
       .key("index.global.enabled")
       .booleanType()
-      .defaultValue(false)
+      .defaultValue(true)
       .withDescription("Whether to update index for the old partition path\n"
-          + "if same key record with different partition path came in, default false");
+          + "if same key record with different partition path came in, default true");
 
   public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
       .key("index.partition.regex")
@@ -255,15 +255,17 @@ public class FlinkOptions extends HoodieConfig {
           + "This will render any value set for the option in-effective");
 
   /**
-   * Flag to indicate whether to drop duplicates upon insert.
-   * By default insert will accept duplicates, to gain extra performance.
+   * Flag to indicate whether to drop duplicates before insert/upsert.
+   * By default false to gain extra performance.
    */
-  public static final ConfigOption<Boolean> INSERT_DROP_DUPS = ConfigOptions
-      .key("write.insert.drop.duplicates")
+  public static final ConfigOption<Boolean> PRE_COMBINE = ConfigOptions
+      .key("write.precombine")
       .booleanType()
       .defaultValue(false)
-      .withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
-          + "By default insert will accept duplicates, to gain extra performance");
+      .withDescription("Flag to indicate whether to drop duplicates before insert/upsert.\n"
+          + "By default these cases will accept duplicates, to gain extra performance:\n"
+          + "1) insert operation;\n"
+          + "2) upsert for MOR table, the MOR table deduplicate on reading");
 
   public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
       .key("write.retry.times")
@@ -496,8 +498,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Long> COMPACTION_TARGET_IO = ConfigOptions
       .key("compaction.target_io")
       .longType()
-      .defaultValue(5120L) // default 5 GB
-      .withDescription("Target IO per compaction (both read and write), default 5 GB");
+      .defaultValue(500 * 1024L) // default 500 GB
+      .withDescription("Target IO per compaction (both read and write), default 500 GB");
 
   public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
       .key("clean.async.enabled")

StreamWriteFunction.java

@@ -419,7 +419,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
-    if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+    if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
     }
     bucket.preWrite(records);
@@ -454,7 +454,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
         .forEach(bucket -> {
           List<HoodieRecord> records = bucket.writeBuffer();
           if (records.size() > 0) {
-            if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+            if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
              records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
             }
             bucket.preWrite(records);
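For context, the precombine step collapses the buffered batch to at most one record per key before the flush. A simplified, self-contained sketch of that idea, not the actual FlinkWriteHelper.deduplicateRecords implementation (SimpleRecord and the keep-largest-ordering-value rule are illustrative assumptions):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Hudi record: a key plus an ordering (precombine) value.
record SimpleRecord(String key, long orderingVal, String payload) {}

public class PreCombineSketch {
  // Keep only the record with the largest ordering value per key,
  // preserving first-seen key order.
  static List<SimpleRecord> deduplicate(List<SimpleRecord> records) {
    Map<String, SimpleRecord> latest = new LinkedHashMap<>();
    for (SimpleRecord r : records) {
      latest.merge(r.key(), r, (oldR, newR) -> newR.orderingVal() >= oldR.orderingVal() ? newR : oldR);
    }
    return new ArrayList<>(latest.values());
  }

  public static void main(String[] args) {
    List<SimpleRecord> buffer = List.of(
        new SimpleRecord("id1", 1L, "a"),
        new SimpleRecord("id1", 2L, "b"),
        new SimpleRecord("id2", 1L, "c"));
    // Prints two records: id1 keeps payload "b" (larger ordering value), id2 keeps "c".
    System.out.println(deduplicate(buffer));
  }
}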

StreamWriteOperatorCoordinator.java

@@ -211,13 +211,11 @@ public class StreamWriteOperatorCoordinator
       // the stream write task snapshot and flush the data buffer synchronously in sequence,
       // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
       final boolean committed = commitInstant(this.instant);
-      if (tableState.scheduleCompaction) {
-        // if async compaction is on, schedule the compaction
-        if (committed || tableState.timeCompactionTriggerStrategy) {
+      if (committed) {
+        if (tableState.scheduleCompaction) {
+          // if async compaction is on, schedule the compaction
           writeClient.scheduleCompaction(Option.empty());
         }
-      }
-      if (committed) {
         // start new instant.
         startInstant();
         // sync Hive if is enabled
@@ -532,7 +530,6 @@ public class StreamWriteOperatorCoordinator
     final String commitAction;
     final boolean isOverwrite;
     final boolean scheduleCompaction;
-    final boolean timeCompactionTriggerStrategy;
     final boolean syncHive;
     final boolean syncMetadata;
@@ -542,7 +539,6 @@ public class StreamWriteOperatorCoordinator
           HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
       this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
       this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
-      this.timeCompactionTriggerStrategy = StreamerUtil.isTimeCompactionTriggerStrategy(conf);
       this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
       this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
     }

PayloadCreation.java

@@ -55,7 +55,7 @@ public class PayloadCreation implements Serializable {
   }
 
   public static PayloadCreation instance(Configuration conf) throws Exception {
-    boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+    boolean shouldCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE)
         || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
     String preCombineField = null;
     final Class<?>[] argTypes;

FlinkStreamerConfig.java

@@ -112,7 +112,7 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--filter-dupes"},
       description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert.")
-  public Boolean filterDupes = false;
+  public Boolean preCombine = false;
 
   @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
   public Boolean commitOnErrors = false;
@@ -220,8 +220,8 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB")
   public Integer compactionMaxMemory = 100;
 
-  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 5 GB")
-  public Long compactionTargetIo = 5120L;
+  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
+  public Long compactionTargetIo = 512000L;
 
   @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
   public Boolean cleanAsyncEnabled = true;
@@ -312,7 +312,7 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
     conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
     conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
     conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);

HoodieTableFactory.java

@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -157,6 +158,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
     setupHiveOptions(conf);
     // read options
     setupReadOptions(conf);
+    // write options
+    setupWriteOptions(conf);
     // infer avro schema from physical DDL schema
     inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
   }
@@ -249,12 +252,6 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10);
       conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20);
     }
-    if (conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED)
-        && !conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
-        && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.COMPACTION_TARGET_IO)) {
-      // if compaction schedule is on, tweak the target io to 500GB
-      conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L);
-    }
     if (StreamerUtil.allowDuplicateInserts(conf)) {
       // no need for compaction if insert duplicates is allowed
       conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
@@ -282,6 +279,16 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
     }
   }
 
+  /**
+   * Sets up the write options from the table definition.
+   */
+  private static void setupWriteOptions(Configuration conf) {
+    if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
+        && HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.COPY_ON_WRITE) {
+      conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+    }
+  }
+
   /**
    * Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
    * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
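The new factory behavior only kicks in when the user has not chosen an operation explicitly. A small sketch replicating the decision in setupWriteOptions above (the method itself is private; preCombineByDefault is a hypothetical stand-in):

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

public class PreCombineDefaultCheck {
  // Hypothetical helper mirroring the factory's setupWriteOptions condition.
  static boolean preCombineByDefault(Configuration conf) {
    return FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
        && HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.COPY_ON_WRITE;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
    // true: table is COW and 'write.operation' is left at its default
    System.out.println(preCombineByDefault(conf));

    conf.setString(FlinkOptions.OPERATION, "insert");
    // false: an explicit 'insert' keeps the fast path without precombine
    System.out.println(preCombineByDefault(conf));
  }
}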

StreamerUtil.java

@@ -154,7 +154,7 @@ public class StreamerUtil {
         HoodieWriteConfig.newBuilder()
             .withEngineType(EngineType.FLINK)
             .withPath(conf.getString(FlinkOptions.PATH))
-            .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+            .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
             .withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf))
             .withCompactionConfig(
                 HoodieCompactionConfig.newBuilder()
@@ -302,16 +302,6 @@ public class StreamerUtil {
         && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
   }
 
-  /**
-   * Returns whether the compaction trigger strategy is time based.
-   *
-   * @param conf The flink configuration.
-   */
-  public static boolean isTimeCompactionTriggerStrategy(Configuration conf) {
-    final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY);
-    return FlinkOptions.TIME_ELAPSED.equalsIgnoreCase(strategy) || FlinkOptions.NUM_OR_TIME.equalsIgnoreCase(strategy);
-  }
-
   /**
    * Creates the meta client for reader.
    *

TestWriteCopyOnWrite.java

@@ -275,7 +275,7 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertDuplicates() throws Exception {
     // reset the config option
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
@@ -470,7 +470,7 @@ public class TestWriteCopyOnWrite {
   public void testInsertWithDeduplication() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data

HoodieDataSourceITCase.java

@@ -652,7 +652,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
-        .option(FlinkOptions.INSERT_DROP_DUPS, true)
+        .option(FlinkOptions.PRE_COMBINE, true)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
@@ -674,7 +674,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, false)
-        .option(FlinkOptions.INSERT_DROP_DUPS, true)
+        .option(FlinkOptions.PRE_COMBINE, true)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);

TestHoodieTableFactory.java

@@ -394,6 +394,21 @@ public class TestHoodieTableFactory {
         is("UTC"));
   }
 
+  @Test
+  void testSetupWriteOptionsForSink() {
+    final HoodieTableSink tableSink1 =
+        (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
+    final Configuration conf1 = tableSink1.getConf();
+    assertThat(conf1.get(FlinkOptions.PRE_COMBINE), is(true));
+
+    // set up operation as 'insert'
+    this.conf.setString(FlinkOptions.OPERATION, "insert");
+    HoodieTableSink tableSink2 =
+        (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
+    Configuration conf2 = tableSink2.getConf();
+    assertThat(conf2.get(FlinkOptions.PRE_COMBINE), is(false));
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------