[HUDI-2274] Allows INSERT duplicates for Flink MOR table (#3403)
This commit is contained in:
@@ -82,10 +82,12 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
|
|||||||
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
|
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
|
||||||
String instantTime,
|
String instantTime,
|
||||||
List<HoodieRecord<T>> hoodieRecords) {
|
List<HoodieRecord<T>> hoodieRecords) {
|
||||||
ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
|
if (writeHandle instanceof FlinkAppendHandle) {
|
||||||
"MOR write handle should always be a FlinkAppendHandle");
|
|
||||||
FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
|
FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
|
||||||
return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
|
return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
|
||||||
|
} else {
|
||||||
|
return super.insert(context, writeHandle, instantTime, hoodieRecords);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -209,11 +209,11 @@ public class FlinkOptions extends HoodieConfig {
|
|||||||
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
|
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
|
||||||
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
|
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
|
||||||
|
|
||||||
public static final ConfigOption<Boolean> INSERT_ALLOW_DUP = ConfigOptions
|
public static final ConfigOption<Boolean> INSERT_DEDUP = ConfigOptions
|
||||||
.key("write.insert.allow_dup")
|
.key("write.insert.deduplicate")
|
||||||
.booleanType()
|
.booleanType()
|
||||||
.defaultValue(true)
|
.defaultValue(true)
|
||||||
.withDescription("Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly, default true");
|
.withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true");
|
||||||
|
|
||||||
public static final ConfigOption<String> OPERATION = ConfigOptions
|
public static final ConfigOption<String> OPERATION = ConfigOptions
|
||||||
.key("write.operation")
|
.key("write.operation")
|
||||||
|
|||||||
@@ -69,8 +69,8 @@ public class FlinkStreamerConfig extends Configuration {
|
|||||||
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
|
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
|
||||||
public String tableType;
|
public String tableType;
|
||||||
|
|
||||||
@Parameter(names = {"--insert-allow-dup"}, description = "Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly.", required = true)
|
@Parameter(names = {"--insert-dedup"}, description = "Whether to deduplicate for INSERT operation, if disabled, writes the base files directly.", required = true)
|
||||||
public Boolean insertAllowDup = true;
|
public Boolean insertDedup = true;
|
||||||
|
|
||||||
@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
|
@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
|
||||||
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
|
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
|
||||||
@@ -308,7 +308,7 @@ public class FlinkStreamerConfig extends Configuration {
|
|||||||
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
|
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
|
||||||
// copy_on_write works same as COPY_ON_WRITE
|
// copy_on_write works same as COPY_ON_WRITE
|
||||||
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
|
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
|
||||||
conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, config.insertAllowDup);
|
conf.setBoolean(FlinkOptions.INSERT_DEDUP, config.insertDedup);
|
||||||
conf.setString(FlinkOptions.OPERATION, config.operation.value());
|
conf.setString(FlinkOptions.OPERATION, config.operation.value());
|
||||||
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
|
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
|
||||||
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
|
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor;
|
|||||||
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
||||||
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
|
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
|
||||||
import org.apache.hudi.util.AvroSchemaConverter;
|
import org.apache.hudi.util.AvroSchemaConverter;
|
||||||
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
import org.apache.flink.configuration.ConfigOption;
|
import org.apache.flink.configuration.ConfigOption;
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
@@ -129,11 +130,6 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
|
throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
|
||||||
+ "Please check 'write.precombine.field' option.");
|
+ "Please check 'write.precombine.field' option.");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase().equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
|
|
||||||
&& conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP)) {
|
|
||||||
throw new ValidationException("Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -214,6 +210,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
// if compaction schedule is on, tweak the target io to 500GB
|
// if compaction schedule is on, tweak the target io to 500GB
|
||||||
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L);
|
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L);
|
||||||
}
|
}
|
||||||
|
if (StreamerUtil.allowDuplicateInserts(conf)) {
|
||||||
|
// no need for compaction if insert duplicates is allowed
|
||||||
|
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
|
||||||
|
conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -350,6 +350,6 @@ public class StreamerUtil {
|
|||||||
|
|
||||||
public static boolean allowDuplicateInserts(Configuration conf) {
|
public static boolean allowDuplicateInserts(Configuration conf) {
|
||||||
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
|
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
|
||||||
return operationType == WriteOperationType.INSERT && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP);
|
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -538,6 +538,7 @@ public class TestWriteCopyOnWrite {
|
|||||||
// reset the config option
|
// reset the config option
|
||||||
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
|
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
|
||||||
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
|
conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
|
||||||
|
conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);
|
||||||
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
||||||
|
|
||||||
// open the function and ingest data
|
// open the function and ingest data
|
||||||
|
|||||||
@@ -37,7 +37,6 @@ import org.apache.avro.Schema;
|
|||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@@ -68,11 +67,6 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
|
|||||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
|
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInsertAllowsDuplication() {
|
|
||||||
// ignore the test because only COW table supports INSERT duplication
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
|
protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
|
||||||
HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
|
HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ import org.apache.hudi.common.model.HoodieTableType;
|
|||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
|
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -38,11 +37,6 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
|
|||||||
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
|
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInsertAllowsDuplication() {
|
|
||||||
// ignore the test because only COW table supports INSERT duplication
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
|
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
|
||||||
return EXPECTED1;
|
return EXPECTED1;
|
||||||
|
|||||||
@@ -340,24 +340,6 @@ public class TestHoodieTableFactory {
|
|||||||
assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
|
assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
void testMorTableInsertAllowDuplication() {
|
|
||||||
TableSchema schema = TableSchema.builder()
|
|
||||||
.field("f0", DataTypes.INT().notNull())
|
|
||||||
.field("f1", DataTypes.VARCHAR(20))
|
|
||||||
.field("f2", DataTypes.TIMESTAMP(3))
|
|
||||||
.field("ts", DataTypes.TIMESTAMP(3))
|
|
||||||
.primaryKey("f0")
|
|
||||||
.build();
|
|
||||||
// overwrite the operation
|
|
||||||
this.conf.setString(FlinkOptions.OPERATION.key(), "insert");
|
|
||||||
this.conf.setString(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
|
|
||||||
|
|
||||||
final MockContext sinkContext = MockContext.getInstance(this.conf, schema, "f2");
|
|
||||||
assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sinkContext),
|
|
||||||
"Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Inner Class
|
// Inner Class
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user