From 29332498af420596f42c3b0ff4a2fe804d7d7dfc Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 11 Aug 2021 22:55:43 +0800 Subject: [PATCH] [HUDI-2298] The HoodieMergedLogRecordScanner should set up the operation of the chosen record (#3456) Previously, when two log records shared a key, the merged record always took the operation of the incoming record, even if preCombine() returned the payload that was already stored. The scanner now compares the combined payload with the old payload: when they are equal it keeps the old record's operation, otherwise it uses the incoming record's operation. A Flink input format test covering out-of-order updates and a delete written to log files is added. --- .../log/HoodieMergedLogRecordScanner.java | 10 +++-- .../hudi/table/format/TestInputFormat.java | 37 +++++++++++++++++++ .../java/org/apache/hudi/utils/TestData.java | 24 ++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index bc08fe744..a68c8f17b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.DefaultSizeEstimator; @@ -134,9 +135,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be // done when a delete (empty payload) is encountered before or after an insert/update. - // Always use the natural order now. 
- HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData()); - records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, hoodieRecord.getOperation())); + HoodieRecord oldRecord = records.get(key); + HoodieRecordPayload oldValue = oldRecord.getData(); + HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue); + boolean choosePrev = combinedValue.equals(oldValue); + HoodieOperation operation = choosePrev ? oldRecord.getOperation() : hoodieRecord.getOperation(); + records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation)); } else { // Put the record as is records.put(key, hoodieRecord); diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 55359451e..54848bdee 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -220,6 +220,43 @@ public class TestInputFormat { assertThat(actual2, is(expected2)); } + @Test + void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception { + Map options = new HashMap<>(); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true"); + beforeEach(HoodieTableType.MERGE_ON_READ, options); + + // write base first with compaction. + conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true); + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); + TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf); + + // write another commit using logs and read again. 
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); + TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf); + + InputFormat inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + + // when isEmitDelete is false. + List result1 = readData(inputFormat); + + final String actual1 = TestData.rowDataToString(result1, true); + final String expected1 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]"; + assertThat(actual1, is(expected1)); + + // refresh the input format and set isEmitDelete to true. + this.tableSource.reset(); + inputFormat = this.tableSource.getInputFormat(); + ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true); + + List result2 = readData(inputFormat); + + final String actual2 = TestData.rowDataToString(result2, true); + final String expected2 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]"; + assertThat(actual2, is(expected2)); + } + @Test void testReadWithDeletesMOR() throws Exception { Map options = new HashMap<>(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index d3e32e699..b51631c68 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -58,6 +58,7 @@ import java.io.FileFilter; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -256,6 +257,29 @@ public class TestData { TimestampData.fromEpochMillis(5), StringData.fromString("par1")) ); + public static List DATA_SET_SINGLE_INSERT = Collections.singletonList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1"))); + + public static List DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList( + // 
DISORDER UPDATE + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21, + TimestampData.fromEpochMillis(3), StringData.fromString("par1")), + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")), + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22, + TimestampData.fromEpochMillis(4), StringData.fromString("par1")), + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21, + TimestampData.fromEpochMillis(3), StringData.fromString("par1")), + // DISORDER DELETE + deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")) + ); + /** * Returns string format of a list of RowData. */