1
0

[HUDI-1678] Row level delete for Flink sink (#2659)

This commit is contained in:
Danny Chan
2021-03-11 19:44:06 +08:00
committed by GitHub
parent 2fdae6835c
commit 12ff562d2b
8 changed files with 169 additions and 83 deletions

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.operator.transform; package org.apache.hudi.operator.transform;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.model.WriteOperationType;
@@ -33,6 +34,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.io.IOException; import java.io.IOException;
@@ -100,9 +102,12 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS); final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false); this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
final HoodieKey hoodieKey = keyGenerator.getKey(gr);
// nullify the payload insert data to mark the record as a DELETE
gr = record.getRowKind() == RowKind.DELETE ? null : gr;
HoodieRecordPayload payload = shouldCombine HoodieRecordPayload payload = shouldCombine
? StreamerUtil.createPayload(payloadClazz, gr, orderingVal) ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
: StreamerUtil.createPayload(payloadClazz, gr); : StreamerUtil.createPayload(payloadClazz, gr);
return new HoodieRecord<>(keyGenerator.getKey(gr), payload); return new HoodieRecord<>(hoodieKey, payload);
} }
} }

View File

@@ -114,7 +114,7 @@ public class TestWriteCopyOnWrite {
public void testCheckpoint() throws Exception { public void testCheckpoint() throws Exception {
// open the function and ingest data // open the function and ingest data
funcWrapper.openFunction(); funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_ONE) { for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -200,7 +200,7 @@ public class TestWriteCopyOnWrite {
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
for (RowData rowData : TestData.DATA_SET_ONE) { for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -215,7 +215,7 @@ public class TestWriteCopyOnWrite {
public void testInsert() throws Exception { public void testInsert() throws Exception {
// open the function and ingest data // open the function and ingest data
funcWrapper.openFunction(); funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_ONE) { for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -248,7 +248,7 @@ public class TestWriteCopyOnWrite {
// open the function and ingest data // open the function and ingest data
funcWrapper.openFunction(); funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_THREE) { for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -267,7 +267,7 @@ public class TestWriteCopyOnWrite {
checkWrittenData(tempFile, EXPECTED3, 1); checkWrittenData(tempFile, EXPECTED3, 1);
// insert duplicates again // insert duplicates again
for (RowData rowData : TestData.DATA_SET_THREE) { for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -284,7 +284,7 @@ public class TestWriteCopyOnWrite {
public void testUpsert() throws Exception { public void testUpsert() throws Exception {
// open the function and ingest data // open the function and ingest data
funcWrapper.openFunction(); funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_ONE) { for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -301,7 +301,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointComplete(1); funcWrapper.checkpointComplete(1);
// upsert another data buffer // upsert another data buffer
for (RowData rowData : TestData.DATA_SET_TWO) { for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
// the data is not flushed yet // the data is not flushed yet
@@ -325,6 +325,58 @@ public class TestWriteCopyOnWrite {
checkWrittenData(tempFile, EXPECTED2); checkWrittenData(tempFile, EXPECTED2);
} }
/**
 * Verifies that DELETE rows in an upsert batch remove previously written records.
 *
 * <p>The test writes {@code TestData.DATA_SET_INSERT} under checkpoint 1, then
 * {@code TestData.DATA_SET_UPDATE_DELETE} under checkpoint 2, and finally checks
 * the per-partition contents on disk against an explicit expectation.
 */
@Test
public void testUpsertWithDelete() throws Exception {
// open the function and ingest the initial batch of inserts
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
// no checkpoint has happened yet
// NOTE(review): presumably asserts that no data file has been written so far - confirm against the helper
assertEmptyDataFiles();
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
// hand the write event over to the coordinator and make sure it was buffered
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
funcWrapper.checkpointComplete(1);
// upsert another data buffer, this one mixes updates with DELETE rows
for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) {
funcWrapper.invoke(rowData);
}
// the data is not flushed yet, so the table is compared against EXPECTED1
// NOTE(review): EXPECTED1 is presumably the state after the first batch - confirm against its definition
checkWrittenData(tempFile, EXPECTED1);
// this triggers the data write and event send
funcWrapper.checkpointFunction(2);
// remember the instant reported by the write client so its state can be checked below
String instant = funcWrapper.getWriteClient()
.getInflightAndRequestedInstant(getTableType());
nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
// checked before checkpoint 2 completes: the instant is looked up in REQUESTED state
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
funcWrapper.checkpointComplete(2);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// expected partition contents after both batches, keyed by partition directory name
Map<String, String> expected = new HashMap<>();
// id3, id5 were deleted and id9 is ignored
// id1 and id2 carry the ages from the second batch (24 and 34)
expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
checkWrittenData(tempFile, expected);
}
@Test @Test
public void testInsertWithMiniBatches() throws Exception { public void testInsertWithMiniBatches() throws Exception {
// reset the config option // reset the config option
@@ -334,7 +386,7 @@ public class TestWriteCopyOnWrite {
// open the function and ingest data // open the function and ingest data
funcWrapper.openFunction(); funcWrapper.openFunction();
// Each record is 424 bytes. so 3 records expect to trigger a mini-batch write // Each record is 424 bytes. so 3 records expect to trigger a mini-batch write
for (RowData rowData : TestData.DATA_SET_THREE) { for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -369,7 +421,7 @@ public class TestWriteCopyOnWrite {
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// insert duplicates again // insert duplicates again
for (RowData rowData : TestData.DATA_SET_THREE) { for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -401,7 +453,7 @@ public class TestWriteCopyOnWrite {
public void testIndexStateBootstrap() throws Exception { public void testIndexStateBootstrap() throws Exception {
// open the function and ingest data // open the function and ingest data
funcWrapper.openFunction(); funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_ONE) { for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
@@ -421,7 +473,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.clearIndexState(); funcWrapper.clearIndexState();
// upsert another data buffer // upsert another data buffer
for (RowData rowData : TestData.DATA_SET_TWO) { for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
funcWrapper.invoke(rowData); funcWrapper.invoke(rowData);
} }
checkIndexLoaded( checkIndexLoaded(

View File

@@ -44,6 +44,7 @@ import org.apache.flink.table.runtime.types.InternalSerializers;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.parquet.Strings; import org.apache.parquet.Strings;
@@ -58,6 +59,7 @@ import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@@ -70,100 +72,116 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Data set for testing, also some utilities to check the results. */ /** Data set for testing, also some utilities to check the results. */
public class TestData { public class TestData {
public static List<RowData> DATA_SET_ONE = Arrays.asList( public static List<RowData> DATA_SET_INSERT = Arrays.asList(
binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")), TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
TimestampData.fromEpochMillis(2), StringData.fromString("par1")), TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
TimestampData.fromEpochMillis(3), StringData.fromString("par2")), TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
TimestampData.fromEpochMillis(4), StringData.fromString("par2")), TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
TimestampData.fromEpochMillis(5), StringData.fromString("par3")), TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
TimestampData.fromEpochMillis(6), StringData.fromString("par3")), TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
TimestampData.fromEpochMillis(7), StringData.fromString("par4")), TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
TimestampData.fromEpochMillis(8), StringData.fromString("par4")) TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
); );
public static List<RowData> DATA_SET_TWO = Arrays.asList( public static List<RowData> DATA_SET_UPDATE_INSERT = Arrays.asList(
// advance the age by 1 // advance the age by 1
binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")), TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
TimestampData.fromEpochMillis(2), StringData.fromString("par1")), TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
TimestampData.fromEpochMillis(3), StringData.fromString("par2")), TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
TimestampData.fromEpochMillis(4), StringData.fromString("par2")), TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
// same with before // same with before
binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
TimestampData.fromEpochMillis(5), StringData.fromString("par3")), TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
// new data // new data
binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, insertRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
TimestampData.fromEpochMillis(6), StringData.fromString("par3")), TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, insertRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
TimestampData.fromEpochMillis(7), StringData.fromString("par4")), TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, insertRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
TimestampData.fromEpochMillis(8), StringData.fromString("par4")) TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
); );
public static List<RowData> DATA_SET_THREE = new ArrayList<>(); public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
static { static {
IntStream.range(0, 5).forEach(i -> DATA_SET_THREE.add( IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(
binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")))); TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
} }
// data set of test_source.data // data set of test_source.data
public static List<RowData> DATA_SET_FOUR = Arrays.asList( public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
TimestampData.fromEpochMillis(2000), StringData.fromString("par1")), TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
TimestampData.fromEpochMillis(3000), StringData.fromString("par2")), TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
TimestampData.fromEpochMillis(4000), StringData.fromString("par2")), TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
); );
// merged data set of test_source.data and test_source2.data // merged data set of test_source.data and test_source2.data
public static List<RowData> DATA_SET_FIVE = Arrays.asList( public static List<RowData> DATA_SET_SOURCE_MERGED = Arrays.asList(
binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
TimestampData.fromEpochMillis(2000), StringData.fromString("par1")), TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
TimestampData.fromEpochMillis(3000), StringData.fromString("par2")), TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
TimestampData.fromEpochMillis(4000), StringData.fromString("par2")), TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
TimestampData.fromEpochMillis(8000), StringData.fromString("par4")), TimestampData.fromEpochMillis(8000), StringData.fromString("par4")),
binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, insertRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, insertRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, insertRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
); );
/**
 * Data set that mixes rows built with {@code insertRow} and {@code deleteRow}:
 * two rows for existing keys of {@code DATA_SET_INSERT} with a changed age,
 * two DELETE rows for existing keys, and one DELETE row for a key (id9)
 * that {@code DATA_SET_INSERT} does not contain.
 */
public static List<RowData> DATA_SET_UPDATE_DELETE = Arrays.asList(
// updates: id1 and id2 with the age advanced by 1 compared to DATA_SET_INSERT
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
// deletes: id3 and id5, with the same field values as in DATA_SET_INSERT
deleteRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
deleteRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
// delete a record that has no inserts
deleteRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
TimestampData.fromEpochMillis(6), StringData.fromString("par3"))
);
/** /**
* Returns string format of a list of RowData. * Returns string format of a list of RowData.
*/ */
@@ -388,11 +406,16 @@ public class TestData {
List<String> readBuffer = scanner.getRecords().values().stream() List<String> readBuffer = scanner.getRecords().values().stream()
.map(hoodieRecord -> { .map(hoodieRecord -> {
try { try {
return filterOutVariables((GenericRecord) hoodieRecord.getData().getInsertValue(schema, new Properties()).get()); // in case it is a delete
GenericRecord record = (GenericRecord) hoodieRecord.getData()
.getInsertValue(schema, new Properties())
.orElse(null);
return record == null ? (String) null : filterOutVariables(record);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}) })
.filter(Objects::nonNull)
.sorted(Comparator.naturalOrder()) .sorted(Comparator.naturalOrder())
.collect(Collectors.toList()); .collect(Collectors.toList());
assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName()))); assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
@@ -437,7 +460,7 @@ public class TestData {
return Strings.join(fields, ","); return Strings.join(fields, ",");
} }
private static BinaryRowData binaryRow(Object... fields) { private static BinaryRowData insertRow(Object... fields) {
LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType) LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
.toArray(LogicalType[]::new); .toArray(LogicalType[]::new);
assertEquals( assertEquals(
@@ -458,4 +481,10 @@ public class TestData {
writer.complete(); writer.complete();
return row; return row;
} }
/**
 * Builds a row through {@code insertRow} and flags it as a {@code RowKind.DELETE} record.
 *
 * @param fields the field values, handed unchanged to {@code insertRow}
 * @return the row produced by {@code insertRow} with its row kind set to DELETE
 */
private static BinaryRowData deleteRow(Object... fields) {
  final BinaryRowData deleted = insertRow(fields);
  // only the row kind is changed; the field values stay as insertRow wrote them
  deleted.setRowKind(RowKind.DELETE);
  return deleted;
}
} }

View File

@@ -95,12 +95,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10); List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows, TestData.DATA_SET_FOUR); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
// insert another batch of data // insert another batch of data
execInsertSql(streamTableEnv, insertInto); execInsertSql(streamTableEnv, insertInto);
List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10); List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows2, TestData.DATA_SET_FOUR); assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
} }
@Test @Test
@@ -135,7 +135,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10); List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
// all the data with same keys are appended within one data bucket and one log file, // all the data with same keys are appended within one data bucket and one log file,
// so when consume, the same keys are merged // so when consume, the same keys are merged
assertRowsEquals(rows, TestData.DATA_SET_FIVE); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_MERGED);
} }
@Test @Test
@@ -156,7 +156,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
List<Row> rows = CollectionUtil.iterableToList( List<Row> rows = CollectionUtil.iterableToList(
() -> streamTableEnv.sqlQuery("select * from t1").execute().collect()); () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
assertRowsEquals(rows, TestData.DATA_SET_FOUR); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
} }
@Test @Test
@@ -182,7 +182,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
List<Row> rows = CollectionUtil.iterableToList( List<Row> rows = CollectionUtil.iterableToList(
() -> batchTableEnv.sqlQuery("select * from t1").execute().collect()); () -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
assertRowsEquals(rows, TestData.DATA_SET_FOUR); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
} }
private void execInsertSql(TableEnvironment tEnv, String insert) { private void execInsertSql(TableEnvironment tEnv, String insert) {

View File

@@ -101,7 +101,7 @@ public class TestHoodieTableSource {
@Test @Test
void testGetInputFormat() throws Exception { void testGetInputFormat() throws Exception {
// write some data to let the TableSchemaResolver get the right instant // write some data to let the TableSchemaResolver get the right instant
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
HoodieTableSource tableSource = new HoodieTableSource( HoodieTableSource tableSource = new HoodieTableSource(
TestConfigurations.TABLE_SCHEMA, TestConfigurations.TABLE_SCHEMA,

View File

@@ -71,7 +71,7 @@ public class TestStreamReadMonitoringFunction {
@Test @Test
public void testConsumeFromLatestCommit() throws Exception { public void testConsumeFromLatestCommit() throws Exception {
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) { try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
harness.setup(); harness.setup();
@@ -95,7 +95,7 @@ public class TestStreamReadMonitoringFunction {
sourceContext.reset(latch); sourceContext.reset(latch);
// write another instant and validate // write another instant and validate
TestData.writeData(TestData.DATA_SET_TWO, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
assertThat("Should produce the expected splits", assertThat("Should produce the expected splits",
@@ -112,8 +112,8 @@ public class TestStreamReadMonitoringFunction {
public void testConsumeFromSpecifiedCommit() throws Exception { public void testConsumeFromSpecifiedCommit() throws Exception {
// write 2 commits first, use the second commit time as the specified start instant, // write 2 commits first, use the second commit time as the specified start instant,
// all the splits should come from the second commit. // all the splits should come from the second commit.
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
TestData.writeData(TestData.DATA_SET_TWO, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit); conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
@@ -141,7 +141,7 @@ public class TestStreamReadMonitoringFunction {
@Test @Test
public void testCheckpointRestore() throws Exception { public void testCheckpointRestore() throws Exception {
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
OperatorSubtaskState state; OperatorSubtaskState state;
@@ -169,7 +169,7 @@ public class TestStreamReadMonitoringFunction {
} }
TestData.writeData(TestData.DATA_SET_TWO, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf); StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) { try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {
harness.setup(); harness.setup();

View File

@@ -93,7 +93,7 @@ public class TestStreamReadOperator {
@Test @Test
void testWriteRecords() throws Exception { void testWriteRecords() throws Exception {
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) { try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) {
harness.setup(); harness.setup();
harness.open(); harness.open();
@@ -111,9 +111,9 @@ public class TestStreamReadOperator {
assertThat("Should process 1 split", processor.runMailboxStep()); assertThat("Should process 1 split", processor.runMailboxStep());
} }
// Assert the output has expected elements. // Assert the output has expected elements.
TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT);
TestData.writeData(TestData.DATA_SET_TWO, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
final List<MergeOnReadInputSplit> splits2 = generateSplits(func); final List<MergeOnReadInputSplit> splits2 = generateSplits(func);
assertThat("Should have 4 splits", splits2.size(), is(4)); assertThat("Should have 4 splits", splits2.size(), is(4));
for (MergeOnReadInputSplit split : splits2) { for (MergeOnReadInputSplit split : splits2) {
@@ -124,8 +124,8 @@ public class TestStreamReadOperator {
assertThat("Should processed 1 split", processor.runMailboxStep()); assertThat("Should processed 1 split", processor.runMailboxStep());
} }
 // The result set behaves like append only: DATA_SET_ONE + DATA_SET_TWO // The result set behaves like append only: DATA_SET_INSERT + DATA_SET_UPDATE_INSERT
List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE); List<RowData> expected = new ArrayList<>(TestData.DATA_SET_INSERT);
expected.addAll(TestData.DATA_SET_TWO); expected.addAll(TestData.DATA_SET_UPDATE_INSERT);
TestData.assertRowDataEquals(harness.extractOutputValues(), expected); TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
} }
} }
@@ -134,7 +134,7 @@ public class TestStreamReadOperator {
public void testCheckpoint() throws Exception { public void testCheckpoint() throws Exception {
// Received emitted splits: split1, split2, split3, split4, checkpoint request is triggered // Received emitted splits: split1, split2, split3, split4, checkpoint request is triggered
// when reading records from split1. // when reading records from split1.
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
long timestamp = 0; long timestamp = 0;
try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) { try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) {
harness.setup(); harness.setup();
@@ -170,13 +170,13 @@ public class TestStreamReadOperator {
assertTrue(processor.runMailboxStep(), "Should have processed the split3"); assertTrue(processor.runMailboxStep(), "Should have processed the split3");
// Assert the output has expected elements. // Assert the output has expected elements.
TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT);
} }
} }
@Test @Test
public void testCheckpointRestore() throws Exception { public void testCheckpointRestore() throws Exception {
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
OperatorSubtaskState state; OperatorSubtaskState state;
final List<MergeOnReadInputSplit> splits; final List<MergeOnReadInputSplit> splits;

View File

@@ -76,18 +76,18 @@ public class TestInputFormat {
void testRead(HoodieTableType tableType) throws Exception { void testRead(HoodieTableType tableType) throws Exception {
beforeEach(tableType); beforeEach(tableType);
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(); InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
List<RowData> result = readData(inputFormat); List<RowData> result = readData(inputFormat);
String actual = TestData.rowDataToString(result); String actual = TestData.rowDataToString(result);
String expected = TestData.rowDataToString(TestData.DATA_SET_ONE); String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
assertThat(actual, is(expected)); assertThat(actual, is(expected));
// write another commit to read again // write another commit to read again
TestData.writeData(TestData.DATA_SET_TWO, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
// refresh the input format // refresh the input format
this.tableSource.reloadActiveTimeline(); this.tableSource.reloadActiveTimeline();
@@ -116,19 +116,19 @@ public class TestInputFormat {
// write parquet first with compaction // write parquet first with compaction
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(); InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
List<RowData> result = readData(inputFormat); List<RowData> result = readData(inputFormat);
String actual = TestData.rowDataToString(result); String actual = TestData.rowDataToString(result);
String expected = TestData.rowDataToString(TestData.DATA_SET_ONE); String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
assertThat(actual, is(expected)); assertThat(actual, is(expected));
// write another commit using logs and read again // write another commit using logs and read again
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
TestData.writeData(TestData.DATA_SET_TWO, conf); TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
// refresh the input format // refresh the input format
this.tableSource.reloadActiveTimeline(); this.tableSource.reloadActiveTimeline();
@@ -156,7 +156,7 @@ public class TestInputFormat {
void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception { void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
beforeEach(tableType); beforeEach(tableType);
TestData.writeData(TestData.DATA_SET_ONE, conf); TestData.writeData(TestData.DATA_SET_INSERT, conf);
Map<String, String> prunedPartitions = new HashMap<>(); Map<String, String> prunedPartitions = new HashMap<>();
prunedPartitions.put("partition", "par1"); prunedPartitions.put("partition", "par1");