diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 91afd491b..1d58e4e24 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -197,7 +197,7 @@ public class HoodieTableSource implements @Override public ChangelogMode getChangelogMode() { - return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) + return conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED) ? ChangelogModes.FULL // when all the changes are persisted or read as batch, // use INSERT mode. diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 14f7eb3e2..94fbe028b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -91,6 +91,17 @@ public class FormatUtils { } } + /** + * Returns the RowKind of the given record, never null. + * Returns RowKind.INSERT when the given field is not found (the index is -1). 
+ */ + public static RowKind getRowKindSafely(IndexedRecord record, int index) { + if (index == -1) { + return RowKind.INSERT; + } + return getRowKind(record, index); + } + public static GenericRecord buildAvroRecordBySchema( IndexedRecord record, Schema requiredSchema, diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index d0e1c33d1..2042b9673 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -177,7 +177,8 @@ public class MergeOnReadInputFormat } } else if (!split.getBasePath().isPresent()) { // log files only - if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) { + if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING) + && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) { this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split)); } else { this.iterator = new LogFileOnlyIterator(getLogFileIterator(split)); @@ -350,13 +351,18 @@ public class MergeOnReadInputFormat // continue; } else { final IndexedRecord avroRecord = curAvroRecord.get(); + final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, tableState.getOperationPos()); + if (rowKind == RowKind.DELETE && !emitDelete) { + // skip the delete record + continue; + } GenericRecord requiredAvroRecord = buildAvroRecordBySchema( avroRecord, requiredSchema, requiredPos, recordBuilder); currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); - FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos()); + currentRecord.setRowKind(rowKind); return true; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index cc0699fc5..f83b2d991 100644 --- 
a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -325,10 +325,42 @@ public class TestInputFormat { assertThat(actual, is(expected)); } + @Test + void testReadChangesMergedMOR() throws Exception { + Map options = new HashMap<>(); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true"); + beforeEach(HoodieTableType.MERGE_ON_READ, options); + + // write another commit to read again + TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf); + + InputFormat inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + + List result1 = readData(inputFormat); + + final String actual1 = TestData.rowDataToString(result1); + // the data set is merged when the data source is bounded. + final String expected1 = "[]"; + assertThat(actual1, is(expected1)); + + // refresh the input format and set isEmitDelete to true. + this.tableSource.reset(); + inputFormat = this.tableSource.getInputFormat(); + ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true); + + List result2 = readData(inputFormat); + + final String actual2 = TestData.rowDataToString(result2); + final String expected2 = "[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]"; + assertThat(actual2, is(expected2)); + } + @Test void testReadChangesUnMergedMOR() throws Exception { Map options = new HashMap<>(); options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true"); + options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); beforeEach(HoodieTableType.MERGE_ON_READ, options); // write another commit to read again @@ -340,6 +372,7 @@ public class TestInputFormat { List result = readData(inputFormat); final String actual = TestData.rowDataToString(result); + // the change logs are not merged when the data source is streaming with changelog enabled. 
final String expected = "[" + "+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], " + "-U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "