From 9ef51deb841765e85a1d1c8ab337677a687dfb66 Mon Sep 17 00:00:00 2001
From: lyogev
Date: Thu, 11 Apr 2019 15:14:53 +0300
Subject: [PATCH] Add empty payload class to support deletes via apache spark

---
 .../uber/hoodie/EmptyHoodieRecordPayload.java | 48 +++++++++++++++++++
 .../test/scala/DataSourceDefaultsTest.scala   | 16 ++++++-
 2 files changed, 63 insertions(+), 1 deletion(-)
 create mode 100644 hoodie-spark/src/main/java/com/uber/hoodie/EmptyHoodieRecordPayload.java

diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/EmptyHoodieRecordPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/EmptyHoodieRecordPayload.java
new file mode 100644
index 000000000..85eb5ffb6
--- /dev/null
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/EmptyHoodieRecordPayload.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie;
+
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import java.util.Optional;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+/**
+ * Empty payload used for deletions: every value lookup below returns {@code Optional.empty()}.
+ */
+public class EmptyHoodieRecordPayload implements HoodieRecordPayload<EmptyHoodieRecordPayload> {
+
+  public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal) { } // args unused
+
+  @Override
+  public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload another) {
+    return another; // no state of its own to merge, so the other payload is handed back as-is
+  }
+
+  @Override
+  public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
+    return Optional.empty(); // currentValue is ignored; no record is produced
+  }
+
+  @Override
+  public Optional<IndexedRecord> getInsertValue(Schema schema) {
+    return Optional.empty(); // never yields a record to insert
+  }
+}
diff --git a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala
index 2abf50c0e..14960b11d 100644
--- a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala
+++ b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala
@@ -16,9 +16,11 @@
  *
  */
 
+import java.util.Optional
+
 import com.uber.hoodie.common.util.{SchemaTestUtil, TypedProperties}
 import com.uber.hoodie.exception.HoodieException
-import com.uber.hoodie.{DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator}
+import com.uber.hoodie.{DataSourceWriteOptions, EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload, SimpleKeyGenerator}
 import org.apache.avro.generic.GenericRecord
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -114,4 +116,16 @@ class DataSourceDefaultsTest extends AssertionsForJUnit {
     val combinedGR21 = combinedPayload21.getInsertValue(schema).get().asInstanceOf[GenericRecord]
     assertEquals("field2", combinedGR21.get("field1").toString)
   }
+
+  @Test def testEmptyHoodieRecordPayload() = {
+    val emptyPayload1 = new EmptyHoodieRecordPayload(baseRecord, 1)
+    val laterRecord = SchemaTestUtil
+      .generateAvroRecordFromJson(schema, 2, "001", "f1")
+    val emptyPayload2 = new EmptyHoodieRecordPayload(laterRecord, 2)
+
+    // it will provide an empty record
+    val combinedPayload12 = emptyPayload1.preCombine(emptyPayload2)
+    val combined12 = combinedPayload12.getInsertValue(schema)
+    assertEquals(Optional.empty(), combined12)
+  }
 }