[HUDI-2087] Support Append only in Flink stream (#3390)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
@@ -209,6 +209,12 @@ public class FlinkOptions extends HoodieConfig {
|
||||
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
|
||||
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
|
||||
|
||||
public static final ConfigOption<Boolean> INSERT_ALLOW_DUP = ConfigOptions
|
||||
.key("write.insert.allow_dup")
|
||||
.booleanType()
|
||||
.defaultValue(true)
|
||||
.withDescription("Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly, default true");
|
||||
|
||||
public static final ConfigOption<String> OPERATION = ConfigOptions
|
||||
.key("write.operation")
|
||||
.stringType()
|
||||
|
||||
@@ -91,8 +91,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
private transient org.apache.hadoop.conf.Configuration hadoopConf;
|
||||
|
||||
private final boolean isChangingRecords;
|
||||
|
||||
/**
|
||||
@@ -117,21 +115,25 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
|
||||
this.hadoopConf = StreamerUtil.getHadoopConf();
|
||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||
new SerializableConfiguration(this.hadoopConf),
|
||||
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
|
||||
new FlinkTaskContextSupplier(getRuntimeContext()));
|
||||
this.bucketAssigner = BucketAssigners.create(
|
||||
getRuntimeContext().getIndexOfThisSubtask(),
|
||||
getRuntimeContext().getMaxNumberOfParallelSubtasks(),
|
||||
getRuntimeContext().getNumberOfParallelSubtasks(),
|
||||
WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
|
||||
ignoreSmallFiles(writeConfig),
|
||||
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
|
||||
context,
|
||||
writeConfig);
|
||||
this.payloadCreation = PayloadCreation.instance(this.conf);
|
||||
}
|
||||
|
||||
private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) {
|
||||
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
|
||||
return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void snapshotState(FunctionSnapshotContext context) {
|
||||
this.bucketAssigner.reset();
|
||||
|
||||
@@ -35,25 +35,25 @@ public abstract class BucketAssigners {
|
||||
/**
|
||||
* Creates a {@code BucketAssigner}.
|
||||
*
|
||||
* @param taskID The task ID
|
||||
* @param maxParallelism The max parallelism
|
||||
* @param numTasks The number of tasks
|
||||
* @param overwrite Whether the write operation is OVERWRITE
|
||||
* @param tableType The table type
|
||||
* @param context The engine context
|
||||
* @param config The configuration
|
||||
* @param taskID The task ID
|
||||
* @param maxParallelism The max parallelism
|
||||
* @param numTasks The number of tasks
|
||||
* @param ignoreSmallFiles Whether to ignore the small files
|
||||
* @param tableType The table type
|
||||
* @param context The engine context
|
||||
* @param config The configuration
|
||||
* @return the bucket assigner instance
|
||||
*/
|
||||
public static BucketAssigner create(
|
||||
int taskID,
|
||||
int maxParallelism,
|
||||
int numTasks,
|
||||
boolean overwrite,
|
||||
boolean ignoreSmallFiles,
|
||||
HoodieTableType tableType,
|
||||
HoodieFlinkEngineContext context,
|
||||
HoodieWriteConfig config) {
|
||||
boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ);
|
||||
WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context);
|
||||
WriteProfile writeProfile = WriteProfiles.singleton(ignoreSmallFiles, delta, config, context);
|
||||
return new BucketAssigner(taskID, maxParallelism, numTasks, writeProfile, config);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,13 +26,18 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* WriteProfile for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
|
||||
* this WriteProfile always skip the existing small files because of the 'OVERWRITE' semantics.
|
||||
* WriteProfile that always return empty small files.
|
||||
*
|
||||
* <p>This write profile is used for cases:
|
||||
* i). INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
|
||||
* the existing small files are ignored because of the 'OVERWRITE' semantics;
|
||||
* ii). INSERT operation when data file merge is disabled.
|
||||
*
|
||||
*
|
||||
* <p>Note: assumes the index can always index log files for Flink write.
|
||||
*/
|
||||
public class OverwriteWriteProfile extends WriteProfile {
|
||||
public OverwriteWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
|
||||
public class EmptyWriteProfile extends WriteProfile {
|
||||
public EmptyWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
|
||||
super(config, context);
|
||||
}
|
||||
|
||||
@@ -53,22 +53,22 @@ public class WriteProfiles {
|
||||
|
||||
private WriteProfiles() {}
|
||||
|
||||
public static synchronized WriteProfile singleton(
|
||||
boolean overwrite,
|
||||
public static synchronized WriteProfile singleton(
|
||||
boolean ignoreSmallFiles,
|
||||
boolean delta,
|
||||
HoodieWriteConfig config,
|
||||
HoodieFlinkEngineContext context) {
|
||||
return PROFILES.computeIfAbsent(config.getBasePath(),
|
||||
k -> getWriteProfile(overwrite, delta, config, context));
|
||||
k -> getWriteProfile(ignoreSmallFiles, delta, config, context));
|
||||
}
|
||||
|
||||
private static WriteProfile getWriteProfile(
|
||||
boolean overwrite,
|
||||
boolean ignoreSmallFiles,
|
||||
boolean delta,
|
||||
HoodieWriteConfig config,
|
||||
HoodieFlinkEngineContext context) {
|
||||
if (overwrite) {
|
||||
return new OverwriteWriteProfile(config, context);
|
||||
if (ignoreSmallFiles) {
|
||||
return new EmptyWriteProfile(config, context);
|
||||
} else if (delta) {
|
||||
return new DeltaWriteProfile(config, context);
|
||||
} else {
|
||||
|
||||
@@ -69,6 +69,9 @@ public class FlinkStreamerConfig extends Configuration {
|
||||
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
|
||||
public String tableType;
|
||||
|
||||
@Parameter(names = {"--insert-allow-dup"}, description = "Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly.", required = true)
|
||||
public Boolean insertAllowDup = true;
|
||||
|
||||
@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
|
||||
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
|
||||
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
|
||||
@@ -305,6 +308,7 @@ public class FlinkStreamerConfig extends Configuration {
|
||||
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
|
||||
// copy_on_write works same as COPY_ON_WRITE
|
||||
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
|
||||
conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, config.insertAllowDup);
|
||||
conf.setString(FlinkOptions.OPERATION, config.operation.value());
|
||||
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
|
||||
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
|
||||
|
||||
@@ -62,7 +62,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
||||
|
||||
Configuration conf = (Configuration) helper.getOptions();
|
||||
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
|
||||
validateRequiredFields(conf, schema);
|
||||
sanityCheck(conf, schema);
|
||||
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
|
||||
|
||||
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
|
||||
@@ -79,7 +79,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
||||
public DynamicTableSink createDynamicTableSink(Context context) {
|
||||
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
|
||||
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
|
||||
validateRequiredFields(conf, schema);
|
||||
sanityCheck(conf, schema);
|
||||
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
|
||||
return new HoodieTableSink(conf, schema);
|
||||
}
|
||||
@@ -103,12 +103,13 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
||||
// Utilities
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/** Validate required options. For e.g, record key and pre_combine key.
|
||||
/**
|
||||
* The sanity check.
|
||||
*
|
||||
* @param conf The table options
|
||||
* @param schema The table schema
|
||||
*/
|
||||
private void validateRequiredFields(Configuration conf, TableSchema schema) {
|
||||
private void sanityCheck(Configuration conf, TableSchema schema) {
|
||||
List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
|
||||
|
||||
// validate record key in pk absence.
|
||||
@@ -128,6 +129,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
||||
throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
|
||||
+ "Please check 'write.precombine.field' option.");
|
||||
}
|
||||
|
||||
if (conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase().equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
|
||||
&& conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP)) {
|
||||
throw new ValidationException("Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.engine.EngineType;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
@@ -145,6 +146,7 @@ public class StreamerUtil {
|
||||
.withEngineType(EngineType.FLINK)
|
||||
.withPath(conf.getString(FlinkOptions.PATH))
|
||||
.combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
|
||||
.withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf))
|
||||
.withCompactionConfig(
|
||||
HoodieCompactionConfig.newBuilder()
|
||||
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
|
||||
@@ -345,4 +347,9 @@ public class StreamerUtil {
|
||||
throw new IOException("Could not load transformer class(es) " + classNames, e);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean allowDuplicateInserts(Configuration conf) {
|
||||
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
|
||||
return operationType == WriteOperationType.INSERT && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user