From 1daba24065959dcab3606836bd1d789b3d74cc15 Mon Sep 17 00:00:00 2001 From: Scheller Date: Tue, 7 Jan 2020 16:51:28 -0800 Subject: [PATCH] Add GlobalDeleteKeyGenerator Adds new GlobalDeleteKeyGenerator for record_key deletes with global indices. Also refactors key generators into their own package. --- .../java/org/apache/hudi/DataSourceUtils.java | 1 + .../{ => keygen}/ComplexKeyGenerator.java | 9 ++- .../hudi/keygen/GlobalDeleteKeyGenerator.java | 74 +++++++++++++++++++ .../hudi/{ => keygen}/KeyGenerator.java | 2 +- .../NonpartitionedKeyGenerator.java | 3 +- .../hudi/{ => keygen}/SimpleKeyGenerator.java | 4 +- .../org/apache/hudi/DataSourceOptions.scala | 1 + hudi-spark/src/test/java/HoodieJavaApp.java | 4 +- .../test/scala/TestDataSourceDefaults.scala | 65 +++++++++++++++- .../utilities/deltastreamer/DeltaSync.java | 2 +- .../keygen/TimestampBasedKeyGenerator.java | 2 +- .../utilities/TestHoodieDeltaStreamer.java | 2 +- 12 files changed, 157 insertions(+), 12 deletions(-) rename hudi-spark/src/main/java/org/apache/hudi/{ => keygen}/ComplexKeyGenerator.java (93%) create mode 100644 hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java rename hudi-spark/src/main/java/org/apache/hudi/{ => keygen}/KeyGenerator.java (97%) rename hudi-spark/src/main/java/org/apache/hudi/{ => keygen}/NonpartitionedKeyGenerator.java (95%) rename hudi-spark/src/main/java/org/apache/hudi/{ => keygen}/SimpleKeyGenerator.java (95%) diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index de364ff4f..83bad6f65 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -37,6 +37,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.keygen.KeyGenerator; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java similarity index 93% rename from hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java rename to hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java index 6460b5e9c..7406b9ca0 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java @@ -16,8 +16,10 @@ * limitations under the License. */ -package org.apache.hudi; +package org.apache.hudi.keygen; +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.exception.HoodieKeyException; @@ -34,8 +36,9 @@ public class ComplexKeyGenerator extends KeyGenerator { private static final String DEFAULT_PARTITION_PATH = "default"; private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; - private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; - private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__"; + + protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__"; protected final List recordKeyFields; diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java new file mode 100644 index 000000000..ebbdb1d3b --- /dev/null +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import java.util.Arrays; +import java.util.List; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.exception.HoodieKeyException; + +/** + * Key generator for deletes using global indices. Global index deletes do not require partition value + * so this key generator avoids using partition value for generating HoodieKey. + */ +public class GlobalDeleteKeyGenerator extends KeyGenerator { + + private static final String EMPTY_PARTITION = ""; + private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__"; + + protected final List recordKeyFields; + + public GlobalDeleteKeyGenerator(TypedProperties config) { + super(config); + this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")); + } + + @Override + public HoodieKey getKey(GenericRecord record) { + if (recordKeyFields == null) { + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + } + + boolean keyIsNullEmpty = true; + StringBuilder recordKey = new StringBuilder(); + for (String recordKeyField : recordKeyFields) { + String recordKeyValue = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); + if (recordKeyValue == null) { + recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ","); + } else if (recordKeyValue.isEmpty()) { + recordKey.append(recordKeyField + ":" + EMPTY_RECORDKEY_PLACEHOLDER + ","); + } else { + recordKey.append(recordKeyField + ":" + recordKeyValue + ","); + keyIsNullEmpty = false; + } + } + recordKey.deleteCharAt(recordKey.length() - 1); + if (keyIsNullEmpty) { + throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: " + + recordKeyFields.toString() + " cannot be entirely null or empty."); + } + + return new HoodieKey(recordKey.toString(), EMPTY_PARTITION); + } +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/KeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java similarity index 97% rename from hudi-spark/src/main/java/org/apache/hudi/KeyGenerator.java rename to hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java index 17b5e0b39..56fbcba7e 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/KeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi; +package org.apache.hudi.keygen; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; diff --git a/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java similarity index 95% rename from hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java rename to hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java index e022d926c..d22997aef 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.hudi; +package org.apache.hudi.keygen; +import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.exception.HoodieKeyException; diff --git a/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java similarity index 95% rename from hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java rename to hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index f93562015..67a794220 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -16,8 +16,10 @@ * limitations under the License. */ -package org.apache.hudi; +package org.apache.hudi.keygen; +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.exception.HoodieKeyException; diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index e21e025a0..564e2a47a 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -20,6 +20,7 @@ package org.apache.hudi import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor +import org.apache.hudi.keygen.SimpleKeyGenerator /** * List of options that can be passed to the Hoodie datasource, diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java index f9499867a..f44cd0a9d 100644 --- a/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark/src/test/java/HoodieJavaApp.java @@ -19,8 +19,8 @@ import org.apache.hudi.DataSourceReadOptions; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.HoodieDataSourceHelpers; -import org.apache.hudi.NonpartitionedKeyGenerator; -import org.apache.hudi.SimpleKeyGenerator; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.model.HoodieRecord; diff --git a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala index f4b4fecd6..2ac5fce6b 100644 --- a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala +++ b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala @@ -19,7 +19,8 @@ import org.apache.avro.generic.GenericRecord import org.apache.hudi.common.model.{EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.util.{Option, SchemaTestUtil, TypedProperties} import org.apache.hudi.exception.{HoodieException, HoodieKeyException} -import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, SimpleKeyGenerator} +import org.apache.hudi.keygen.{ComplexKeyGenerator, GlobalDeleteKeyGenerator, SimpleKeyGenerator} +import org.apache.hudi.DataSourceWriteOptions import org.junit.Assert._ import org.junit.{Before, Test} import org.scalatest.junit.AssertionsForJUnit @@ -225,6 +226,68 @@ class TestDataSourceDefaults extends AssertionsForJUnit { } } + @Test def testGlobalDeleteKeyGenerator() = { + // top level, partition value included but not actually used + val hk1 = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord) + assertEquals("field1:field1,name:name1", hk1.getRecordKey) + assertEquals("", hk1.getPartitionPath) + + // top level, partition value not included + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1,name") + val hk2 = new GlobalDeleteKeyGenerator(props).getKey(baseRecord) + assertEquals("field1:field1,name:name1", hk2.getRecordKey) + assertEquals("", hk2.getPartitionPath) + + // if one part of the record key is empty, replace with "__empty__" + baseRecord.put("name", "") + val hk3 = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord) + assertEquals("field1:field1,name:__empty__", hk3.getRecordKey) + assertEquals("", hk3.getPartitionPath) + + // if one part of the record key is null, replace with "__null__" + baseRecord.put("name", null) + val hk4 = new GlobalDeleteKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord) + assertEquals("field1:field1,name:__null__", hk4.getRecordKey) + assertEquals("", hk4.getPartitionPath) + + // recordkey field not specified + try { + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionField") + new GlobalDeleteKeyGenerator(props).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: IllegalArgumentException => { + // do nothing + } + }; + + // Nested record key not found + try { + new GlobalDeleteKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false")) + .getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: HoodieException => { + // do nothing + } + }; + + // if all parts of the composite record key are null/empty, throw error + try { + baseRecord.put("name", "") + baseRecord.put("field1", null) + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1,name") + new GlobalDeleteKeyGenerator(props).getKey(baseRecord) + fail("Should have errored out") + } catch { + case e: HoodieKeyException => + // do nothing + } + } + @Test def testOverwriteWithLatestAvroPayload() = { val overWritePayload1 = new OverwriteWithLatestAvroPayload(baseRecord, 1) val laterRecord = SchemaTestUtil diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 8dc438eb7..6ba14877a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -21,7 +21,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.HoodieWriteClient; -import org.apache.hudi.KeyGenerator; +import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.WriteStatus; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java index 7f1380eb7..c24ddc849 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java @@ -19,7 +19,7 @@ package org.apache.hudi.utilities.keygen; import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.SimpleKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.exception.HoodieKeyException; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index ed45346f4..d2ff3033c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -19,7 +19,7 @@ package org.apache.hudi.utilities; import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.SimpleKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;