From 17e878f721c110f274251d2ba6c7c50fcd58cb8f Mon Sep 17 00:00:00 2001 From: Jaimin Shah Date: Fri, 21 Jun 2019 12:55:06 +0530 Subject: [PATCH] adding support for complex keys (#728) - Resolving the issue related to ambiguity in recordKey by creating and parsing json object as string. - added unit test for ComplexKeyGenerator - minor changes --- .../com/uber/hoodie/ComplexKeyGenerator.java | 81 +++++++++++++++++++ .../test/scala/DataSourceDefaultsTest.scala | 55 ++++++++++++- 2 files changed, 135 insertions(+), 1 deletion(-) create mode 100644 hoodie-spark/src/main/java/com/uber/hoodie/ComplexKeyGenerator.java diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/ComplexKeyGenerator.java b/hoodie-spark/src/main/java/com/uber/hoodie/ComplexKeyGenerator.java new file mode 100644 index 000000000..69d80f7ad --- /dev/null +++ b/hoodie-spark/src/main/java/com/uber/hoodie/ComplexKeyGenerator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie; + +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.exception.HoodieException; +import java.util.Arrays; +import java.util.List; +import org.apache.avro.generic.GenericRecord; + +/** + * Complex key generator, which takes names of fields to be used for recordKey and partitionPath as + * configs. + */ +public class ComplexKeyGenerator extends KeyGenerator { + + private static final String DEFAULT_PARTITION_PATH = "default"; + + private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + + protected final List recordKeyFields; + + protected final List partitionPathFields; + + public ComplexKeyGenerator(TypedProperties props) { + super(props); + this.recordKeyFields = Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")); + this.partitionPathFields = Arrays.asList(props + .getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")); + } + + @Override + public HoodieKey getKey(GenericRecord record) { + if (recordKeyFields == null || partitionPathFields == null) { + throw new HoodieException( + "Unable to find field names for record key or partition path in cfg"); + } + StringBuilder recordKey = new StringBuilder(); + for (String recordKeyField : recordKeyFields) { + recordKey.append(recordKeyField + ":" + DataSourceUtils.getNestedFieldValAsString(record, recordKeyField) + ","); + } + recordKey.deleteCharAt(recordKey.length() - 1); + StringBuilder partitionPath = new StringBuilder(); + try { + for (String partitionPathField : partitionPathFields) { + partitionPath.append(DataSourceUtils.getNestedFieldValAsString(record, partitionPathField)); + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); + } + partitionPath.deleteCharAt(partitionPath.length() - 1); + } catch (HoodieException e) { + partitionPath = partitionPath.append(DEFAULT_PARTITION_PATH); + } + + return new HoodieKey(recordKey.toString(), partitionPath.toString()); + } + + public List getRecordKeyFields() { + return recordKeyFields; + } + + public List getPartitionPathFields() { + return partitionPathFields; + } +} \ No newline at end of file diff --git a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala index e428e647e..b59d389d0 100644 --- a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala +++ b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala @@ -19,7 +19,7 @@ import java.util.Optional import com.uber.hoodie.common.util.{SchemaTestUtil, TypedProperties} import com.uber.hoodie.exception.HoodieException -import com.uber.hoodie.{DataSourceWriteOptions, EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload, SimpleKeyGenerator} +import com.uber.hoodie.{DataSourceWriteOptions, EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload, SimpleKeyGenerator, ComplexKeyGenerator} import org.apache.avro.generic.GenericRecord import org.junit.Assert._ import org.junit.{Before, Test} @@ -99,6 +99,59 @@ class DataSourceDefaultsTest extends AssertionsForJUnit { assertEquals("default", hk3.getPartitionPath) } + @Test def testComplexKeyGenerator() = { + // top level, valid fields + val hk1 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord) + assertEquals("field1:field1,name:name1", hk1.getRecordKey) + assertEquals("field1/name1", hk1.getPartitionPath) + + // partition path field not specified + try { + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1") + new ComplexKeyGenerator(props).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: IllegalArgumentException => { + // do nothing + } + }; + + // recordkey field not specified + try { + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionField") + new ComplexKeyGenerator(props).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: IllegalArgumentException => { + // do nothing + } + }; + + // nested field as record key and partition path + val hk2 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin")) + .getKey(baseRecord) + assertEquals("testNestedRecord.userId:UserId1@001,testNestedRecord.isAdmin:false", hk2.getRecordKey) + assertEquals("UserId1@001/false", hk2.getPartitionPath) + + // Nested record key not found + try { + new ComplexKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin")) + .getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: HoodieException => { + // do nothing + } + }; + + // if partition path can't be found, return default partition path + val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere")) + .getKey(baseRecord); + assertEquals("default", hk3.getPartitionPath) + } + @Test def testOverwriteWithLatestAvroPayload() = { val overWritePayload1 = new OverwriteWithLatestAvroPayload(baseRecord, 1) val laterRecord = SchemaTestUtil