diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java new file mode 100644 index 000000000..442bd02c6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.DefaultHoodieConfig; +import org.apache.hudi.config.HoodieMemoryConfig.Builder; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_ORDERING_FIELD_VAL; +import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP; + +/** + * Hoodie payload related configs. 
+ */ +public class HoodiePayloadConfig extends DefaultHoodieConfig { + + public HoodiePayloadConfig(Properties props) { + super(props); + } + + public static HoodiePayloadConfig.Builder newBuilder() { + return new HoodiePayloadConfig.Builder(); + } + + public static class Builder { + + private final Properties props = new Properties(); + + public Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.props.load(reader); + return this; + } + } + + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public Builder withPayloadOrderingField(String payloadOrderingField) { + props.setProperty(PAYLOAD_ORDERING_FIELD_PROP, String.valueOf(payloadOrderingField)); + return this; + } + + public HoodiePayloadConfig build() { + HoodiePayloadConfig config = new HoodiePayloadConfig(props); + setDefaultOnCondition(props, !props.containsKey(PAYLOAD_ORDERING_FIELD_PROP), PAYLOAD_ORDERING_FIELD_PROP, + String.valueOf(DEFAULT_PAYLOAD_ORDERING_FIELD_VAL)); + return config; + } + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 8c22cab30..e5baaf6ec 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -145,6 +145,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { // We keep track of original config and rewritten config private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig; private FileSystemViewStorageConfig viewStorageConfig; + private HoodiePayloadConfig hoodiePayloadConfig; private EngineType engineType; @@ -163,6 +164,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { this.consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build(); this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build(); this.viewStorageConfig = clientSpecifiedViewStorageConfig; + this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build(); } public static HoodieWriteConfig.Builder newBuilder() { @@ -744,6 +746,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return clientSpecifiedViewStorageConfig; } + public HoodiePayloadConfig getPayloadConfig() { + return hoodiePayloadConfig; + } + /** * Commit call back configs. */ @@ -804,6 +810,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private boolean isViewConfigSet = false; private boolean isConsistencyGuardSet = false; private boolean isCallbackConfigSet = false; + private boolean isPayloadConfigSet = false; public Builder withEngineType(EngineType engineType) { this.engineType = engineType; @@ -944,6 +951,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withPayloadConfig(HoodiePayloadConfig payloadConfig) { + props.putAll(payloadConfig.getProps()); + isPayloadConfigSet = true; + return this; + } + public Builder withAutoCommit(boolean autoCommit) { props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit)); return this; @@ -1084,6 +1097,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { ConsistencyGuardConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isCallbackConfigSet, HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build()); + setDefaultOnCondition(props, !isPayloadConfigSet, + HoodiePayloadConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION), EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION); diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index cab7283f4..1b98de439 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -225,7 +225,8 @@ public class HoodieMergeHandle extends H HoodieRecord hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key)); try { Option combinedAvroRecord = - hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema); + hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema, + config.getPayloadConfig().getProps()); if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) { /* * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully write the the combined new diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 3b356a792..b1dcff90b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -442,7 +442,7 @@ public class HoodieAvroUtils { * @param fieldValue avro field value * @return field value either converted (for certain data types) or as it is. 
*/ - private static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) { + public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) { if (fieldSchema == null) { return fieldValue; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java new file mode 100644 index 000000000..8fc75a1e7 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; +import java.util.Properties; + +import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro; +import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal; + +/** + * {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue. + *

+ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record based on ordering field value. + */ +public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { + + public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public DefaultHoodieRecordPayload(Option record) { + this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { + if (recordBytes.length == 0) { + return Option.empty(); + } + GenericRecord incomingRecord = bytesToAvro(recordBytes, schema); + /* + * Combining strategy here returns currentValue on disk if incoming record is older. + * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true) + * or an insert/update record. In any case, if it is older than the record in disk, the currentValue + * in disk is returned (to be rewritten with new commit time). + * + * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path + * and need to be dealt with separately. + */ + Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue, properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP), true); + Comparable incomingOrderingVal = (Comparable) getNestedFieldVal(incomingRecord, properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP), false); + + // Null check is needed here to support schema evolution. The record in storage may be from old schema where + // the new ordering column might not be present and hence returns null. 
+ if (persistedOrderingVal != null && ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) > 0) { + return Option.of(currentValue); + } + + /* + * We reached a point where the value is disk is older than the incoming record. + * Now check if the incoming record is a delete record. + */ + if (isDeleteRecord(incomingRecord)) { + return Option.empty(); + } else { + return Option.of(incomingRecord); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java new file mode 100644 index 000000000..5d71ec3cb --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +/** + * Holds payload properties that implementation of {@link HoodieRecordPayload} can leverage. + * Since both payload classes and HoodiePayloadConfig needs to access these props, storing it here in hudi-common. + */ +public class HoodiePayloadProps { + + // payload ordering field. This could be used to merge incoming record with that in storage. 
Implementations of + // {@link HoodieRecordPayload} can leverage if required. + public static final String PAYLOAD_ORDERING_FIELD_PROP = "hoodie.payload.ordering.field"; + public static String DEFAULT_PAYLOAD_ORDERING_FIELD_VAL = "ts"; + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index 1afdd1b59..53fcca16d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -29,47 +29,84 @@ import org.apache.avro.generic.IndexedRecord; import java.io.IOException; import java.io.Serializable; import java.util.Map; +import java.util.Properties; /** - * Every Hoodie table has an implementation of the HoodieRecordPayload This abstracts out callbacks which - * depend on record specific logic. + * Every Hoodie table has an implementation of the HoodieRecordPayload This abstracts out callbacks which depend on record specific logic. */ @PublicAPIClass(maturity = ApiMaturityLevel.STABLE) public interface HoodieRecordPayload extends Serializable { /** - * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to - * insert/upsert (if combining turned on in HoodieClientConfig). + * This method is deprecated. Please use this {@link #preCombine(HoodieRecordPayload, Properties)} method. */ - @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + @Deprecated + @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) T preCombine(T another); /** - * This methods lets you write custom merging/combining logic to produce new values as a function of current value on - * storage and whats contained in this object. - *

- * eg: 1) You are updating counters, you may want to add counts to currentValue and write back updated counts 2) You - * may be reading DB redo logs, and merge them with current image for a database row on storage - * - * @param currentValue Current value in storage, to merge/combine this payload with - * @param schema Schema used for record - * @return new combined/merged value to be written back to storage. EMPTY to skip writing this record. + * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to insert/upsert by taking in a property map. + * Implementation can leverage the property to decide their business logic to do preCombine. + * @param another instance of another {@link HoodieRecordPayload} to be combined with. + * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. + * @return the combined value */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + default T preCombine(T another, Properties properties) { + return preCombine(another); + } + + /** + * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs. + */ + @Deprecated + @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException; /** - * Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. Called when writing a - * new value for the given HoodieKey, wherein there is no existing record in storage to be combined against. (i.e - * insert) Return EMPTY to skip writing this record. + * This methods lets you write custom merging/combining logic to produce new values as a function of current value on storage and whats contained + * in this object. Implementations can leverage properties if required. + *

+ * eg: + * 1) You are updating counters, you may want to add counts to currentValue and write back updated counts + * 2) You may be reading DB redo logs, and merge them with current image for a database row on storage + *

+ * + * @param currentValue Current value in storage, to merge/combine this payload with + * @param schema Schema used for record + * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. + * @return new combined/merged value to be written back to storage. EMPTY to skip writing this record. */ - @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + default Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { + return combineAndGetUpdateValue(currentValue, schema); + } + + /** + * This method is deprecated. Refer to {@link #getInsertValue(Schema, Properties)} for java docs. + * @param schema Schema used for record + * @return the {@link IndexedRecord} to be inserted. + */ + @Deprecated + @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) Option getInsertValue(Schema schema) throws IOException; /** - * This method can be used to extract some metadata from HoodieRecordPayload. The metadata is passed to - * {@code WriteStatus.markSuccess()} and {@code WriteStatus.markFailure()} in order to compute some aggregate metrics - * using the metadata in the context of a write success or failure. + * Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. Called when writing a new value for the given + * HoodieKey, wherein there is no existing record in storage to be combined against. (i.e insert) Return EMPTY to skip writing this record. + * Implementations can leverage properties if required. + * @param schema Schema used for record + * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. + * @return the {@link IndexedRecord} to be inserted. 
+ */ + @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + default Option getInsertValue(Schema schema, Properties properties) throws IOException { + return getInsertValue(schema); + } + + /** + * This method can be used to extract some metadata from HoodieRecordPayload. The metadata is passed to {@code WriteStatus.markSuccess()} and + * {@code WriteStatus.markFailure()} in order to compute some aggregate metrics using the metadata in the context of a write success or failure. + * @return the metadata in the form of Map if any. */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) default Option> getMetadata() { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java new file mode 100644 index 000000000..791415428 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** + * Unit tests {@link DefaultHoodieRecordPayload}. + */ +public class TestDefaultHoodieRecordPayload { + + private Schema schema; + private Properties props; + + @BeforeEach + public void setUp() throws Exception { + schema = Schema.createRecord(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null), + new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", null), + new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null), + new Schema.Field("_hoodie_is_deleted", Schema.create(Type.BOOLEAN), "", false) + )); + props = new Properties(); + props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "ts"); + } + + @Test + public void testActiveRecords() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "2"); + record2.put("partition", "partition1"); + record2.put("ts", 1L); + record2.put("_hoodie_is_deleted", false); + + DefaultHoodieRecordPayload payload1 = new DefaultHoodieRecordPayload(record1, 1); + DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(record2, 2); + assertEquals(payload1.preCombine(payload2, props), payload2); + assertEquals(payload2.preCombine(payload1, props), payload2); + + assertEquals(record1, payload1.getInsertValue(schema).get()); 
+ assertEquals(record2, payload2.getInsertValue(schema).get()); + + assertEquals(payload1.combineAndGetUpdateValue(record2, schema, props).get(), record2); + assertEquals(payload2.combineAndGetUpdateValue(record1, schema, props).get(), record2); + } + + @Test + public void testDeletedRecord() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + + GenericRecord delRecord1 = new GenericData.Record(schema); + delRecord1.put("id", "2"); + delRecord1.put("partition", "partition1"); + delRecord1.put("ts", 1L); + delRecord1.put("_hoodie_is_deleted", true); + + DefaultHoodieRecordPayload payload1 = new DefaultHoodieRecordPayload(record1, 1); + DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(delRecord1, 2); + assertEquals(payload1.preCombine(payload2, props), payload2); + assertEquals(payload2.preCombine(payload1, props), payload2); + + assertEquals(record1, payload1.getInsertValue(schema).get()); + assertFalse(payload2.getInsertValue(schema).isPresent()); + + assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, props).get(), delRecord1); + assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent()); + } + +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index f9dacae1e..2905a88af 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -26,6 +26,7 @@ import org.apache.hudi.client.common.EngineType; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; import 
org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -135,6 +136,8 @@ public class StreamerUtil { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true) .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build()) + .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField) + .build()) .forTable(cfg.targetTableName) .withAutoCommit(false) .withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs) diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java index bf6fca70e..a06c281ef 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java @@ -19,6 +19,7 @@ package org.apache.hudi.integ.testsuite; import org.apache.hadoop.conf.Configuration; + import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; @@ -31,6 +32,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; @@ -98,6 +100,8 @@ public class HoodieTestSuiteWriter { HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath) .withAutoCommit(false) 
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build()) + .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField) + .build()) .forTable(cfg.targetTableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withProps(props); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 1cb63c98a..8d3e81b0b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -39,13 +39,14 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; -import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser; import org.apache.hudi.table.BulkInsertPartitioner; @@ -177,10 +178,12 @@ public class DataSourceUtils { } return builder.forTable(tblName) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() 
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY())) .withInlineCompaction(inlineCompact).build()) + .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY())) + .build()) // override above with Hoodie configs specified as options. .withProps(parameters).build(); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 1b6e49b45..0a335839a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -205,7 +205,6 @@ object DataSourceWriteOptions { val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field" val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = "ts" - /** * Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. 
* This will render any value set for `PRECOMBINE_FIELD_OPT_VAL` in-effective diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala index 99e1297f6..4c69950c0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala @@ -17,12 +17,14 @@ package org.apache.hudi +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.TypedProperties -import org.apache.hudi.common.model.{EmptyHoodieRecordPayload, HoodieKey, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.model.{BaseAvroPayload, DefaultHoodieRecordPayload, EmptyHoodieRecordPayload, HoodieKey, HoodiePayloadProps, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.testutils.SchemaTestUtil import org.apache.hudi.common.util.Option +import org.apache.hudi.config.HoodiePayloadConfig import org.apache.hudi.exception.{HoodieException, HoodieKeyException} import org.apache.hudi.keygen._ import org.apache.hudi.testutils.KeyGeneratorTestUtilities @@ -31,6 +33,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{BeforeEach, Test} import org.scalatest.Assertions.fail +import scala.collection.JavaConverters.mapAsJavaMapConverter + /** * Tests on the default key generator, payload classes. 
*/ @@ -567,6 +571,62 @@ class TestDataSourceDefaults { assertEquals("field2", combinedGR21.get("field1").toString) } + @Test def testOverwriteWithLatestAvroPayloadCombineAndGetUpdateValue() = { + val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber") + val fieldSchema: Schema = baseRecord.getSchema().getField("favoriteIntNumber").schema() + val props = new TypedProperties() + props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "favoriteIntNumber"); + + val basePayload = new OverwriteWithLatestAvroPayload(baseRecord, HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, baseOrderingVal).asInstanceOf[Comparable[_]]) + + val laterRecord = SchemaTestUtil + .generateAvroRecordFromJson(schema, 2, "001", "f1") + val laterOrderingVal: Object = laterRecord.get("favoriteIntNumber") + val newerPayload = new OverwriteWithLatestAvroPayload(laterRecord, HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal).asInstanceOf[Comparable[_]]) + + // it will provide the record with greatest combine value + val preCombinedPayload = basePayload.preCombine(newerPayload) + val precombinedGR = preCombinedPayload.getInsertValue(schema).get().asInstanceOf[GenericRecord] + assertEquals("field2", precombinedGR.get("field1").toString) + } + + @Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue() = { + val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber") + val fieldSchema: Schema = baseRecord.getSchema().getField("favoriteIntNumber").schema() + val props = new TypedProperties() + props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "favoriteIntNumber"); + + val laterRecord = SchemaTestUtil + .generateAvroRecordFromJson(schema, 2, "001", "f1") + val laterOrderingVal: Object = laterRecord.get("favoriteIntNumber") + + val earlierRecord = SchemaTestUtil + .generateAvroRecordFromJson(schema, 1, "000", "f1") + val earlierOrderingVal: Object = earlierRecord.get("favoriteIntNumber") + + val laterPayload = new 
DefaultHoodieRecordPayload(laterRecord, + HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal).asInstanceOf[Comparable[_]]) + + val earlierPayload = new DefaultHoodieRecordPayload(earlierRecord, + HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, earlierOrderingVal).asInstanceOf[Comparable[_]]) + + // it will provide the record with greatest combine value + val preCombinedPayload = laterPayload.preCombine(earlierPayload) + val precombinedGR = preCombinedPayload.getInsertValue(schema).get().asInstanceOf[GenericRecord] + assertEquals("field2", precombinedGR.get("field1").toString) + assertEquals(laterOrderingVal, precombinedGR.get("favoriteIntNumber")) + + val earlierWithLater = earlierPayload.combineAndGetUpdateValue(laterRecord, schema, props) + val earlierwithLaterGR = earlierWithLater.get().asInstanceOf[GenericRecord] + assertEquals("field2", earlierwithLaterGR.get("field1").toString) + assertEquals(laterOrderingVal, earlierwithLaterGR.get("favoriteIntNumber")) + + val laterWithEarlier = laterPayload.combineAndGetUpdateValue(earlierRecord, schema, props) + val laterWithEarlierGR = laterWithEarlier.get().asInstanceOf[GenericRecord] + assertEquals("field2", laterWithEarlierGR.get("field1").toString) + assertEquals(laterOrderingVal, laterWithEarlierGR.get("favoriteIntNumber")) + } + @Test def testEmptyHoodieRecordPayload() = { val emptyPayload1 = new EmptyHoodieRecordPayload(baseRecord, 1) val laterRecord = SchemaTestUtil diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index e17c1f015..50488bc11 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -41,6 +41,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import 
org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.HiveSyncConfig; @@ -619,6 +620,8 @@ public class DeltaSync implements Serializable { .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName) // Inline compaction is disabled for continuous mode. otherwise enabled for MOR .withInlineCompaction(cfg.isInlineCompactionEnabled()).build()) + .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField) + .build()) .forTable(cfg.targetTableName) .withAutoCommit(autoCommit).withProps(props);