[HUDI-2032] Make keygen class and keygen type optional for FlinkStreamerConfig (#3104)
* [HUDI-2032] Make keygen class and keygen type optional for FlinkStreamerConfig * Address the review suggestion
This commit is contained in:
@@ -21,6 +21,7 @@ package org.apache.hudi.streamer;
|
|||||||
import org.apache.hudi.client.utils.OperationConverter;
|
import org.apache.hudi.client.utils.OperationConverter;
|
||||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||||
import org.apache.hudi.common.model.WriteOperationType;
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
@@ -155,6 +156,11 @@ public class FlinkStreamerConfig extends Configuration {
|
|||||||
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
|
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
|
||||||
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
|
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
|
||||||
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
|
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
|
||||||
|
if (!StringUtils.isNullOrEmpty(config.keygenClass)) {
|
||||||
|
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
|
||||||
|
} else {
|
||||||
|
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
|
||||||
|
}
|
||||||
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
|
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
|
||||||
|
|
||||||
return conf;
|
return conf;
|
||||||
|
|||||||
Reference in New Issue
Block a user