diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieKeyException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieKeyException.java new file mode 100644 index 000000000..04080c9ea --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieKeyException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +/** + *

+ * Exception thrown for Hoodie Key Generator related errors. + *

+ */ +public class HoodieKeyException extends HoodieException { + + public HoodieKeyException(String msg) { + super(msg); + } + + public HoodieKeyException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java index 8419257d8..15f47d39d 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieKeyException; /** * Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs. @@ -31,8 +31,10 @@ import org.apache.hudi.exception.HoodieException; public class ComplexKeyGenerator extends KeyGenerator { private static final String DEFAULT_PARTITION_PATH = "default"; private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__"; protected final List recordKeyFields; @@ -48,24 +49,40 @@ public class ComplexKeyGenerator extends KeyGenerator { @Override public HoodieKey getKey(GenericRecord record) { if (recordKeyFields == null || partitionPathFields == null) { - throw new HoodieException("Unable to find field names for record key or partition path in cfg"); + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); } + + boolean keyIsNullEmpty = true; StringBuilder recordKey = new StringBuilder(); for (String recordKeyField : recordKeyFields) { - recordKey.append(recordKeyField + ":" + DataSourceUtils.getNestedFieldValAsString(record, 
recordKeyField) + ","); + String recordKeyValue = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField); + if (recordKeyValue == null) { + recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ","); + } else if (recordKeyValue.isEmpty()) { + recordKey.append(recordKeyField + ":" + EMPTY_RECORDKEY_PLACEHOLDER + ","); + } else { + recordKey.append(recordKeyField + ":" + recordKeyValue + ","); + keyIsNullEmpty = false; + } } recordKey.deleteCharAt(recordKey.length() - 1); - StringBuilder partitionPath = new StringBuilder(); - try { - for (String partitionPathField : partitionPathFields) { - partitionPath.append(DataSourceUtils.getNestedFieldValAsString(record, partitionPathField)); - partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); - } - partitionPath.deleteCharAt(partitionPath.length() - 1); - } catch (HoodieException e) { - partitionPath = partitionPath.append(DEFAULT_PARTITION_PATH); + if (keyIsNullEmpty) { + throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: " + + recordKeyFields.toString() + " cannot be entirely null or empty."); } + StringBuilder partitionPath = new StringBuilder(); + for (String partitionPathField : partitionPathFields) { + String fieldVal = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField); + if (fieldVal == null || fieldVal.isEmpty()) { + partitionPath.append(DEFAULT_PARTITION_PATH); + } else { + partitionPath.append(fieldVal); + } + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); + } + partitionPath.deleteCharAt(partitionPath.length() - 1); + return new HoodieKey(recordKey.toString(), partitionPath.toString()); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 90128b8b5..6938e4b44 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -51,6 +51,17 @@ 
import org.apache.spark.api.java.JavaSparkContext; */ public class DataSourceUtils { + /** + * Obtain value of the provided nullable field as string, denoted by dot notation. e.g: a.b.c + */ + public static String getNullableNestedFieldValAsString(GenericRecord record, String fieldName) { + try { + return getNestedFieldValAsString(record, fieldName); + } catch (HoodieException e) { + return null; + } + } + /** * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c */ diff --git a/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java index 50cf327d4..4cfbd55bd 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java @@ -21,6 +21,7 @@ package org.apache.hudi; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.exception.HoodieKeyException; /** * Simple Key generator for unpartitioned Hive Tables @@ -35,7 +36,10 @@ public class NonpartitionedKeyGenerator extends SimpleKeyGenerator { @Override public HoodieKey getKey(GenericRecord record) { - String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField); + String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField); + if (recordKey == null || recordKey.isEmpty()) { + throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); + } return new HoodieKey(recordKey, EMPTY_PARTITION); } } diff --git a/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java index b9bb25865..8f5948442 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java +++ 
b/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java @@ -21,7 +21,7 @@ package org.apache.hudi; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieKeyException; /** * Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs. @@ -43,15 +43,16 @@ public class SimpleKeyGenerator extends KeyGenerator { @Override public HoodieKey getKey(GenericRecord record) { if (recordKeyField == null || partitionPathField == null) { - throw new HoodieException("Unable to find field names for record key or partition path in cfg"); + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); } - String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField); - String partitionPath; - try { - partitionPath = DataSourceUtils.getNestedFieldValAsString(record, partitionPathField); - } catch (HoodieException e) { - // if field is not found, lump it into default partition + String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField); + if (recordKey == null || recordKey.isEmpty()) { + throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); + } + + String partitionPath = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField); + if (partitionPath == null || partitionPath.isEmpty()) { partitionPath = DEFAULT_PARTITION_PATH; } diff --git a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala index a2a1804d7..d4e3f1414 100644 --- a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala +++ b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala @@ -18,7 +18,7 @@ import 
org.apache.avro.generic.GenericRecord import org.apache.hudi.common.model.EmptyHoodieRecordPayload import org.apache.hudi.common.util.{Option, SchemaTestUtil, TypedProperties} -import org.apache.hudi.exception.HoodieException +import org.apache.hudi.exception.{HoodieException, HoodieKeyException} import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator} import org.junit.Assert._ import org.junit.{Before, Test} @@ -96,6 +96,44 @@ class TestDataSourceDefaults extends AssertionsForJUnit { val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere")) .getKey(baseRecord); assertEquals("default", hk3.getPartitionPath) + + // if partition is empty, return default partition path + baseRecord.put("name", "") + val hk4 = new SimpleKeyGenerator(getKeyConfig("field1", "name")) + .getKey(baseRecord) + assertEquals("default", hk4.getPartitionPath) + + // if partition is null, return default partition path + baseRecord.put("name", null) + val hk5 = new SimpleKeyGenerator(getKeyConfig("field1", "name")) + .getKey(baseRecord) + assertEquals("default", hk5.getPartitionPath) + + // if record key is empty, throw error + try { + baseRecord.put("field1", "") + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1") + props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "name") + new SimpleKeyGenerator(props).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: HoodieKeyException => + // do nothing + } + + // if record key is null, throw error + try { + baseRecord.put("field1", null) + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1") + props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "name") + new SimpleKeyGenerator(props).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: 
HoodieKeyException => + // do nothing + } } @Test def testComplexKeyGenerator() = { @@ -149,6 +187,32 @@ class TestDataSourceDefaults extends AssertionsForJUnit { val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere")) .getKey(baseRecord); assertEquals("default", hk3.getPartitionPath) + + // if one part of the record key is empty, replace with "__empty__" + baseRecord.put("name", "") + val hk4 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord) + assertEquals("field1:field1,name:__empty__", hk4.getRecordKey) + assertEquals("field1/default", hk4.getPartitionPath) + + // if one part of the record key is null, replace with "__null__" + baseRecord.put("name", null) + val hk5 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord) + assertEquals("field1:field1,name:__null__", hk5.getRecordKey) + assertEquals("field1/default", hk5.getPartitionPath) + + // if all parts of the composite record key are null/empty, throw error + try { + baseRecord.put("name", "") + baseRecord.put("field1", null) + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1,name") + props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "field1,name") + new ComplexKeyGenerator(props).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: HoodieKeyException => + // do nothing + } } @Test def testOverwriteWithLatestAvroPayload() = { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java index 970ec54e9..4f91f959f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java @@ -30,6 +30,7 @@ import 
org.apache.hudi.DataSourceUtils; import org.apache.hudi.SimpleKeyGenerator; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.exception.HoodieKeyException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; @@ -98,8 +99,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { } Date timestamp = this.timestampType == TimestampType.EPOCHMILLISECONDS ? new Date(unixTime) : new Date(unixTime * 1000); - return new HoodieKey(DataSourceUtils.getNestedFieldValAsString(record, recordKeyField), - partitionPathFormat.format(timestamp)); + String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField); + if (recordKey == null || recordKey.isEmpty()) { + throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); + } + return new HoodieKey(recordKey, partitionPathFormat.format(timestamp)); } catch (ParseException pe) { throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe); }