1
0

[HUDI-1929] Support configure KeyGenerator by type (#2993)

This commit is contained in:
wangxianghu
2021-06-08 21:26:10 +08:00
committed by GitHub
parent f760ec543e
commit 7261f08507
20 changed files with 819 additions and 106 deletions

View File

@@ -23,8 +23,8 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;
@@ -271,9 +271,15 @@ public class FlinkOptions {
// Option holding the key generator class name, keyed by HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.
// NOTE(review): two defaultValue(...) calls appear back to back below. They look like the removed
// and added sides of this change (default moving from SimpleAvroKeyGenerator's class name to an
// empty string) — confirm against the real file, which should contain only one of them.
public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP)
.stringType()
.defaultValue(SimpleAvroKeyGenerator.class.getName())
.defaultValue("")
.withDescription("Key generator class, that implements will extract the key out of incoming record");
// Option holding the key generator type, keyed by HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.
// The default is the enum constant name of KeyGeneratorType.SIMPLE.
public static final ConfigOption<String> KEYGEN_TYPE = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP)
.stringType()
.defaultValue(KeyGeneratorType.SIMPLE.name())
.withDescription("Key generator type, that implements will extract the key out of incoming record");
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
.intType()
@@ -539,6 +545,8 @@ public class FlinkOptions {
conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
// keygenClass has higher priority than keygenType
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
@@ -37,6 +38,8 @@ import org.apache.flink.types.RowKind;
import java.io.IOException;
import static org.apache.hudi.util.StreamerUtil.flinkConf2TypedProperties;
/**
* Function that transforms RowData to HoodieRecord.
*/
@@ -82,7 +85,9 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
super.open(parameters);
this.avroSchema = StreamerUtil.getSourceSchema(this.config);
this.converter = RowDataToAvroConverters.createConverter(this.rowType);
this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
this.keyGenerator =
HoodieAvroKeyGeneratorFactory
.createKeyGenerator(flinkConf2TypedProperties(FlinkOptions.flatOptions(this.config)));
this.payloadCreation = PayloadCreation.instance(config);
}

View File

@@ -21,10 +21,10 @@ package org.apache.hudi.streamer;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import java.util.ArrayList;
import java.util.List;
@@ -87,9 +87,12 @@ public class FlinkStreamerConfig extends Configuration {
+ "the `partitionPath` component of `HoodieKey`. Actual value obtained by invoking .toString(). By default `partitionpath`.")
public String partitionPathField = "partitionpath";
// Key generator class name, supplied on the command line through --keygen-class.
// NOTE(review): keygenClass is declared twice below. The first declaration (initialized to
// SimpleAvroKeyGenerator's class name) and the second (no initializer) look like the removed and
// added sides of this change — confirm which one the real file keeps.
@Parameter(names = {"--keygen-class"}, description = "Key generator class, that implements will extract the key out of incoming record.\n"
+ "By default `SimpleAvroKeyGenerator`.")
public String keygenClass = SimpleAvroKeyGenerator.class.getName();
@Parameter(names = {"--keygen-class"}, description = "Key generator class, that implements will extract the key out of incoming record.")
public String keygenClass;
// Key generator type, supplied on the command line through --keygen-type.
// Initialized to the enum constant name of KeyGeneratorType.SIMPLE.
@Parameter(names = {"--keygen-type"}, description = "Key generator type, that implements will extract the key out of incoming record \n"
+ "By default `SIMPLE`.")
public String keygenType = KeyGeneratorType.SIMPLE.name();
@Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+ " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record.")

View File

@@ -40,8 +40,6 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
@@ -138,37 +136,6 @@ public class StreamerUtil {
return FlinkClientUtil.getHadoopConf();
}
/**
 * Builds a {@link KeyGenerator} from the class name found in {@code props}.
 * <p>
 * The class name is looked up under {@code hoodie.datasource.write.keygenerator.class}, with
 * {@link SimpleAvroKeyGenerator} supplied as the fallback value for that lookup. The class name
 * and {@code props} are then handed to {@code ReflectionUtils.loadClass}.
 *
 * @param props properties carrying the key generator class name; also passed on to the loader
 * @return the loaded object, cast to {@link KeyGenerator}
 * @throws IOException if loading or casting fails for any reason; the original failure is kept as the cause
 */
public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
  final String generatorClassName = props.getString(
      "hoodie.datasource.write.keygenerator.class", SimpleAvroKeyGenerator.class.getName());
  try {
    // The cast stays inside the try so that a wrong type is reported as an IOException too.
    final Object loaded = ReflectionUtils.loadClass(generatorClassName, props);
    return (KeyGenerator) loaded;
  } catch (Throwable cause) {
    throw new IOException("Could not load key generator class " + generatorClassName, cause);
  }
}
/**
 * Builds a {@link KeyGenerator} from the class name stored in a Flink configuration.
 * <p>
 * The class name is read from {@link FlinkOptions#KEYGEN_CLASS}. The configuration is converted
 * with {@code flinkConf2TypedProperties} and the result is handed, together with the class name,
 * to {@code ReflectionUtils.loadClass}.
 *
 * @param conf Flink configuration carrying the key generator class name
 * @return the loaded object, cast to {@link KeyGenerator}
 * @throws IOException if conversion, loading or casting fails for any reason; the original failure is kept as the cause
 */
public static KeyGenerator createKeyGenerator(Configuration conf) throws IOException {
  final String generatorClassName = conf.getString(FlinkOptions.KEYGEN_CLASS);
  try {
    // The conversion is done inside the try so that its failures are wrapped as well.
    final TypedProperties generatorProps = flinkConf2TypedProperties(conf);
    return (KeyGenerator) ReflectionUtils.loadClass(generatorClassName, generatorProps);
  } catch (Throwable cause) {
    throw new IOException("Could not load key generator class " + generatorClassName, cause);
  }
}
/**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/