1
0

[HUDI-578] Trim recordKeyFields and partitionPathFields in ComplexKeyGenerator (#1281)

* [HUDI-578] Trim recordKeyFields and partitionPathFields in ComplexKeyGenerator

* add tests
This commit is contained in:
leesf
2020-01-30 08:26:26 +08:00
committed by GitHub
parent 6f34be1b8d
commit 652224edc8
2 changed files with 16 additions and 2 deletions

View File

@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
@@ -48,9 +49,11 @@ public class ComplexKeyGenerator extends KeyGenerator {
public ComplexKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
this.recordKeyFields = Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","))
.stream().map(String::trim).collect(Collectors.toList());
this.partitionPathFields =
Arrays.asList(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(","));
Arrays.asList(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(","))
.stream().map(String::trim).collect(Collectors.toList());
this.hiveStylePartitioning = props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
}

View File

@@ -224,6 +224,17 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
case e: HoodieKeyException =>
// do nothing
}
// reset name and field1 values.
baseRecord.put("name", "name1")
baseRecord.put("field1", "field1")
val hk7 = new ComplexKeyGenerator(getKeyConfig("field1, name", "field1, name", "false")).getKey(baseRecord)
assertEquals("field1:field1,name:name1", hk7.getRecordKey)
assertEquals("field1/name1", hk7.getPartitionPath)
val hk8 = new ComplexKeyGenerator(getKeyConfig("field1,", "field1,", "false")).getKey(baseRecord)
assertEquals("field1:field1", hk8.getRecordKey)
assertEquals("field1", hk8.getPartitionPath)
}
@Test def testGlobalDeleteKeyGenerator() = {