diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 1ccdf891f..cf5ac5ce9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -37,7 +37,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.keygen.SimpleAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; @@ -72,8 +72,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String PRECOMBINE_FIELD_PROP = "hoodie.datasource.write.precombine.field"; public static final String WRITE_PAYLOAD_CLASS = "hoodie.datasource.write.payload.class"; public static final String DEFAULT_WRITE_PAYLOAD_CLASS = OverwriteWithLatestAvroPayload.class.getName(); + public static final String KEYGENERATOR_CLASS_PROP = "hoodie.datasource.write.keygenerator.class"; - public static final String DEFAULT_KEYGENERATOR_CLASS = SimpleAvroKeyGenerator.class.getName(); + public static final String KEYGENERATOR_TYPE_PROP = "hoodie.datasource.write.keygenerator.type"; + public static final String DEFAULT_KEYGENERATOR_TYPE = KeyGeneratorType.SIMPLE.name(); + public static final String DEFAULT_ROLLBACK_USING_MARKERS = "false"; public static final String ROLLBACK_USING_MARKERS = "hoodie.rollback.using.markers"; public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version"; @@ -1370,8 +1373,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_ROLLBACK_PARALLELISM); - setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_CLASS_PROP), - KEYGENERATOR_CLASS_PROP, DEFAULT_KEYGENERATOR_CLASS); + setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_TYPE_PROP), + KEYGENERATOR_TYPE_PROP, DEFAULT_KEYGENERATOR_TYPE); setDefaultOnCondition(props, !props.containsKey(WRITE_PAYLOAD_CLASS), WRITE_PAYLOAD_CLASS, DEFAULT_WRITE_PAYLOAD_CLASS); setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index 04e68c8f1..385b479bd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -23,6 +23,8 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieKeyException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser; @@ -152,4 +154,24 @@ public class KeyGenUtils { } }); } + + /** + * Create a key generator class via reflection, passing in any configs needed. + *

+ * This method is for user-defined classes. To create hudi's built-in key generators, please set proper + * {@link org.apache.hudi.keygen.constant.KeyGeneratorType} conf, and use the relevant factory, see + * {@link org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory}. + */ + public static KeyGenerator createKeyGeneratorByClassName(TypedProperties props) throws IOException { + KeyGenerator keyGenerator = null; + String keyGeneratorClass = props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, null); + if (!StringUtils.isNullOrEmpty(keyGeneratorClass)) { + try { + keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props); + } catch (Throwable e) { + throw new IOException("Could not load key generator class " + keyGeneratorClass, e); + } + } + return keyGenerator; + } } \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java new file mode 100644 index 000000000..a37d1b341 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.constant; + +/** + * Types of {@link org.apache.hudi.keygen.KeyGenerator}. + */ +public enum KeyGeneratorType { + /** + * Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs. + */ + SIMPLE, + + /** + * Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs. + */ + COMPLEX, + + /** + * Key generator, that relies on timestamps for partitioning field. Still picks record key by name. + */ + TIMESTAMP, + + /** + * This is a generic implementation type of KeyGenerator where users can configure record key as a single field or + * a combination of fields. Similarly partition path can be configured to have multiple fields or only one field. + *

