diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java new file mode 100644 index 000000000..2f7ae2e76 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; +import java.util.List; + +/** + * subclass of OverwriteWithLatestAvroPayload used for delta streamer. + *

+ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field.
+ * 2. combineAndGetUpdateValue - merges the incoming record into the stored record, overwriting only the
+ * fields whose incoming value differs from the schema default; a delete yields an empty Option.
+ */
+public class OverwriteNonDefaultsWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {
+
+  public OverwriteNonDefaultsWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public OverwriteNonDefaultsWithLatestAvroPayload(Option<GenericRecord> record) {
+    super(record); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+    // Obtain the incoming record through getInsertValue; when it is absent there is nothing to merge.
+    Option<IndexedRecord> recordOption = getInsertValue(schema);
+    if (!recordOption.isPresent()) {
+      return Option.empty();
+    }
+
+    GenericRecord insertRecord = (GenericRecord) recordOption.get();
+    GenericRecord currentRecord = (GenericRecord) currentValue;
+    // Note: currentValue itself is updated and returned below; no copy of it is made.
+    if (isDeleteRecord(insertRecord)) {
+      return Option.empty();
+    } else {
+      List<Schema.Field> fields = schema.getFields();
+      fields.forEach(field -> {
+        Object value = insertRecord.get(field.name());
+        Object defaultValue = field.defaultVal();
+        if (!overwriteField(value, defaultValue)) { // overwriteField is true when value equals the default
+          currentRecord.put(field.name(), value);
+        }
+      });
+      return Option.of(currentRecord);
+    }
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index d8dffdf1e..845967c00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -79,8 +79,15 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
    * @param genericRecord instance of {@link GenericRecord} of interest.
    * @returns {@code true} if record represents a delete record. {@code false} otherwise.
    */
-  private boolean isDeleteRecord(GenericRecord genericRecord) {
+  protected boolean isDeleteRecord(GenericRecord genericRecord) {
     Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
     return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
   }
+
+  /**
+   * Returns true if value equals defaultValue (compared via toString()); a null value matches only a null default.
+   */
+  public Boolean overwriteField(Object value, Object defaultValue) {
+    return defaultValue == null ? value == null : (value != null && defaultValue.toString().equals(value.toString()));
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
new file mode 100644
index 000000000..cce492a5e
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Unit tests {@link OverwriteNonDefaultsWithLatestAvroPayload}.
+ */
+public class TestOverwriteNonDefaultsWithLatestAvroPayload {
+  private Schema schema;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    schema = Schema.createRecord(Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
+        new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", ""),
+        new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
+        new Schema.Field("_hoodie_is_deleted", Schema.create(Schema.Type.BOOLEAN), "", false),
+        new Schema.Field("city", Schema.create(Schema.Type.STRING), "", "NY")
+    ));
+  }
+
+  @Test
+  public void testActiveRecords() throws IOException {
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", "1");
+    record1.put("partition", "partition1");
+    record1.put("ts", 0L);
+    record1.put("_hoodie_is_deleted", false);
+    record1.put("city", "NY0");
+
+    GenericRecord record2 = new GenericData.Record(schema);
+    record2.put("id", "2");
+    record2.put("partition", "");
+    record2.put("ts", 1L);
+    record2.put("_hoodie_is_deleted", false);
+    record2.put("city", "NY");
+    // Expected merge of record2 (incoming) into record1 (stored): the default-valued partition "" and city "NY" do not overwrite.
+    GenericRecord record3 = new GenericData.Record(schema);
+    record3.put("id", "2");
+    record3.put("partition", "partition1");
+    record3.put("ts", 1L);
+    record3.put("_hoodie_is_deleted", false);
+    record3.put("city", "NY0");
+
+    // Note: combineAndGetUpdateValue updates and returns the record passed in as currentValue.
+    OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1);
+    OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2);
+    assertEquals(payload2, payload1.preCombine(payload2));
+    assertEquals(payload2, payload2.preCombine(payload1));
+
+    assertEquals(record1, payload1.getInsertValue(schema).get());
+    assertEquals(record2, payload2.getInsertValue(schema).get());
+
+    assertEquals(record1, payload1.combineAndGetUpdateValue(record2, schema).get());
+    assertEquals(record3, payload2.combineAndGetUpdateValue(record1, schema).get());
+  }
+
+  @Test
+  public void testDeletedRecord() throws IOException {
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", "1");
+    record1.put("partition", "partition0");
+    record1.put("ts", 0L);
+    record1.put("_hoodie_is_deleted", false);
+    record1.put("city", "NY0");
+
+    GenericRecord delRecord1 = new GenericData.Record(schema);
+    delRecord1.put("id", "2");
+    delRecord1.put("partition", "partition1");
+    delRecord1.put("ts", 1L);
+    delRecord1.put("_hoodie_is_deleted", true);
+    delRecord1.put("city", "NY0");
+    // Expected merge of record1 (incoming) into delRecord1 (stored): _hoodie_is_deleted=false equals its default, so the stored true is kept.
+    GenericRecord record2 = new GenericData.Record(schema);
+    record2.put("id", "1");
+    record2.put("partition", "partition0");
+    record2.put("ts", 0L);
+    record2.put("_hoodie_is_deleted", true);
+    record2.put("city", "NY0");
+
+    OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1);
+    OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(delRecord1, 2);
+
+    assertEquals(payload2, payload1.preCombine(payload2));
+    assertEquals(payload2, payload2.preCombine(payload1));
+
+    assertEquals(record1, payload1.getInsertValue(schema).get());
+    assertFalse(payload2.getInsertValue(schema).isPresent());
+
+    assertEquals(record2, payload1.combineAndGetUpdateValue(delRecord1, schema).get());
+    assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent());
+  }
+}