[HUDI-1930] Bootstrap support configure KeyGenerator by type (#3170)
* [HUDI-1930] Bootstrap support configure KeyGenerator by type
This commit is contained in:
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieSavepointException;
|
import org.apache.hudi.exception.HoodieSavepointException;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
|
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||||
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
|
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
|
||||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
|
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
|
||||||
import org.apache.hudi.utilities.HDFSParquetImporter;
|
import org.apache.hudi.utilities.HDFSParquetImporter;
|
||||||
@@ -58,6 +59,7 @@ import java.io.IOException;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class deals with initializing spark context based on command entered to hudi-cli.
|
* This class deals with initializing spark context based on command entered to hudi-cli.
|
||||||
@@ -353,14 +355,20 @@ public class SparkMain {
|
|||||||
|
|
||||||
private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath,
|
private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath,
|
||||||
String sourcePath, String recordKeyCols, String partitionFields, String parallelism, String schemaProviderClass,
|
String sourcePath, String recordKeyCols, String partitionFields, String parallelism, String schemaProviderClass,
|
||||||
String bootstrapIndexClass, String selectorClass, String keyGeneratorClass, String fullBootstrapInputProvider,
|
String bootstrapIndexClass, String selectorClass, String keyGenerator, String fullBootstrapInputProvider,
|
||||||
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
|
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
|
||||||
|
|
||||||
TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
|
TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
|
||||||
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();
|
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();
|
||||||
|
|
||||||
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key(), sourcePath);
|
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key(), sourcePath);
|
||||||
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key(), keyGeneratorClass);
|
|
||||||
|
if (!StringUtils.isNullOrEmpty(keyGenerator) && KeyGeneratorType.getNames().contains(keyGenerator.toUpperCase(Locale.ROOT))) {
|
||||||
|
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_TYPE.key(), keyGenerator.toUpperCase(Locale.ROOT));
|
||||||
|
} else {
|
||||||
|
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key(), keyGenerator);
|
||||||
|
}
|
||||||
|
|
||||||
properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER.key(), fullBootstrapInputProvider);
|
properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER.key(), fullBootstrapInputProvider);
|
||||||
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key(), parallelism);
|
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key(), parallelism);
|
||||||
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR.key(), selectorClass);
|
properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR.key(), selectorClass);
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
|
|||||||
import org.apache.hudi.common.config.ConfigProperty;
|
import org.apache.hudi.common.config.ConfigProperty;
|
||||||
import org.apache.hudi.common.config.HoodieConfig;
|
import org.apache.hudi.common.config.HoodieConfig;
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
|
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
@@ -60,6 +61,12 @@ public class HoodieBootstrapConfig extends HoodieConfig {
|
|||||||
.sinceVersion("0.6.0")
|
.sinceVersion("0.6.0")
|
||||||
.withDocumentation("Key generator implementation to be used for generating keys from the bootstrapped dataset");
|
.withDocumentation("Key generator implementation to be used for generating keys from the bootstrapped dataset");
|
||||||
|
|
||||||
|
public static final ConfigProperty<String> BOOTSTRAP_KEYGEN_TYPE = ConfigProperty
|
||||||
|
.key("hoodie.bootstrap.keygen.type")
|
||||||
|
.defaultValue(KeyGeneratorType.SIMPLE.name())
|
||||||
|
.sinceVersion("0.9.0")
|
||||||
|
.withDocumentation("Type of build-in key generator, currently support SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, GLOBAL_DELETE");
|
||||||
|
|
||||||
public static final ConfigProperty<String> BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS = ConfigProperty
|
public static final ConfigProperty<String> BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS = ConfigProperty
|
||||||
.key("hoodie.bootstrap.partitionpath.translator.class")
|
.key("hoodie.bootstrap.partitionpath.translator.class")
|
||||||
.defaultValue(IdentityBootstrapPartitionPathTranslator.class.getName())
|
.defaultValue(IdentityBootstrapPartitionPathTranslator.class.getName())
|
||||||
@@ -131,6 +138,11 @@ public class HoodieBootstrapConfig extends HoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withBootstrapKeyGenType(String keyGenType) {
|
||||||
|
bootstrapConfig.setValue(BOOTSTRAP_KEYGEN_TYPE, keyGenType);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withBootstrapPartitionPathTranslatorClass(String partitionPathTranslatorClass) {
|
public Builder withBootstrapPartitionPathTranslatorClass(String partitionPathTranslatorClass) {
|
||||||
bootstrapConfig
|
bootstrapConfig
|
||||||
.setValue(BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS, partitionPathTranslatorClass);
|
.setValue(BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS, partitionPathTranslatorClass);
|
||||||
|
|||||||
@@ -1126,6 +1126,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
return getString(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS);
|
return getString(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getBootstrapKeyGeneratorType() {
|
||||||
|
return getString(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_TYPE);
|
||||||
|
}
|
||||||
|
|
||||||
public String getBootstrapModeSelectorRegex() {
|
public String getBootstrapModeSelectorRegex() {
|
||||||
return getString(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR_REGEX);
|
return getString(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR_REGEX);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,10 @@
|
|||||||
|
|
||||||
package org.apache.hudi.keygen.constant;
|
package org.apache.hudi.keygen.constant;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Types of {@link org.apache.hudi.keygen.KeyGenerator}.
|
* Types of {@link org.apache.hudi.keygen.KeyGenerator}.
|
||||||
*/
|
*/
|
||||||
@@ -55,5 +59,12 @@ public enum KeyGeneratorType {
|
|||||||
/**
|
/**
|
||||||
* Key generator for deletes using global indices.
|
* Key generator for deletes using global indices.
|
||||||
*/
|
*/
|
||||||
GLOBAL_DELETE
|
GLOBAL_DELETE;
|
||||||
|
|
||||||
|
public static List<String> getNames() {
|
||||||
|
List<String> names = new ArrayList<>(KeyGeneratorType.values().length);
|
||||||
|
Arrays.stream(KeyGeneratorType.values())
|
||||||
|
.forEach(x -> names.add(x.name()));
|
||||||
|
return names;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
package org.apache.hudi.keygen.factory;
|
package org.apache.hudi.keygen.factory;
|
||||||
|
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieKeyGeneratorException;
|
import org.apache.hudi.exception.HoodieKeyGeneratorException;
|
||||||
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
||||||
@@ -30,6 +31,9 @@ import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
|
|||||||
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
|
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@@ -41,6 +45,9 @@ import java.util.Objects;
|
|||||||
* will not be overwritten by {@link KeyGeneratorType}
|
* will not be overwritten by {@link KeyGeneratorType}
|
||||||
*/
|
*/
|
||||||
public class HoodieAvroKeyGeneratorFactory {
|
public class HoodieAvroKeyGeneratorFactory {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(HoodieAvroKeyGeneratorFactory.class);
|
||||||
|
|
||||||
public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
|
public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
|
||||||
// keyGenerator class name has higher priority
|
// keyGenerator class name has higher priority
|
||||||
KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props);
|
KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props);
|
||||||
@@ -50,7 +57,12 @@ public class HoodieAvroKeyGeneratorFactory {
|
|||||||
private static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) throws IOException {
|
private static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) throws IOException {
|
||||||
// Use KeyGeneratorType.SIMPLE as default keyGeneratorType
|
// Use KeyGeneratorType.SIMPLE as default keyGeneratorType
|
||||||
String keyGeneratorType =
|
String keyGeneratorType =
|
||||||
props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), KeyGeneratorType.SIMPLE.name());
|
props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), null);
|
||||||
|
|
||||||
|
if (StringUtils.isNullOrEmpty(keyGeneratorType)) {
|
||||||
|
LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key());
|
||||||
|
keyGeneratorType = KeyGeneratorType.SIMPLE.name();
|
||||||
|
}
|
||||||
|
|
||||||
KeyGeneratorType keyGeneratorTypeEnum;
|
KeyGeneratorType keyGeneratorTypeEnum;
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.keygen.factory;
|
package org.apache.hudi.keygen.factory;
|
||||||
|
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieKeyGeneratorException;
|
import org.apache.hudi.exception.HoodieKeyGeneratorException;
|
||||||
import org.apache.hudi.keygen.BuiltinKeyGenerator;
|
import org.apache.hudi.keygen.BuiltinKeyGenerator;
|
||||||
@@ -32,6 +33,9 @@ import org.apache.hudi.keygen.SimpleKeyGenerator;
|
|||||||
import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
|
import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@@ -44,6 +48,8 @@ import java.util.Objects;
|
|||||||
*/
|
*/
|
||||||
public class HoodieSparkKeyGeneratorFactory {
|
public class HoodieSparkKeyGeneratorFactory {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkKeyGeneratorFactory.class);
|
||||||
|
|
||||||
public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
|
public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
|
||||||
// keyGenerator class name has higher priority
|
// keyGenerator class name has higher priority
|
||||||
KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props);
|
KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props);
|
||||||
@@ -54,7 +60,12 @@ public class HoodieSparkKeyGeneratorFactory {
|
|||||||
private static BuiltinKeyGenerator createKeyGeneratorByType(TypedProperties props) throws IOException {
|
private static BuiltinKeyGenerator createKeyGeneratorByType(TypedProperties props) throws IOException {
|
||||||
// Use KeyGeneratorType.SIMPLE as default keyGeneratorType
|
// Use KeyGeneratorType.SIMPLE as default keyGeneratorType
|
||||||
String keyGeneratorType =
|
String keyGeneratorType =
|
||||||
props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), KeyGeneratorType.SIMPLE.name());
|
props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), null);
|
||||||
|
|
||||||
|
if (StringUtils.isNullOrEmpty(keyGeneratorType)) {
|
||||||
|
LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key());
|
||||||
|
keyGeneratorType = KeyGeneratorType.SIMPLE.name();
|
||||||
|
}
|
||||||
|
|
||||||
KeyGeneratorType keyGeneratorTypeEnum;
|
KeyGeneratorType keyGeneratorTypeEnum;
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -57,10 +57,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.exception.HoodieCommitException;
|
import org.apache.hudi.exception.HoodieCommitException;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
import org.apache.hudi.exception.HoodieKeyGeneratorException;
|
||||||
import org.apache.hudi.exception.HoodieMetadataException;
|
import org.apache.hudi.exception.HoodieMetadataException;
|
||||||
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
|
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
|
||||||
import org.apache.hudi.io.HoodieBootstrapHandle;
|
import org.apache.hudi.io.HoodieBootstrapHandle;
|
||||||
import org.apache.hudi.keygen.KeyGeneratorInterface;
|
import org.apache.hudi.keygen.KeyGeneratorInterface;
|
||||||
|
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
|
||||||
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
||||||
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
||||||
import org.apache.hudi.table.HoodieSparkTable;
|
import org.apache.hudi.table.HoodieSparkTable;
|
||||||
@@ -123,8 +125,6 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
|
|||||||
"Ensure Bootstrap Source Path is set");
|
"Ensure Bootstrap Source Path is set");
|
||||||
ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
|
ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
|
||||||
"Ensure Bootstrap Partition Selector is set");
|
"Ensure Bootstrap Partition Selector is set");
|
||||||
ValidationUtils.checkArgument(config.getBootstrapKeyGeneratorClass() != null,
|
|
||||||
"Ensure bootstrap key generator class is set");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -390,8 +390,14 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
|
|||||||
|
|
||||||
TypedProperties properties = new TypedProperties();
|
TypedProperties properties = new TypedProperties();
|
||||||
properties.putAll(config.getProps());
|
properties.putAll(config.getProps());
|
||||||
KeyGeneratorInterface keyGenerator = (KeyGeneratorInterface) ReflectionUtils.loadClass(config.getBootstrapKeyGeneratorClass(),
|
|
||||||
properties);
|
KeyGeneratorInterface keyGenerator;
|
||||||
|
try {
|
||||||
|
keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e);
|
||||||
|
}
|
||||||
|
|
||||||
BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(
|
BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(
|
||||||
config.getBootstrapPartitionPathTranslatorClass(), properties);
|
config.getBootstrapPartitionPathTranslatorClass(), properties);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user