+ * This KeyGenerator expects value for prop "hoodie.datasource.write.partitionpath.field" in a specific format. + * For example: + * properties.put("hoodie.datasource.write.partitionpath.field", "field1:PartitionKeyType1,field2:PartitionKeyType2"). + */ + CUSTOM, + + /** + * Simple Key generator for unpartitioned Hive Tables. + */ + NON_PARTITION, + + /** + * Key generator for deletes using global indices. + */ + GLOBAL_DELETE +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java new file mode 100644 index 000000000..bdbf8b6fe --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.factory; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.ComplexAvroKeyGenerator; +import org.apache.hudi.keygen.CustomAvroKeyGenerator; +import org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorType; + +import java.io.IOException; +import java.util.Locale; +import java.util.Objects; + +/** + * Factory help to create {@link org.apache.hudi.keygen.KeyGenerator}. + *

+ * This factory will try {@link HoodieWriteConfig#KEYGENERATOR_CLASS_PROP} firstly, this ensures the class prop + * will not be overwritten by {@link KeyGeneratorType} + */ +public class HoodieAvroKeyGeneratorFactory { + public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException { + // keyGenerator class name has higher priority + KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props); + return Objects.isNull(keyGenerator) ? createAvroKeyGeneratorByType(props) : keyGenerator; + } + + private static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) throws IOException { + // Use KeyGeneratorType.SIMPLE as default keyGeneratorType + String keyGeneratorType = + props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, KeyGeneratorType.SIMPLE.name()); + + KeyGeneratorType keyGeneratorTypeEnum; + try { + keyGeneratorTypeEnum = KeyGeneratorType.valueOf(keyGeneratorType.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType); + } + + switch (keyGeneratorTypeEnum) { + case SIMPLE: + return new SimpleAvroKeyGenerator(props); + case COMPLEX: + return new ComplexAvroKeyGenerator(props); + case TIMESTAMP: + return new TimestampBasedAvroKeyGenerator(props); + case CUSTOM: + return new CustomAvroKeyGenerator(props); + case NON_PARTITION: + return new NonpartitionedAvroKeyGenerator(props); + case GLOBAL_DELETE: + return new GlobalAvroDeleteKeyGenerator(props); + default: + throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType); + } + } + +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java new file mode 100644 index 000000000..3fb5a5e11 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.factory; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.ComplexAvroKeyGenerator; +import org.apache.hudi.keygen.CustomAvroKeyGenerator; +import org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.stream.Stream; + +public class TestCreateAvroKeyGeneratorByTypeWithFactory { + + private TypedProperties props; + + private static Stream configParams() { + String[] types = {KeyGeneratorType.SIMPLE.name(), KeyGeneratorType.TIMESTAMP.name(), KeyGeneratorType.COMPLEX.name(), + KeyGeneratorType.CUSTOM.name(), KeyGeneratorType.NON_PARTITION.name(), KeyGeneratorType.GLOBAL_DELETE.name()}; + return Stream.of(types).map(Arguments::of); + } + + @BeforeEach + public void init() { + props = new TypedProperties(); + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); + props.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + + // for timestamp based key generator + props.put("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING"); + props.put("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd"); + props.put("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); + } + + @AfterEach + public void teardown() { + props = null; + } + + @ParameterizedTest + @MethodSource("configParams") + public void testKeyGeneratorTypes(String keyGenType) throws IOException { + props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, keyGenType); + KeyGeneratorType keyType = KeyGeneratorType.valueOf(keyGenType); + + KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props); + switch (keyType) { + case SIMPLE: + Assertions.assertEquals(SimpleAvroKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case COMPLEX: + Assertions.assertEquals(ComplexAvroKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case TIMESTAMP: + Assertions.assertEquals(TimestampBasedAvroKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case CUSTOM: + Assertions.assertEquals(CustomAvroKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case NON_PARTITION: + Assertions.assertEquals(NonpartitionedAvroKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case GLOBAL_DELETE: + Assertions.assertEquals(GlobalAvroDeleteKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + default: + throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGenType); + } + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestHoodieAvroKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestHoodieAvroKeyGeneratorFactory.java new file mode 100644 index 000000000..406f7889b --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestHoodieAvroKeyGeneratorFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.factory; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestHoodieAvroKeyGeneratorFactory { + @Test + public void testKeyGeneratorFactory() throws IOException { + TypedProperties props = getCommonProps(); + + // set KeyGenerator type only + props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, KeyGeneratorType.SIMPLE.name()); + KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props); + Assertions.assertEquals(SimpleAvroKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + + // set KeyGenerator class only + props = getCommonProps(); + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, SimpleAvroKeyGenerator.class.getName()); + KeyGenerator keyGenerator2 = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props); + Assertions.assertEquals(SimpleAvroKeyGenerator.class.getName(), keyGenerator2.getClass().getName()); + + // set both class name and keyGenerator type + props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, KeyGeneratorType.CUSTOM.name()); + KeyGenerator keyGenerator3 = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props); + // KEYGENERATOR_TYPE_PROP was overitten by KEYGENERATOR_CLASS_PROP + Assertions.assertEquals(SimpleAvroKeyGenerator.class.getName(), keyGenerator3.getClass().getName()); + + // set wrong class name + final TypedProperties props2 = getCommonProps(); + props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, TestHoodieAvroKeyGeneratorFactory.class.getName()); + assertThrows(IOException.class, () -> HoodieAvroKeyGeneratorFactory.createKeyGenerator(props2)); + + // set wrong keyGenerator type + final TypedProperties props3 = getCommonProps(); + props3.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, "wrong_type"); + assertThrows(HoodieKeyGeneratorException.class, () -> HoodieAvroKeyGeneratorFactory.createKeyGenerator(props3)); + } + + private TypedProperties getCommonProps() { + TypedProperties properties = new TypedProperties(); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); + properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + return properties; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java new file mode 100644 index 000000000..401218419 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.factory; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.hudi.keygen.ComplexKeyGenerator; +import org.apache.hudi.keygen.CustomKeyGenerator; +import org.apache.hudi.keygen.GlobalDeleteKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorType; + +import java.io.IOException; +import java.util.Locale; +import java.util.Objects; + +/** + * Factory help to create {@link org.apache.hudi.keygen.KeyGenerator}. + *

+ * This factory will try {@link HoodieWriteConfig#KEYGENERATOR_CLASS_PROP} firstly, this ensures the class prop + * will not be overwritten by {@link KeyGeneratorType} + */ +public class HoodieSparkKeyGeneratorFactory { + + public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException { + // keyGenerator class name has higher priority + KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props); + + return Objects.isNull(keyGenerator) ? createKeyGeneratorByType(props) : keyGenerator; + } + + private static BuiltinKeyGenerator createKeyGeneratorByType(TypedProperties props) throws IOException { + // Use KeyGeneratorType.SIMPLE as default keyGeneratorType + String keyGeneratorType = + props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, KeyGeneratorType.SIMPLE.name()); + + KeyGeneratorType keyGeneratorTypeEnum; + try { + keyGeneratorTypeEnum = KeyGeneratorType.valueOf(keyGeneratorType.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType); + } + switch (keyGeneratorTypeEnum) { + case SIMPLE: + return new SimpleKeyGenerator(props); + case COMPLEX: + return new ComplexKeyGenerator(props); + case TIMESTAMP: + return new TimestampBasedKeyGenerator(props); + case CUSTOM: + return new CustomKeyGenerator(props); + case NON_PARTITION: + return new NonpartitionedKeyGenerator(props); + case GLOBAL_DELETE: + return new GlobalDeleteKeyGenerator(props); + default: + throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType); + } + } + +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java index dc30b932e..1e19a8a2f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java @@ -22,59 +22,83 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.config.TypedProperties; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.testutils.KeyGeneratorTestUtilities; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.io.IOException; + public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { - private TypedProperties getCommonProps(boolean getComplexRecordKey) { + /** + * Method to create props used for common cases. + * + * @param getComplexRecordKey Use complex record key or not + * @param useKeyGeneratorClassName Use KeyGenerator class name initialize KeyGenerator or not. + * true use {@code HoodieWriteConfig.KEYGENERATOR_CLASS_PROP}, + * false use {@code HoodieWriteConfig.KEYGENERATOR_TYPE_PROP} + * @return TypedProperties used to initialize KeyGenerator. + */ + private TypedProperties getCommonProps(boolean getComplexRecordKey, boolean useKeyGeneratorClassName) { TypedProperties properties = new TypedProperties(); if (getComplexRecordKey) { properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col"); } else { properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); } + if (useKeyGeneratorClassName) { + properties.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, CustomKeyGenerator.class.getName()); + } else { + properties.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, KeyGeneratorType.CUSTOM.name()); + } properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); return properties; } - private TypedProperties getPropertiesForSimpleKeyGen() { - TypedProperties properties = getCommonProps(false); + private TypedProperties getPropertiesForSimpleKeyGen(boolean useKeyGeneratorClassName) { + TypedProperties properties = getCommonProps(false, useKeyGeneratorClassName); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple"); return properties; } - private TypedProperties getImproperPartitionFieldFormatProp() { - TypedProperties properties = getCommonProps(false); + private TypedProperties getImproperPartitionFieldFormatProp(boolean useKeyGeneratorClassName) { + TypedProperties properties = getCommonProps(false, useKeyGeneratorClassName); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); return properties; } - private TypedProperties getInvalidPartitionKeyTypeProps() { - TypedProperties properties = getCommonProps(false); + private TypedProperties getInvalidPartitionKeyTypeProps(boolean useKeyGeneratorClassName) { + TypedProperties properties = getCommonProps(false, useKeyGeneratorClassName); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:dummy"); return properties; } - private TypedProperties getComplexRecordKeyWithSimplePartitionProps() { - TypedProperties properties = getCommonProps(true); + private TypedProperties getComplexRecordKeyWithSimplePartitionProps(boolean useKeyGeneratorClassName) { + TypedProperties properties = getCommonProps(true, useKeyGeneratorClassName); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple"); return properties; } - private TypedProperties getComplexRecordKeyAndPartitionPathProps() { - TypedProperties properties = getCommonProps(true); + private TypedProperties getComplexRecordKeyAndPartitionPathProps(boolean useKeyGeneratorClassName) { + TypedProperties properties = getCommonProps(true, useKeyGeneratorClassName); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple,ts_ms:timestamp"); populateNecessaryPropsForTimestampBasedKeyGen(properties); return properties; } - private TypedProperties getPropsWithoutRecordKeyFieldProps() { + private TypedProperties getPropsWithoutRecordKeyFieldProps(boolean useKeyGeneratorClassName) { TypedProperties properties = new TypedProperties(); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple"); + if (useKeyGeneratorClassName) { + properties.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, CustomKeyGenerator.class.getName()); + } else { + properties.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, KeyGeneratorType.CUSTOM.name()); + } return properties; } @@ -84,22 +108,32 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { properties.put("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); } - private TypedProperties getPropertiesForTimestampBasedKeyGen() { - TypedProperties properties = getCommonProps(false); + private TypedProperties getPropertiesForTimestampBasedKeyGen(boolean useKeyGeneratorClassName) { + TypedProperties properties = getCommonProps(false, useKeyGeneratorClassName); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "ts_ms:timestamp"); populateNecessaryPropsForTimestampBasedKeyGen(properties); return properties; } - private TypedProperties getPropertiesForNonPartitionedKeyGen() { - TypedProperties properties = getCommonProps(false); + private TypedProperties getPropertiesForNonPartitionedKeyGen(boolean useKeyGeneratorClassName) { + TypedProperties properties = getCommonProps(false, useKeyGeneratorClassName); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, ""); return properties; } @Test - public void testSimpleKeyGenerator() { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen()); + public void testSimpleKeyGeneratorWithKeyGeneratorClass() throws IOException { + testSimpleKeyGenerator(getPropertiesForSimpleKeyGen(true)); + } + + @Test + public void testSimpleKeyGeneratorWithKeyGeneratorType() throws IOException { + testSimpleKeyGenerator(getPropertiesForSimpleKeyGen(false)); + } + + public void testSimpleKeyGenerator(TypedProperties props) throws IOException { + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "key1"); @@ -110,8 +144,19 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { } @Test - public void testTimestampBasedKeyGenerator() { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen()); + public void testTimestampBasedKeyGeneratorWithKeyGeneratorClass() throws IOException { + testTimestampBasedKeyGenerator(getPropertiesForTimestampBasedKeyGen(true)); + } + + @Test + public void testTimestampBasedKeyGeneratorWithKeyGeneratorType() throws IOException { + testTimestampBasedKeyGenerator(getPropertiesForTimestampBasedKeyGen(false)); + } + + public void testTimestampBasedKeyGenerator(TypedProperties props) throws IOException { + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "key1"); @@ -122,8 +167,19 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { } @Test - public void testNonPartitionedKeyGenerator() { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen()); + public void testNonPartitionedKeyGeneratorWithKeyGeneratorClass() throws IOException { + testNonPartitionedKeyGenerator(getPropertiesForNonPartitionedKeyGen(true)); + } + + @Test + public void testNonPartitionedKeyGeneratorWithKeyGeneratorType() throws IOException { + testNonPartitionedKeyGenerator(getPropertiesForNonPartitionedKeyGen(false)); + } + + public void testNonPartitionedKeyGenerator(TypedProperties props) throws IOException { + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "key1"); @@ -134,9 +190,20 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { } @Test - public void testInvalidPartitionKeyType() { + public void testInvalidPartitionKeyTypeWithKeyGeneratorClass() { + testInvalidPartitionKeyType(getInvalidPartitionKeyTypeProps(true)); + } + + @Test + public void testInvalidPartitionKeyTypeWithKeyGeneratorType() { + testInvalidPartitionKeyType(getInvalidPartitionKeyTypeProps(false)); + } + + public void testInvalidPartitionKeyType(TypedProperties props) { try { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps()); + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + keyGenerator.getKey(getRecord()); Assertions.fail("should fail when invalid PartitionKeyType is provided!"); } catch (Exception e) { @@ -144,7 +211,9 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { } try { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps()); + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + GenericRecord record = getRecord(); Row row = KeyGeneratorTestUtilities.getRow(record); keyGenerator.getPartitionPath(row); @@ -155,30 +224,76 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { } @Test - public void testNoRecordKeyFieldProp() { + public void testNoRecordKeyFieldPropWithKeyGeneratorClass() { + testNoRecordKeyFieldProp(true); + } + + @Test + public void testNoRecordKeyFieldPropWithKeyGeneratorType() { + testNoRecordKeyFieldProp(false); + } + + public void testNoRecordKeyFieldProp(boolean useKeyGeneratorClassName) { + TypedProperties propsWithoutRecordKeyFieldProps = getPropsWithoutRecordKeyFieldProps(useKeyGeneratorClassName); try { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps()); + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(propsWithoutRecordKeyFieldProps); + keyGenerator.getKey(getRecord()); Assertions.fail("should fail when record key field is not provided!"); } catch (Exception e) { - Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found")); + if (useKeyGeneratorClassName) { + // "Property hoodie.datasource.write.recordkey.field not found" exception cause CustomKeyGenerator init fail + Assertions.assertTrue(e + .getCause() + .getCause() + .getCause() + .getMessage() + .contains("Property hoodie.datasource.write.recordkey.field not found")); + } else { + Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found")); + } + } try { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps()); + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(propsWithoutRecordKeyFieldProps); + GenericRecord record = getRecord(); Row row = KeyGeneratorTestUtilities.getRow(record); keyGenerator.getRecordKey(row); Assertions.fail("should fail when record key field is not provided!"); } catch (Exception e) { - Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found")); + if (useKeyGeneratorClassName) { + // "Property hoodie.datasource.write.recordkey.field not found" exception cause CustomKeyGenerator init fail + Assertions.assertTrue(e + .getCause() + .getCause() + .getCause() + .getMessage() + .contains("Property hoodie.datasource.write.recordkey.field not found")); + } else { + Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found")); + } } } @Test - public void testPartitionFieldsInImproperFormat() { + public void testPartitionFieldsInImproperFormatWithKeyGeneratorClass() { + testPartitionFieldsInImproperFormat(getImproperPartitionFieldFormatProp(true)); + } + + @Test + public void testPartitionFieldsInImproperFormatWithKeyGeneratorType() { + testPartitionFieldsInImproperFormat(getImproperPartitionFieldFormatProp(false)); + } + + public void testPartitionFieldsInImproperFormat(TypedProperties props) { try { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp()); + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + keyGenerator.getKey(getRecord()); Assertions.fail("should fail when partition key field is provided in improper format!"); } catch (Exception e) { @@ -186,7 +301,9 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { } try { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp()); + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + GenericRecord record = getRecord(); Row row = KeyGeneratorTestUtilities.getRow(record); keyGenerator.getPartitionPath(row); @@ -197,8 +314,19 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { } @Test - public void testComplexRecordKeyWithSimplePartitionPath() { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps()); + public void testComplexRecordKeyWithSimplePartitionPathWithKeyGeneratorClass() throws IOException { + testComplexRecordKeyWithSimplePartitionPath(getComplexRecordKeyWithSimplePartitionProps(true)); + } + + @Test + public void testComplexRecordKeyWithSimplePartitionPathWithKeyGeneratorType() throws IOException { + testComplexRecordKeyWithSimplePartitionPath(getComplexRecordKeyWithSimplePartitionProps(false)); + } + + public void testComplexRecordKeyWithSimplePartitionPath(TypedProperties props) throws IOException { + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); @@ -210,8 +338,19 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { } @Test - public void testComplexRecordKeysWithComplexPartitionPath() { - BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps()); + public void testComplexRecordKeysWithComplexPartitionPathWithKeyGeneratorClass() throws IOException { + testComplexRecordKeysWithComplexPartitionPath(getComplexRecordKeyAndPartitionPathProps(true)); + } + + @Test + public void testComplexRecordKeysWithComplexPartitionPathWithKeyGeneratorType() throws IOException { + testComplexRecordKeysWithComplexPartitionPath(getComplexRecordKeyAndPartitionPathProps(false)); + } + + public void testComplexRecordKeysWithComplexPartitionPath(TypedProperties props) throws IOException { + BuiltinKeyGenerator keyGenerator = + (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java new file mode 100644 index 000000000..dac3d11d1 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.factory; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.ComplexKeyGenerator; +import org.apache.hudi.keygen.CustomKeyGenerator; +import org.apache.hudi.keygen.GlobalDeleteKeyGenerator; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.stream.Stream; + +public class TestCreateKeyGeneratorByTypeWithFactory { + + private TypedProperties props; + + private static Stream configParams() { + String[] types = {KeyGeneratorType.SIMPLE.name(), KeyGeneratorType.TIMESTAMP.name(), KeyGeneratorType.COMPLEX.name(), + KeyGeneratorType.CUSTOM.name(), KeyGeneratorType.NON_PARTITION.name(), KeyGeneratorType.GLOBAL_DELETE.name()}; + return Stream.of(types).map(Arguments::of); + } + + @BeforeEach + public void init() { + props = new TypedProperties(); + props.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); + props.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); + props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + + // for timestamp based key generator + props.put("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING"); + props.put("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd"); + props.put("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); + } + + @AfterEach + public void teardown() { + props = null; + } + + @ParameterizedTest + @MethodSource("configParams") + public void testKeyGeneratorTypes(String keyGenType) throws IOException { + props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, keyGenType); + KeyGeneratorType keyType = KeyGeneratorType.valueOf(keyGenType); + + KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + switch (keyType) { + case SIMPLE: + Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case COMPLEX: + Assertions.assertEquals(ComplexKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case TIMESTAMP: + Assertions.assertEquals(TimestampBasedKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case CUSTOM: + Assertions.assertEquals(CustomKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case NON_PARTITION: + Assertions.assertEquals(NonpartitionedKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + case GLOBAL_DELETE: + Assertions.assertEquals(GlobalDeleteKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + return; + default: + throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGenType); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java new file mode 100644 index 000000000..bd868ff8f --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.factory; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.keygen.TestComplexKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * This class assist test KeyGenerator configuration(class name and type) priority. + *

+ * The functional test of KeyGenerator is left to other unit tests. {@link TestComplexKeyGenerator etc.}. + */ +public class TestHoodieSparkKeyGeneratorFactory { + @Test + public void testKeyGeneratorFactory() throws IOException { + TypedProperties props = getCommonProps(); + + // set KeyGenerator type only + props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, KeyGeneratorType.SIMPLE.name()); + KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator.getClass().getName()); + + // set KeyGenerator class only + props = getCommonProps(); + props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, SimpleKeyGenerator.class.getName()); + KeyGenerator keyGenerator2 = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator2.getClass().getName()); + + // set both class name and keyGenerator type + props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, KeyGeneratorType.CUSTOM.name()); + KeyGenerator keyGenerator3 = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + // KEYGENERATOR_TYPE_PROP was overitten by KEYGENERATOR_CLASS_PROP + Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator3.getClass().getName()); + + // set wrong class name + final TypedProperties props2 = getCommonProps(); + props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, TestHoodieSparkKeyGeneratorFactory.class.getName()); + assertThrows(IOException.class, () -> HoodieSparkKeyGeneratorFactory.createKeyGenerator(props2)); + + // set wrong keyGenerator type + final TypedProperties props3 = getCommonProps(); + props3.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP, "wrong_type"); + assertThrows(HoodieKeyGeneratorException.class, () -> HoodieSparkKeyGeneratorFactory.createKeyGenerator(props3)); + } + + private TypedProperties getCommonProps() { + TypedProperties properties = new TypedProperties(); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); + properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + return properties; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 57fd5014b..d407697ad 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -23,8 +23,8 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; -import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.util.StreamerUtil; @@ -271,9 +271,15 @@ public class FlinkOptions { public static final ConfigOption KEYGEN_CLASS = ConfigOptions .key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP) .stringType() - .defaultValue(SimpleAvroKeyGenerator.class.getName()) + .defaultValue("") .withDescription("Key generator class, that implements will extract the key out of incoming record"); + public static final ConfigOption KEYGEN_TYPE = ConfigOptions + .key(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP) + .stringType() + .defaultValue(KeyGeneratorType.SIMPLE.name()) + .withDescription("Key generator type, that implements will extract the key out of incoming record"); + public static final ConfigOption WRITE_TASKS = ConfigOptions .key("write.tasks") .intType() @@ -539,6 +545,8 @@ public class FlinkOptions { conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors); conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField); conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField); + // keygenClass has higher priority than keygenType + conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType); conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index fcf77dbab..505a8e2f1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory; import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.util.RowDataToAvroConverters; import org.apache.hudi.util.StreamerUtil; @@ -37,6 +38,8 @@ import org.apache.flink.types.RowKind; import java.io.IOException; +import static org.apache.hudi.util.StreamerUtil.flinkConf2TypedProperties; + /** * Function that transforms RowData to HoodieRecord. */ @@ -82,7 +85,9 @@ public class RowDataToHoodieFunction - * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the corresponding key generator class; otherwise, use the default key generator class - * specified in {@code DataSourceWriteOptions}. - */ - public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException { - String keyGeneratorClass = props.getString("hoodie.datasource.write.keygenerator.class", - SimpleAvroKeyGenerator.class.getName()); - try { - return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props); - } catch (Throwable e) { - throw new IOException("Could not load key generator class " + keyGeneratorClass, e); - } - } - - /** - * Create a key generator class via reflection, passing in any configs needed. - *

- * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the corresponding key generator class; otherwise, use the default key generator class - * specified in {@link FlinkOptions}. - */ - public static KeyGenerator createKeyGenerator(Configuration conf) throws IOException { - String keyGeneratorClass = conf.getString(FlinkOptions.KEYGEN_CLASS); - try { - return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, flinkConf2TypedProperties(conf)); - } catch (Throwable e) { - throw new IOException("Could not load key generator class " + keyGeneratorClass, e); - } - } - /** * Create a payload class via reflection, passing in an ordering/precombine value. */ diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index c69607ca8..8d395f282 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -19,7 +19,6 @@ package org.apache.hudi.integ.testsuite; import org.apache.avro.Schema; -import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -43,6 +42,7 @@ import org.apache.hudi.integ.testsuite.helpers.ZookeeperServiceProvider; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; @@ -106,7 +106,7 @@ public class HoodieTestSuiteJob { this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); log.info("Creating workload generator with configs : {}", props.toString()); this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration()); - this.keyGenerator = (BuiltinKeyGenerator) DataSourceUtils.createKeyGenerator(props); + this.keyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); metaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableType(cfg.tableType) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 6a5113c18..bbe786578 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -44,7 +44,6 @@ import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.index.HoodieIndex.IndexType; -import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -80,22 +79,6 @@ public class DataSourceUtils { throw new TableNotFoundException("Unable to find a hudi table for the user provided paths."); } - /** - * Create a key generator class via reflection, passing in any configs needed. - *

- * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the corresponding key generator class; otherwise, use the default key generator class - * specified in {@code DataSourceWriteOptions}. - */ - public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException { - String keyGeneratorClass = props.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), - DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL()); - try { - return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props); - } catch (Throwable e) { - throw new IOException("Could not load key generator class " + keyGeneratorClass, e); - } - } - /** * Create a UserDefinedBulkInsertPartitioner class via reflection, *
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java index 6c5eb0ed5..f4483d830 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java @@ -32,6 +32,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.KeyGenerator; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; @@ -62,7 +63,7 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr Dataset inputDataset = sparkSession.read().parquet(filePaths); try { - KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props); + KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); String structName = tableName + "_record"; String namespace = "hoodie." + tableName; RDD genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index c3205480b..794ea6f98 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -20,7 +20,6 @@ package org.apache.hudi import java.util import java.util.Properties -import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -54,7 +53,8 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession} import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer -import org.apache.hudi.common.table.HoodieTableConfig.{DEFAULT_ARCHIVELOG_FOLDER, HOODIE_ARCHIVELOG_FOLDER_PROP_NAME} +import org.apache.hudi.common.table.HoodieTableConfig.{DEFAULT_ARCHIVELOG_FOLDER} +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory object HoodieSparkSqlWriter { @@ -106,7 +106,7 @@ object HoodieSparkSqlWriter { val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt) - val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) + val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(toProperties(parameters)) if (mode == SaveMode.Ignore && tableExists) { log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index fd3e07857..c23e59d82 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -27,6 +27,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABL import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator} /** @@ -91,7 +92,7 @@ object HoodieWriterUtils { def getPartitionColumns(parameters: Map[String, String]): String = { val props = new TypedProperties() props.putAll(parameters.asJava) - val keyGen = DataSourceUtils.createKeyGenerator(props) + val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) getPartitionColumns(keyGen) } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index fbe1c14e9..c60449a79 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -47,6 +47,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.exception.HoodieDeltaStreamerException; @@ -208,7 +209,7 @@ public class DeltaSync implements Serializable { registerAvroSchemas(schemaProvider); this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames); - this.keyGenerator = DataSourceUtils.createKeyGenerator(props); + this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));