From 1277c62398c58690cd5a6aa78048335d5313ca05 Mon Sep 17 00:00:00 2001 From: Jintao Guan Date: Thu, 18 Mar 2021 15:33:31 -0700 Subject: [PATCH] [HUDI-1653] Add support for composite keys in NonpartitionedKeyGenerator (#2627) * [HUDI-1653] Add support for composite keys in NonpartitionedKeyGenerator * update NonpartitionedKeyGenerator to support composite record keys * update NonpartitionedKeyGenerator --- .../NonpartitionedAvroKeyGenerator.java | 19 ++- .../keygen/NonpartitionedKeyGenerator.java | 20 ++- .../TestNonpartitionedKeyGenerator.java | 136 ++++++++++++++++++ 3 files changed, 170 insertions(+), 5 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java index a5272b38b..5b117c59e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java @@ -19,20 +19,26 @@ package org.apache.hudi.keygen; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Avro simple Key generator for unpartitioned Hive Tables. */ -public class NonpartitionedAvroKeyGenerator extends SimpleAvroKeyGenerator { +public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator { private static final String EMPTY_PARTITION = ""; private static final List EMPTY_PARTITION_FIELD_LIST = new ArrayList<>(); public NonpartitionedAvroKeyGenerator(TypedProperties props) { super(props); + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY) + .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); + this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST; } @Override @@ -45,6 +51,17 @@ public class NonpartitionedAvroKeyGenerator extends SimpleAvroKeyGenerator { return EMPTY_PARTITION_FIELD_LIST; } + @Override + public String getRecordKey(GenericRecord record) { + // for backward compatibility, we need to use the right format according to the number of record key fields + // 1. if there is only one record key field, the format of record key is just "" + // 2. if there are multiple record key fields, the format is ":,:,..." + if (getRecordKeyFieldNames().size() == 1) { + return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0)); + } + return KeyGenUtils.getRecordKey(record, getRecordKeyFields()); + } + public String getEmptyPartition() { return EMPTY_PARTITION; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java index 543e1349e..277d2eb75 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java @@ -20,20 +20,32 @@ package org.apache.hudi.keygen; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.spark.sql.Row; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** * Simple Key generator for unpartitioned Hive Tables. */ -public class NonpartitionedKeyGenerator extends SimpleKeyGenerator { +public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator { private final NonpartitionedAvroKeyGenerator nonpartitionedAvroKeyGenerator; - public NonpartitionedKeyGenerator(TypedProperties config) { - super(config); - nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(config); + public NonpartitionedKeyGenerator(TypedProperties props) { + super(props); + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY) + .split(",")).map(String::trim).collect(Collectors.toList()); + this.partitionPathFields = Collections.emptyList(); + nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props); + } + + @Override + public String getRecordKey(GenericRecord record) { + return nonpartitionedAvroKeyGenerator.getRecordKey(record); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java new file mode 100644 index 000000000..4782caa15 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.testutils.KeyGeneratorTestUtilities; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static junit.framework.TestCase.assertEquals; + +public class TestNonpartitionedKeyGenerator extends KeyGeneratorTestUtilities { + + private TypedProperties getCommonProps(boolean getComplexRecordKey) { + TypedProperties properties = new TypedProperties(); + if (getComplexRecordKey) { + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col"); + } else { + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); + } + properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); + return properties; + } + + private TypedProperties getPropertiesWithoutPartitionPathProp() { + return getCommonProps(false); + } + + private TypedProperties getPropertiesWithPartitionPathProp() { + TypedProperties properties = getCommonProps(true); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms"); + return properties; + } + + private TypedProperties getPropertiesWithoutRecordKeyProp() { + TypedProperties properties = new TypedProperties(); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + return properties; + } + + private TypedProperties getWrongRecordKeyFieldProps() { + TypedProperties properties = new TypedProperties(); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key"); + return properties; + } + + @Test + public void testNullRecordKeyFields() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new NonpartitionedKeyGenerator(getPropertiesWithoutRecordKeyProp())); + } + + @Test + public void testNonNullPartitionPathFields() { + TypedProperties properties = getPropertiesWithPartitionPathProp(); + NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(properties); + GenericRecord record = getRecord(); + Row row = KeyGeneratorTestUtilities.getRow(record); + Assertions.assertEquals(properties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY), "timestamp,ts_ms"); + Assertions.assertEquals(keyGenerator.getPartitionPath(row), ""); + } + + @Test + public void testNullPartitionPathFields() { + TypedProperties properties = getPropertiesWithoutPartitionPathProp(); + NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(properties); + GenericRecord record = getRecord(); + Row row = KeyGeneratorTestUtilities.getRow(record); + Assertions.assertEquals(keyGenerator.getPartitionPath(row), ""); + } + + @Test + public void testWrongRecordKeyField() { + NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(getWrongRecordKeyFieldProps()); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType)); + } + + @Test + public void testSingleValueKeyGeneratorNonPartitioned() { + TypedProperties properties = new TypedProperties(); + properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "timestamp"); + properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, ""); + NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(properties); + assertEquals(keyGenerator.getRecordKeyFields().size(), 1); + assertEquals(keyGenerator.getPartitionPathFields().size(), 0); + + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + GenericRecord record = dataGenerator.generateGenericRecords(1).get(0); + String rowKey = record.get("timestamp").toString(); + HoodieKey hoodieKey = keyGenerator.getKey(record); + assertEquals(rowKey, hoodieKey.getRecordKey()); + assertEquals("", hoodieKey.getPartitionPath()); + } + + @Test + public void testMultipleValueKeyGeneratorNonPartitioned1() { + TypedProperties properties = new TypedProperties(); + properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "timestamp,driver"); + properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, ""); + NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(properties); + assertEquals(keyGenerator.getRecordKeyFields().size(), 2); + assertEquals(keyGenerator.getPartitionPathFields().size(), 0); + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + GenericRecord record = dataGenerator.generateGenericRecords(1).get(0); + String rowKey = + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString() + "," + + "driver" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("driver").toString(); + String partitionPath = ""; + HoodieKey hoodieKey = keyGenerator.getKey(record); + assertEquals(rowKey, hoodieKey.getRecordKey()); + assertEquals(partitionPath, hoodieKey.getPartitionPath()); + } +}