diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java new file mode 100644 index 000000000..7c8efb66e --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro; + +/** + * The only difference with {@link DefaultHoodieRecordPayload} is that is does not + * track the event time metadata for efficiency. + */ +public class EventTimeAvroPayload extends DefaultHoodieRecordPayload { + + public EventTimeAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public EventTimeAvroPayload(Option record) { + this(record.isPresent() ? record.get() : null, 0); // natural order + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { + if (recordBytes.length == 0) { + return Option.empty(); + } + + GenericRecord incomingRecord = bytesToAvro(recordBytes, schema); + + // Null check is needed here to support schema evolution. The record in storage may be from old schema where + // the new ordering column might not be present and hence returns null. + if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) { + return Option.of(currentValue); + } + + /* + * Now check if the incoming record is a delete record. + */ + return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + } + + @Override + public Option getInsertValue(Schema schema, Properties properties) throws IOException { + if (recordBytes.length == 0) { + return Option.empty(); + } + GenericRecord incomingRecord = bytesToAvro(recordBytes, schema); + + return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + } + + @Override + public Option> getMetadata() { + return Option.empty(); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 2a1eda00f..40801ad39 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieValidationException; @@ -148,6 +149,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema." + "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option."); } + } else if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PAYLOAD_CLASS_NAME)) { + // if precombine field is specified but payload clazz is default, + // use DefaultHoodieRecordPayload to make sure the precombine field is always taken for + // comparing. + conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName()); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 01acbd9f5..cbdffe360 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hive.MultiPartKeysValueExtractor; @@ -120,6 +121,9 @@ public class TestHoodieTableFactory { // the precombine field is overwritten assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE)); assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE)); + // precombine field not specified, use the default payload clazz + assertThat(tableSource.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue())); + assertThat(tableSink.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue())); // given pk but miss the pre combine key with DefaultHoodieRecordPayload should throw this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName()); @@ -141,6 +145,11 @@ public class TestHoodieTableFactory { assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext5)); assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext5)); + // precombine field specified(default ts), use DefaultHoodieRecordPayload as payload clazz + HoodieTableSource tableSource5 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext5); + HoodieTableSink tableSink5 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext5); + assertThat(tableSource5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName())); + assertThat(tableSink5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName())); } @Test diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 8b7b2176a..868912eea 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -612,6 +612,10 @@ ${hbase.version} compile + + guava + com.google.guava + org.apache.hbase hbase-common