From c7f1a781ab4ff3784d53a102364fd85e379811d1 Mon Sep 17 00:00:00 2001 From: Pratyaksh Sharma Date: Thu, 9 Jul 2020 17:05:07 +0530 Subject: [PATCH] [HUDI-728]: Implemented custom key generator (#1433) --- .../HoodieDeltaStreamerException.java | 4 +- .../hudi/keygen/ComplexKeyGenerator.java | 56 +++--- .../hudi/keygen/CustomKeyGenerator.java | 126 +++++++++++++ .../hudi/keygen/GlobalDeleteKeyGenerator.java | 13 +- .../keygen/NonpartitionedKeyGenerator.java | 6 +- .../hudi/keygen/SimpleKeyGenerator.java | 23 ++- .../keygen/TimestampBasedKeyGenerator.java | 25 ++- .../hudi/keygen/TestComplexKeyGenerator.java | 88 +++++++++ .../hudi/keygen/TestCustomKeyGenerator.java | 169 ++++++++++++++++++ .../keygen/TestGlobalDeleteKeyGenerator.java | 78 ++++++++ .../keygen/TestKeyGeneratorUtilities.java | 40 +++++ .../hudi/keygen/TestSimpleKeyGenerator.java | 97 ++++++++++ .../TestTimestampBasedKeyGenerator.java | 7 +- hudi-utilities/pom.xml | 8 + .../utilities/deltastreamer/DeltaSync.java | 2 +- 15 files changed, 675 insertions(+), 67 deletions(-) rename {hudi-utilities/src/main/java/org/apache/hudi/utilities => hudi-spark/src/main/java/org/apache/hudi}/exception/HoodieDeltaStreamerException.java (91%) create mode 100644 hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java rename {hudi-utilities/src/main/java/org/apache/hudi/utilities => hudi-spark/src/main/java/org/apache/hudi}/keygen/TimestampBasedKeyGenerator.java (87%) create mode 100644 hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java create mode 100644 hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java create mode 100644 hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java create mode 100644 hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java create mode 100644 hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java rename {hudi-utilities/src/test/java/org/apache/hudi/utilities => hudi-spark/src/test/java/org/apache/hudi}/keygen/TestTimestampBasedKeyGenerator.java (97%) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java b/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java similarity index 91% rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java rename to hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java index 0c7165b72..e939d62e3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java +++ b/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java @@ -16,9 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.utilities.exception; - -import org.apache.hudi.exception.HoodieException; +package org.apache.hudi.exception; public class HoodieDeltaStreamerException extends HoodieException { diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java index 9c31286be..b3ab3d065 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java @@ -49,21 +49,39 @@ public class ComplexKeyGenerator extends KeyGenerator { public ComplexKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")) - .stream().map(String::trim).collect(Collectors.toList()); + this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList()); this.partitionPathFields = - Arrays.asList(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")) - .stream().map(String::trim).collect(Collectors.toList()); + Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList()); this.hiveStylePartitioning = props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL())); } @Override public HoodieKey getKey(GenericRecord record) { - if (recordKeyFields == null || partitionPathFields == null) { - throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + String recordKey = getRecordKey(record); + StringBuilder partitionPath = new StringBuilder(); + for (String partitionPathField : partitionPathFields) { + partitionPath.append(getPartitionPath(record, partitionPathField)); + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); } + partitionPath.deleteCharAt(partitionPath.length() - 1); + return new HoodieKey(recordKey, partitionPath.toString()); + } + + String getPartitionPath(GenericRecord record, String partitionPathField) { + StringBuilder partitionPath = new StringBuilder(); + String fieldVal = DataSourceUtils.getNestedFieldValAsString(record, partitionPathField, true); + if (fieldVal == null || fieldVal.isEmpty()) { + partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH + : DEFAULT_PARTITION_PATH); + } else { + partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + fieldVal : fieldVal); + } + return partitionPath.toString(); + } + + String getRecordKey(GenericRecord record) { boolean keyIsNullEmpty = true; StringBuilder recordKey = new StringBuilder(); for (String recordKeyField : recordKeyFields) { @@ -80,30 +98,8 @@ public class ComplexKeyGenerator extends KeyGenerator { recordKey.deleteCharAt(recordKey.length() - 1); if (keyIsNullEmpty) { throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: " - + recordKeyFields.toString() + " cannot be entirely null or empty."); + + recordKeyFields.toString() + " cannot be entirely null or empty."); } - - StringBuilder partitionPath = new StringBuilder(); - for (String partitionPathField : partitionPathFields) { - String fieldVal = DataSourceUtils.getNestedFieldValAsString(record, partitionPathField, true); - if (fieldVal == null || fieldVal.isEmpty()) { - partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH - : DEFAULT_PARTITION_PATH); - } else { - partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + fieldVal : fieldVal); - } - partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); - } - partitionPath.deleteCharAt(partitionPath.length() - 1); - - return new HoodieKey(recordKey.toString(), partitionPath.toString()); - } - - public List getRecordKeyFields() { - return recordKeyFields; - } - - public List getPartitionPathFields() { - return partitionPathFields; + return recordKey.toString(); } } diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java new file mode 100644 index 000000000..be2d1ef9c --- /dev/null +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.exception.HoodieDeltaStreamerException; +import org.apache.hudi.exception.HoodieKeyException; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * This is a generic implementation of KeyGenerator where users can configure record key as a single field or a combination of fields. + * Similarly partition path can be configured to have multiple fields or only one field. This class expects value for prop + * "hoodie.datasource.write.partitionpath.field" in a specific format. For example: + * + * properties.put("hoodie.datasource.write.partitionpath.field", "field1:PartitionKeyType1,field2:PartitionKeyType2"). + * + * The complete partition path is created as / and so on. + * + * Few points to consider: + * 1. If you want to customize some partition path field on a timestamp basis, you can use field1:timestampBased + * 2. If you simply want to have the value of your configured field in the partition path, use field1:simple + * 3. If you want your table to be non partitioned, simply leave it as blank. + * + * RecordKey is internally generated using either SimpleKeyGenerator or ComplexKeyGenerator. + */ +public class CustomKeyGenerator extends KeyGenerator { + + protected final List recordKeyFields; + protected final List partitionPathFields; + protected final TypedProperties properties; + private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + private static final String SPLIT_REGEX = ":"; + + /** + * Used as a part of config in CustomKeyGenerator.java. + */ + public enum PartitionKeyType { + SIMPLE, TIMESTAMP + } + + public CustomKeyGenerator(TypedProperties props) { + super(props); + this.properties = props; + this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList()); + this.partitionPathFields = + Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList()); + } + + @Override + public HoodieKey getKey(GenericRecord record) { + //call function to get the record key + String recordKey = getRecordKey(record); + //call function to get the partition key based on the type for that partition path field + String partitionPath = getPartitionPath(record); + return new HoodieKey(recordKey, partitionPath); + } + + private String getPartitionPath(GenericRecord record) { + if (partitionPathFields == null) { + throw new HoodieKeyException("Unable to find field names for partition path in cfg"); + } + + String partitionPathField; + StringBuilder partitionPath = new StringBuilder(); + + //Corresponds to no partition case + if (partitionPathFields.size() == 1 && partitionPathFields.get(0).isEmpty()) { + return ""; + } + for (String field : partitionPathFields) { + String[] fieldWithType = field.split(SPLIT_REGEX); + if (fieldWithType.length != 2) { + throw new HoodieKeyException("Unable to find field names for partition path in proper format"); + } + + partitionPathField = fieldWithType[0]; + PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase()); + switch (keyType) { + case SIMPLE: + partitionPath.append(new SimpleKeyGenerator(properties).getPartitionPath(record, partitionPathField)); + break; + case TIMESTAMP: + partitionPath.append(new TimestampBasedKeyGenerator(properties).getPartitionPath(record, partitionPathField)); + break; + default: + throw new HoodieDeltaStreamerException("Please provide valid PartitionKeyType with fields! You provided: " + keyType); + } + + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); + } + partitionPath.deleteCharAt(partitionPath.length() - 1); + + return partitionPath.toString(); + } + + private String getRecordKey(GenericRecord record) { + if (recordKeyFields == null || recordKeyFields.isEmpty()) { + throw new HoodieKeyException("Unable to find field names for record key in cfg"); + } + + return recordKeyFields.size() == 1 ? new SimpleKeyGenerator(properties).getRecordKey(record) : new ComplexKeyGenerator(properties).getRecordKey(record); + } +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java index 315c2659e..37b05291d 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java @@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Key generator for deletes using global indices. Global index deletes do not require partition value @@ -43,15 +44,15 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator { public GlobalDeleteKeyGenerator(TypedProperties config) { super(config); - this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")); + this.recordKeyFields = Arrays.stream(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList()); } @Override public HoodieKey getKey(GenericRecord record) { - if (recordKeyFields == null) { - throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); - } + return new HoodieKey(getRecordKey(record), EMPTY_PARTITION); + } + String getRecordKey(GenericRecord record) { boolean keyIsNullEmpty = true; StringBuilder recordKey = new StringBuilder(); for (String recordKeyField : recordKeyFields) { @@ -68,9 +69,9 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator { recordKey.deleteCharAt(recordKey.length() - 1); if (keyIsNullEmpty) { throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: " - + recordKeyFields.toString() + " cannot be entirely null or empty."); + + recordKeyFields.toString() + " cannot be entirely null or empty."); } - return new HoodieKey(recordKey.toString(), EMPTY_PARTITION); + return recordKey.toString(); } } diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java index f5b32a00c..e790b4693 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java @@ -18,6 +18,7 @@ package org.apache.hudi.keygen; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; @@ -28,12 +29,15 @@ import org.apache.avro.generic.GenericRecord; /** * Simple Key generator for unpartitioned Hive Tables. */ -public class NonpartitionedKeyGenerator extends SimpleKeyGenerator { +public class NonpartitionedKeyGenerator extends KeyGenerator { private static final String EMPTY_PARTITION = ""; + protected final String recordKeyField; + public NonpartitionedKeyGenerator(TypedProperties props) { super(props); + this.recordKeyField = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()); } @Override diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index dde321d83..e9b9396b2 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -49,15 +49,12 @@ public class SimpleKeyGenerator extends KeyGenerator { @Override public HoodieKey getKey(GenericRecord record) { - if (recordKeyField == null || partitionPathField == null) { - throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); - } - - String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); - if (recordKey == null || recordKey.isEmpty()) { - throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); - } + String recordKey = getRecordKey(record); + String partitionPath = getPartitionPath(record, partitionPathField); + return new HoodieKey(recordKey, partitionPath); + } + String getPartitionPath(GenericRecord record, String partitionPathField) { String partitionPath = DataSourceUtils.getNestedFieldValAsString(record, partitionPathField, true); if (partitionPath == null || partitionPath.isEmpty()) { partitionPath = DEFAULT_PARTITION_PATH; @@ -66,6 +63,14 @@ public class SimpleKeyGenerator extends KeyGenerator { partitionPath = partitionPathField + "=" + partitionPath; } - return new HoodieKey(recordKey, partitionPath); + return partitionPath; + } + + String getRecordKey(GenericRecord record) { + String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); + if (recordKey == null || recordKey.isEmpty()) { + throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); + } + return recordKey; } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java similarity index 87% rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java rename to hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java index e5bdc6456..c0885139d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java @@ -16,15 +16,13 @@ * limitations under the License. */ -package org.apache.hudi.utilities.keygen; +package org.apache.hudi.keygen; import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieNotSupportedException; -import org.apache.hudi.keygen.SimpleKeyGenerator; -import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; +import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.avro.generic.GenericRecord; @@ -75,7 +73,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.output.dateformat"; private static final String TIMESTAMP_TIMEZONE_FORMAT_PROP = - "hoodie.deltastreamer.keygen.timebased.timezone"; + "hoodie.deltastreamer.keygen.timebased.timezone"; } public TimestampBasedKeyGenerator(TypedProperties config) { @@ -111,6 +109,12 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { @Override public HoodieKey getKey(GenericRecord record) { + String recordKey = getRecordKey(record); + String partitionPath = getPartitionPath(record, partitionPathField); + return new HoodieKey(recordKey, partitionPath); + } + + String getPartitionPath(GenericRecord record, String partitionPathField) { Object partitionVal = DataSourceUtils.getNestedFieldVal(record, partitionPathField, true); if (partitionVal == null) { partitionVal = 1L; @@ -133,14 +137,9 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { "Unexpected type for partition field: " + partitionVal.getClass().getName()); } Date timestamp = new Date(timeMs); - String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); - if (recordKey == null || recordKey.isEmpty()) { - throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); - } - String partitionPath = hiveStylePartitioning ? partitionPathField + "=" + partitionPathFormat.format(timestamp) - : partitionPathFormat.format(timestamp); - return new HoodieKey(recordKey, partitionPath); + return hiveStylePartitioning ? partitionPathField + "=" + partitionPathFormat.format(timestamp) + : partitionPathFormat.format(timestamp); } catch (ParseException pe) { throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe); } diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java new file mode 100644 index 000000000..bb94c2538 --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestComplexKeyGenerator extends TestKeyGeneratorUtilities { + + private TypedProperties getCommonProps(boolean getComplexRecordKey) { + TypedProperties properties = new TypedProperties(); + if (getComplexRecordKey) { + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col"); + } else { + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key"); + } + properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true"); + return properties; + } + + private TypedProperties getPropertiesWithoutPartitionPathProp() { + return getCommonProps(false); + } + + private TypedProperties getPropertiesWithoutRecordKeyProp() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + return properties; + } + + private TypedProperties getWrongRecordKeyFieldProps() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key"); + return properties; + } + + private TypedProperties getProps() { + TypedProperties properties = getCommonProps(true); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms"); + return properties; + } + + @Test + public void testNullPartitionPathFields() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp())); + } + + @Test + public void testNullRecordKeyFields() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp())); + } + + @Test + public void testWrongRecordKeyField() { + ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps()); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); + } + + @Test + public void testHappyFlow() { + ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps()); + HoodieKey key = keyGenerator.getKey(getRecord()); + Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); + Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21"); + } +} diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java new file mode 100644 index 000000000..699bf43a4 --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.config.TypedProperties; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestCustomKeyGenerator extends TestKeyGeneratorUtilities { + + private TypedProperties getCommonProps(boolean getComplexRecordKey) { + TypedProperties properties = new TypedProperties(); + if (getComplexRecordKey) { + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col"); + } else { + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key"); + } + properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true"); + return properties; + } + + private TypedProperties getPropertiesForSimpleKeyGen() { + TypedProperties properties = getCommonProps(false); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple"); + return properties; + } + + private TypedProperties getImproperPartitionFieldFormatProp() { + TypedProperties properties = getCommonProps(false); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + return properties; + } + + private TypedProperties getInvalidPartitionKeyTypeProps() { + TypedProperties properties = getCommonProps(false); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:dummy"); + return properties; + } + + private TypedProperties getComplexRecordKeyWithSimplePartitionProps() { + TypedProperties properties = getCommonProps(true); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple"); + return properties; + } + + private TypedProperties getComplexRecordKeyAndPartitionPathProps() { + TypedProperties properties = getCommonProps(true); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple,ts_ms:timestamp"); + populateNecessaryPropsForTimestampBasedKeyGen(properties); + return properties; + } + + private TypedProperties getPropsWithoutRecordKeyFieldProps() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple"); + return properties; + } + + private void populateNecessaryPropsForTimestampBasedKeyGen(TypedProperties properties) { + properties.put("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING"); + properties.put("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd"); + properties.put("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); + } + + private TypedProperties getPropertiesForTimestampBasedKeyGen() { + TypedProperties properties = getCommonProps(false); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "ts_ms:timestamp"); + populateNecessaryPropsForTimestampBasedKeyGen(properties); + return properties; + } + + private TypedProperties getPropertiesForNonPartitionedKeyGen() { + TypedProperties properties = getCommonProps(false); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), ""); + return properties; + } + + @Test + public void testSimpleKeyGenerator() { + KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen()); + HoodieKey key = keyGenerator.getKey(getRecord()); + Assertions.assertEquals(key.getRecordKey(), "key1"); + Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686"); + } + + @Test + public void testTimestampBasedKeyGenerator() { + KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen()); + HoodieKey key = keyGenerator.getKey(getRecord()); + Assertions.assertEquals(key.getRecordKey(), "key1"); + Assertions.assertEquals(key.getPartitionPath(), "ts_ms=20200321"); + } + + @Test + public void testNonPartitionedKeyGenerator() { + KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen()); + HoodieKey key = keyGenerator.getKey(getRecord()); + Assertions.assertEquals(key.getRecordKey(), "key1"); + Assertions.assertTrue(key.getPartitionPath().isEmpty()); + } + + @Test + public void testInvalidPartitionKeyType() { + try { + KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps()); + keyGenerator.getKey(getRecord()); + Assertions.fail("should fail when invalid PartitionKeyType is provided!"); + } catch (Exception e) { + Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY")); + } + } + + @Test + public void testNoRecordKeyFieldProp() { + try { + KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps()); + keyGenerator.getKey(getRecord()); + Assertions.fail("should fail when record key field is not provided!"); + } catch (Exception e) { + Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found")); + } + } + + @Test + public void testPartitionFieldsInImproperFormat() { + try { + KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp()); + keyGenerator.getKey(getRecord()); + Assertions.fail("should fail when partition key field is provided in improper format!"); + } catch (Exception e) { + Assertions.assertTrue(e.getMessage().contains("Unable to find field names for partition path in proper format")); + } + } + + @Test + public void testComplexRecordKeyWithSimplePartitionPath() { + KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps()); + HoodieKey key = keyGenerator.getKey(getRecord()); + Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); + Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686"); + } + + @Test + public void testComplexRecordKeysWithComplexPartitionPath() { + KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps()); + HoodieKey key = keyGenerator.getKey(getRecord()); + Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); + Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=20200321"); + } +} diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java new file mode 100644 index 000000000..e46c783b3 --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestGlobalDeleteKeyGenerator extends TestKeyGeneratorUtilities { + + private TypedProperties getCommonProps(boolean getComplexRecordKey) { + TypedProperties properties = new TypedProperties(); + if (getComplexRecordKey) { + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col"); + } else { + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key"); + } + properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true"); + return properties; + } + + private TypedProperties getPropertiesWithoutRecordKeyProp() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + return properties; + } + + private TypedProperties getWrongRecordKeyFieldProps() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key"); + return properties; + } + + private TypedProperties getProps() { + TypedProperties properties = getCommonProps(true); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms"); + return properties; + } + + @Test + public void testNullRecordKeyFields() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new GlobalDeleteKeyGenerator(getPropertiesWithoutRecordKeyProp())); + } + + @Test + public void testWrongRecordKeyField() { + GlobalDeleteKeyGenerator keyGenerator = new GlobalDeleteKeyGenerator(getWrongRecordKeyFieldProps()); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); + } + + @Test + public void testHappyFlow() { + GlobalDeleteKeyGenerator keyGenerator = new GlobalDeleteKeyGenerator(getProps()); + HoodieKey key = keyGenerator.getKey(getRecord()); + Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); + Assertions.assertEquals(key.getPartitionPath(), ""); + } +} diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java new file mode 100644 index 000000000..c0d027e38 --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +public class TestKeyGeneratorUtilities { + + public String exampleSchema = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + + "{\"name\": \"ts_ms\", \"type\": \"string\"}," + + "{\"name\": \"pii_col\", \"type\": \"string\"}]}"; + + public GenericRecord getRecord() { + GenericRecord record = new GenericData.Record(new Schema.Parser().parse(exampleSchema)); + record.put("timestamp", 4357686); + record.put("_row_key", "key1"); + record.put("ts_ms", "2020-03-21"); + record.put("pii_col", "pi"); + return record; + } +} diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java new file mode 100644 index 000000000..f36331a19 --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestSimpleKeyGenerator extends TestKeyGeneratorUtilities { + + private TypedProperties getCommonProps() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key"); + properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true"); + return properties; + } + + private TypedProperties getPropertiesWithoutPartitionPathProp() { + return getCommonProps(); + } + + private TypedProperties getPropertiesWithoutRecordKeyProp() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + return properties; + } + + private TypedProperties getWrongRecordKeyFieldProps() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key"); + return properties; + } + + private TypedProperties getComplexRecordKeyProp() { + TypedProperties properties = new TypedProperties(); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col"); + return properties; + } + + private TypedProperties getProps() { + TypedProperties properties = getCommonProps(); + properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + return properties; + } + + @Test + public void testNullPartitionPathFields() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutPartitionPathProp())); + } + + @Test + public void testNullRecordKeyFields() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp())); + } + + @Test + public void testWrongRecordKeyField() { + SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getWrongRecordKeyFieldProps()); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); + } + + @Test + public void testComplexRecordKeyField() { + SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getComplexRecordKeyProp()); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); + } + + @Test + public void testHappyFlow() { + SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getProps()); + HoodieKey key = keyGenerator.getKey(getRecord()); + Assertions.assertEquals(key.getRecordKey(), "key1"); + Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686"); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java similarity index 97% rename from hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java rename to hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java index 81f77510e..bd8583f2c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java +++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.utilities.keygen; +package org.apache.hudi.keygen; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.TypedProperties; @@ -40,13 +40,13 @@ public class TestTimestampBasedKeyGenerator { public void initialize() throws IOException { Schema schema = SchemaTestUtil.getTimestampEvolvedSchema(); baseRecord = SchemaTestUtil - .generateAvroRecordFromJson(schema, 1, "001", "f1"); + .generateAvroRecordFromJson(schema, 1, "001", "f1"); properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "field1"); properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "createTime"); properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "false"); } - + private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) { properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", timestampType); properties.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", dateFormat); @@ -55,7 +55,6 @@ public class TestTimestampBasedKeyGenerator { if (scalarType != null) { properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType); } - return properties; } diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 7cb78a189..e320d6747 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -352,6 +352,14 @@ test-jar test + + org.apache.hudi + hudi-spark_${scala.binary.version} + ${project.version} + tests + test-jar + test + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 0d3e90c49..41efc5075 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -39,7 +39,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.utilities.UtilHelpers; -import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; +import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider;