1
0

[HUDI-1915] Fix the file id for write data buffer before flushing (#2966)

This commit is contained in:
Danny Chan
2021-05-20 10:20:08 +08:00
committed by GitHub
parent ced068e1ee
commit 9b01d2f864
6 changed files with 171 additions and 11 deletions

View File

@@ -436,6 +436,70 @@ public class TestWriteCopyOnWrite {
checkWrittenData(tempFile, expected, 1);
}
@Test
public void testInsertWithDeduplication() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
assertThat("3 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
is(2));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1);
Map<String, String> expected = new HashMap<>();
expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
checkWrittenData(tempFile, expected, 1);
// started a new instant already
checkInflightInstant(funcWrapper.getWriteClient());
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
funcWrapper.checkpointFunction(2);
final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event4 = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
funcWrapper.checkpointComplete(2);
// Same the original base file content.
checkWrittenData(tempFile, expected, 1);
}
@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option

View File

@@ -121,6 +121,13 @@ public class TestData {
TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
}
public static List<RowData> DATA_SET_INSERT_SAME_KEY = new ArrayList<>();
static {
IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
}
// data set of test_source.data
public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,