[HUDI-2684] Use DefaultHoodieRecordPayload when precombine field is specified specifically (#3922)
This commit is contained in:
@@ -0,0 +1,81 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.common.model;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
|
||||||
|
|
||||||
|
/**
 * The only difference with {@link DefaultHoodieRecordPayload} is that it does not
 * track the event time metadata for efficiency: {@link #getMetadata()} always
 * returns empty.
 */
public class EventTimeAvroPayload extends DefaultHoodieRecordPayload {

  /**
   * Constructs the payload from an Avro record and an explicit ordering value
   * (the precombine field value used to pick the winning record).
   */
  public EventTimeAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  /**
   * Constructs the payload with natural ordering (ordering value 0) from an
   * optional record; an absent record yields a delete-style empty payload.
   */
  public EventTimeAvroPayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

  /**
   * Combines the incoming record with the value currently in storage.
   *
   * @param currentValue the record currently persisted in storage
   * @param schema       writer schema used to deserialize the payload bytes
   * @param properties   payload configs (carry the precombine field name, etc.)
   * @return empty if the result is a delete; the persisted record if it wins the
   *         ordering comparison; otherwise the incoming record
   * @throws IOException if the payload bytes cannot be deserialized to Avro
   */
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
    // Empty bytes represent a delete payload: nothing to merge.
    if (recordBytes.length == 0) {
      return Option.empty();
    }

    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);

    // Null check is needed here to support schema evolution. The record in storage may be from old schema where
    // the new ordering column might not be present and hence returns null.
    if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
      return Option.of(currentValue);
    }

    /*
     * Now check if the incoming record is a delete record.
     */
    return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
  }

  /**
   * Deserializes the payload bytes for an insert.
   *
   * @param schema     writer schema used to deserialize the payload bytes
   * @param properties payload configs (unused here beyond the parent contract)
   * @return empty for a delete payload (empty bytes or a delete record),
   *         otherwise the deserialized record
   * @throws IOException if the payload bytes cannot be deserialized to Avro
   */
  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
    if (recordBytes.length == 0) {
      return Option.empty();
    }
    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);

    return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
  }

  /**
   * Always empty: unlike the parent class, this payload does not track the
   * event time metadata (the efficiency win this class exists for).
   */
  @Override
  public Option<Map<String, String>> getMetadata() {
    return Option.empty();
  }
}
|
||||||
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.table;
|
package org.apache.hudi.table;
|
||||||
|
|
||||||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.model.EventTimeAvroPayload;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.configuration.OptionsResolver;
|
import org.apache.hudi.configuration.OptionsResolver;
|
||||||
import org.apache.hudi.exception.HoodieValidationException;
|
import org.apache.hudi.exception.HoodieValidationException;
|
||||||
@@ -148,6 +149,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
|
throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
|
||||||
+ "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
|
+ "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
|
||||||
}
|
}
|
||||||
|
} else if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PAYLOAD_CLASS_NAME)) {
|
||||||
|
// if precombine field is specified but payload clazz is default,
// use EventTimeAvroPayload (a DefaultHoodieRecordPayload subclass) to make sure
// the precombine field is always taken for comparing.
|
||||||
|
conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.table;
|
package org.apache.hudi.table;
|
||||||
|
|
||||||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.model.EventTimeAvroPayload;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.exception.HoodieValidationException;
|
import org.apache.hudi.exception.HoodieValidationException;
|
||||||
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
|
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
|
||||||
@@ -120,6 +121,9 @@ public class TestHoodieTableFactory {
|
|||||||
// the precombine field is overwritten
|
// the precombine field is overwritten
|
||||||
assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
|
assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
|
||||||
assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
|
assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
|
||||||
|
// precombine field not specified, use the default payload clazz
|
||||||
|
assertThat(tableSource.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
|
||||||
|
assertThat(tableSink.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
|
||||||
|
|
||||||
// given pk but miss the pre combine key with DefaultHoodieRecordPayload should throw
|
// given pk but miss the pre combine key with DefaultHoodieRecordPayload should throw
|
||||||
this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
|
this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
|
||||||
@@ -141,6 +145,11 @@ public class TestHoodieTableFactory {
|
|||||||
|
|
||||||
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext5));
|
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext5));
|
||||||
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext5));
|
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext5));
|
||||||
|
// precombine field specified(default ts), use DefaultHoodieRecordPayload as payload clazz
|
||||||
|
HoodieTableSource tableSource5 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext5);
|
||||||
|
HoodieTableSink tableSink5 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext5);
|
||||||
|
assertThat(tableSource5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName()));
|
||||||
|
assertThat(tableSink5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -612,6 +612,10 @@
|
|||||||
<version>${hbase.version}</version>
|
<version>${hbase.version}</version>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
<exclusions>
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>guava</artifactId>
|
||||||
|
<groupId>com.google.guava</groupId>
|
||||||
|
</exclusion>
|
||||||
<exclusion>
|
<exclusion>
|
||||||
<groupId>org.apache.hbase</groupId>
|
<groupId>org.apache.hbase</groupId>
|
||||||
<artifactId>hbase-common</artifactId>
|
<artifactId>hbase-common</artifactId>
|
||||||
|
|||||||
Reference in New Issue
Block a user