[HUDI-2298] The HoodieMergedLogRecordScanner should set up the operation of the chosen record (#3456)
This commit is contained in:
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log;
|
||||
|
||||
import org.apache.hudi.common.config.HoodieCommonConfig;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieOperation;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.DefaultSizeEstimator;
|
||||
@@ -134,9 +135,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
|
||||
// Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
|
||||
// done when a delete (empty payload) is encountered before or after an insert/update.
|
||||
|
||||
// Always use the natural order now.
|
||||
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
|
||||
records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, hoodieRecord.getOperation()));
|
||||
HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
|
||||
HoodieRecordPayload oldValue = oldRecord.getData();
|
||||
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
|
||||
boolean choosePrev = combinedValue.equals(oldValue);
|
||||
HoodieOperation operation = choosePrev ? oldRecord.getOperation() : hoodieRecord.getOperation();
|
||||
records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
|
||||
} else {
|
||||
// Put the record as is
|
||||
records.put(key, hoodieRecord);
|
||||
|
||||
@@ -220,6 +220,43 @@ public class TestInputFormat {
|
||||
assertThat(actual2, is(expected2));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
|
||||
Map<String, String> options = new HashMap<>();
|
||||
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
|
||||
beforeEach(HoodieTableType.MERGE_ON_READ, options);
|
||||
|
||||
// write base first with compaction.
|
||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
|
||||
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
|
||||
TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
|
||||
|
||||
// write another commit using logs and read again.
|
||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
|
||||
TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf);
|
||||
|
||||
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
|
||||
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
|
||||
|
||||
// when isEmitDelete is false.
|
||||
List<RowData> result1 = readData(inputFormat);
|
||||
|
||||
final String actual1 = TestData.rowDataToString(result1, true);
|
||||
final String expected1 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]";
|
||||
assertThat(actual1, is(expected1));
|
||||
|
||||
// refresh the input format and set isEmitDelete to true.
|
||||
this.tableSource.reset();
|
||||
inputFormat = this.tableSource.getInputFormat();
|
||||
((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
|
||||
|
||||
List<RowData> result2 = readData(inputFormat);
|
||||
|
||||
final String actual2 = TestData.rowDataToString(result2, true);
|
||||
final String expected2 = "[+U(id1,Danny,22,1970-01-01T00:00:00.004,par1)]";
|
||||
assertThat(actual2, is(expected2));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReadWithDeletesMOR() throws Exception {
|
||||
Map<String, String> options = new HashMap<>();
|
||||
|
||||
@@ -58,6 +58,7 @@ import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -256,6 +257,29 @@ public class TestData {
|
||||
TimestampData.fromEpochMillis(5), StringData.fromString("par1"))
|
||||
);
|
||||
|
||||
public static List<RowData> DATA_SET_SINGLE_INSERT = Collections.singletonList(
|
||||
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
|
||||
TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
|
||||
|
||||
public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
|
||||
// DISORDER UPDATE
|
||||
updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
|
||||
TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
|
||||
updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
|
||||
TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
|
||||
updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
|
||||
TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
|
||||
updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
|
||||
TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
|
||||
updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
|
||||
TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
|
||||
updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
|
||||
TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
|
||||
// DISORDER DELETE
|
||||
deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
|
||||
TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
|
||||
);
|
||||
|
||||
/**
|
||||
* Returns string format of a list of RowData.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user