[HUDI-327] Add null/empty checks to key generators (#1040)
* Adds null and empty checks to all key generators. * Also improves error messaging for key generator issues.
This commit is contained in:
@@ -18,7 +18,7 @@
|
||||
import org.apache.avro.generic.GenericRecord
|
||||
import org.apache.hudi.common.model.EmptyHoodieRecordPayload
|
||||
import org.apache.hudi.common.util.{Option, SchemaTestUtil, TypedProperties}
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.exception.{HoodieException, HoodieKeyException}
|
||||
import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator}
|
||||
import org.junit.Assert._
|
||||
import org.junit.{Before, Test}
|
||||
@@ -96,6 +96,44 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
|
||||
val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere"))
|
||||
.getKey(baseRecord);
|
||||
assertEquals("default", hk3.getPartitionPath)
|
||||
|
||||
// if partition is null, return default partition path
|
||||
baseRecord.put("name", "")
|
||||
val hk4 = new SimpleKeyGenerator(getKeyConfig("field1", "name"))
|
||||
.getKey(baseRecord)
|
||||
assertEquals("default", hk4.getPartitionPath)
|
||||
|
||||
// if partition is empty, return default partition path
|
||||
baseRecord.put("name", null)
|
||||
val hk5 = new SimpleKeyGenerator(getKeyConfig("field1", "name"))
|
||||
.getKey(baseRecord)
|
||||
assertEquals("default", hk5.getPartitionPath)
|
||||
|
||||
// if record key is empty, throw error
|
||||
try {
|
||||
baseRecord.put("field1", "")
|
||||
val props = new TypedProperties()
|
||||
props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1")
|
||||
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "name")
|
||||
new SimpleKeyGenerator(props).getKey(baseRecord)
|
||||
fail("Should have errored out")
|
||||
} catch {
|
||||
case e: HoodieKeyException =>
|
||||
// do nothing
|
||||
}
|
||||
|
||||
// if record key is null, throw error
|
||||
try {
|
||||
baseRecord.put("field1", null)
|
||||
val props = new TypedProperties()
|
||||
props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1")
|
||||
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "name")
|
||||
new SimpleKeyGenerator(props).getKey(baseRecord)
|
||||
fail("Should have errored out")
|
||||
} catch {
|
||||
case e: HoodieKeyException =>
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
@Test def testComplexKeyGenerator() = {
|
||||
@@ -149,6 +187,32 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
|
||||
val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere"))
|
||||
.getKey(baseRecord);
|
||||
assertEquals("default", hk3.getPartitionPath)
|
||||
|
||||
// if one part of the record key is empty, replace with "__empty__"
|
||||
baseRecord.put("name", "")
|
||||
val hk4 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord)
|
||||
assertEquals("field1:field1,name:__empty__", hk4.getRecordKey)
|
||||
assertEquals("field1/default", hk4.getPartitionPath)
|
||||
|
||||
// if one part of the record key is null, replace with "__null__"
|
||||
baseRecord.put("name", null)
|
||||
val hk5 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord)
|
||||
assertEquals("field1:field1,name:__null__", hk5.getRecordKey)
|
||||
assertEquals("field1/default", hk5.getPartitionPath)
|
||||
|
||||
// if all parts of the composite record key are null/empty, throw error
|
||||
try {
|
||||
baseRecord.put("name", "")
|
||||
baseRecord.put("field1", null)
|
||||
val props = new TypedProperties()
|
||||
props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1,name")
|
||||
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "field1,name")
|
||||
new ComplexKeyGenerator(props).getKey(baseRecord)
|
||||
fail("Should have errored out")
|
||||
} catch {
|
||||
case e: HoodieKeyException =>
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
@Test def testOverwriteWithLatestAvroPayload() = {
|
||||
|
||||
Reference in New Issue
Block a user