1
0

[HUDI-2340] Merge the data set for flink bounded source when changelog mode turns off (#3513)

This commit is contained in:
Danny Chan
2021-08-21 07:21:35 +08:00
committed by GitHub
parent e39d0a2f28
commit c7c517f14c
4 changed files with 53 additions and 3 deletions

View File

@@ -197,7 +197,7 @@ public class HoodieTableSource implements
@Override
public ChangelogMode getChangelogMode() {
return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
return conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
? ChangelogModes.FULL
// when all the changes are persisted or read as batch,
// use INSERT mode.

View File

@@ -91,6 +91,17 @@ public class FormatUtils {
}
}
/**
 * Resolves the {@link RowKind} of the given record, never returning null.
 *
 * <p>Falls back to {@link RowKind#INSERT} when the operation field does not
 * exist, which is signalled by an index of -1.
 *
 * @param record the record to read the operation field from
 * @param index  position of the operation field within the record, or -1 when absent
 * @return the row kind of the record, defaulting to {@code RowKind.INSERT}
 */
public static RowKind getRowKindSafely(IndexedRecord record, int index) {
  return index == -1 ? RowKind.INSERT : getRowKind(record, index);
}
public static GenericRecord buildAvroRecordBySchema(
IndexedRecord record,
Schema requiredSchema,

View File

@@ -177,7 +177,8 @@ public class MergeOnReadInputFormat
}
} else if (!split.getBasePath().isPresent()) {
// log files only
if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
&& conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
} else {
this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
@@ -350,13 +351,18 @@ public class MergeOnReadInputFormat
// continue;
} else {
final IndexedRecord avroRecord = curAvroRecord.get();
final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, tableState.getOperationPos());
if (rowKind == RowKind.DELETE && !emitDelete) {
// skip the delete record
continue;
}
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
avroRecord,
requiredSchema,
requiredPos,
recordBuilder);
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
currentRecord.setRowKind(rowKind);
return true;
}
}

View File

@@ -325,10 +325,42 @@ public class TestInputFormat {
assertThat(actual, is(expected));
}
@Test
void testReadChangesMergedMOR() throws Exception {
  Map<String, String> options = new HashMap<>();
  options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
  beforeEach(HoodieTableType.MERGE_ON_READ, options);

  // write another commit to read again
  TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);

  InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
  assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));

  // the data set is merged when the data source is bounded.
  final String mergedActual = TestData.rowDataToString(readData(inputFormat));
  assertThat(mergedActual, is("[]"));

  // refresh the input format and set isEmitDelete to true.
  this.tableSource.reset();
  inputFormat = this.tableSource.getInputFormat();
  ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);

  final String deleteActual = TestData.rowDataToString(readData(inputFormat));
  assertThat(deleteActual, is("[-D[id1, Danny, 22, 1970-01-01T00:00:00.005, par1]]"));
}
@Test
void testReadChangesUnMergedMOR() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
beforeEach(HoodieTableType.MERGE_ON_READ, options);
// write another commit to read again
@@ -340,6 +372,7 @@ public class TestInputFormat {
List<RowData> result = readData(inputFormat);
final String actual = TestData.rowDataToString(result);
// the changes are not merged when reading as streaming with changelog mode enabled.
final String expected = "["
+ "+I[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "
+ "-U[id1, Danny, 19, 1970-01-01T00:00:00.001, par1], "