diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java index 2d47c7961..611a277d2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java @@ -19,6 +19,7 @@ package org.apache.hudi.operator.transform; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -33,6 +34,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; import java.io.IOException; @@ -100,9 +102,12 @@ public class RowDataToHoodieFunction(keyGenerator.getKey(gr), payload); + return new HoodieRecord<>(hoodieKey, payload); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java index e9c678b8f..b8c9b3c22 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java @@ -114,7 +114,7 @@ public class TestWriteCopyOnWrite { public void testCheckpoint() throws Exception { // open the function and ingest data funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_ONE) { + for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } @@ -200,7 +200,7 @@ public class TestWriteCopyOnWrite { checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); - for (RowData rowData : TestData.DATA_SET_ONE) { + for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } @@ -215,7 +215,7 @@ public class TestWriteCopyOnWrite { public void testInsert() throws Exception { // open the function and ingest data funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_ONE) { + for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } @@ -248,7 +248,7 @@ public class TestWriteCopyOnWrite { // open the function and ingest data funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_THREE) { + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); } @@ -267,7 +267,7 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, EXPECTED3, 1); // insert duplicates again - for (RowData rowData : TestData.DATA_SET_THREE) { + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); } @@ -284,7 +284,7 @@ public class TestWriteCopyOnWrite { public void testUpsert() throws Exception { // open the function and ingest data funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_ONE) { + for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } @@ -301,7 +301,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointComplete(1); // upsert another data buffer - for (RowData rowData : TestData.DATA_SET_TWO) { + for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) { funcWrapper.invoke(rowData); } // the data is not flushed yet @@ -325,6 +325,58 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, EXPECTED2); } + @Test + public void testUpsertWithDelete() throws Exception { + // open the function and ingest data + funcWrapper.openFunction(); + for (RowData rowData : TestData.DATA_SET_INSERT) { + funcWrapper.invoke(rowData); + } + + assertEmptyDataFiles(); + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + + OperatorEvent nextEvent = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + funcWrapper.checkpointComplete(1); + + // upsert another data buffer + for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) { + funcWrapper.invoke(rowData); + } + // the data is not flushed yet + checkWrittenData(tempFile, EXPECTED1); + // this triggers the data write and event send + funcWrapper.checkpointFunction(2); + + String instant = funcWrapper.getWriteClient() + .getInflightAndRequestedInstant(getTableType()); + + nextEvent = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); + funcWrapper.checkpointComplete(2); + // the coordinator checkpoint commits the inflight instant. + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + Map expected = new HashMap<>(); + // id3, id5 were deleted and id9 is ignored + expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]"); + expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]"); + expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]"); + expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); + checkWrittenData(tempFile, expected); + } + @Test public void testInsertWithMiniBatches() throws Exception { // reset the config option @@ -334,7 +386,7 @@ public class TestWriteCopyOnWrite { // open the function and ingest data funcWrapper.openFunction(); // Each record is 424 bytes. so 3 records expect to trigger a mini-batch write - for (RowData rowData : TestData.DATA_SET_THREE) { + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); } @@ -369,7 +421,7 @@ public class TestWriteCopyOnWrite { checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); // insert duplicates again - for (RowData rowData : TestData.DATA_SET_THREE) { + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); } @@ -401,7 +453,7 @@ public class TestWriteCopyOnWrite { public void testIndexStateBootstrap() throws Exception { // open the function and ingest data funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_ONE) { + for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } @@ -421,7 +473,7 @@ public class TestWriteCopyOnWrite { funcWrapper.clearIndexState(); // upsert another data buffer - for (RowData rowData : TestData.DATA_SET_TWO) { + for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) { funcWrapper.invoke(rowData); } checkIndexLoaded( diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java index efeb0dd4d..3b63febdc 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java @@ -44,6 +44,7 @@ import org.apache.flink.table.runtime.types.InternalSerializers; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.Strings; @@ -58,6 +59,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -70,100 +72,116 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** Data set for testing, also some utilities to check the results. */ public class TestData { - public static List DATA_SET_ONE = Arrays.asList( - binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + public static List DATA_SET_INSERT = Arrays.asList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), StringData.fromString("par1")), - binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, + insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, TimestampData.fromEpochMillis(2), StringData.fromString("par1")), - binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, + insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3), StringData.fromString("par2")), - binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, + insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, TimestampData.fromEpochMillis(4), StringData.fromString("par2")), - binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, TimestampData.fromEpochMillis(5), StringData.fromString("par3")), - binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, TimestampData.fromEpochMillis(6), StringData.fromString("par3")), - binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, TimestampData.fromEpochMillis(7), StringData.fromString("par4")), - binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, TimestampData.fromEpochMillis(8), StringData.fromString("par4")) ); - public static List DATA_SET_TWO = Arrays.asList( + public static List DATA_SET_UPDATE_INSERT = Arrays.asList( // advance the age by 1 - binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, TimestampData.fromEpochMillis(1), StringData.fromString("par1")), - binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, + insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, TimestampData.fromEpochMillis(2), StringData.fromString("par1")), - binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, + insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, TimestampData.fromEpochMillis(3), StringData.fromString("par2")), - binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, + insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, TimestampData.fromEpochMillis(4), StringData.fromString("par2")), // same with before - binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, TimestampData.fromEpochMillis(5), StringData.fromString("par3")), // new data - binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, + insertRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, TimestampData.fromEpochMillis(6), StringData.fromString("par3")), - binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, + insertRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, TimestampData.fromEpochMillis(7), StringData.fromString("par4")), - binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, + insertRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, TimestampData.fromEpochMillis(8), StringData.fromString("par4")) ); - public static List DATA_SET_THREE = new ArrayList<>(); + public static List DATA_SET_INSERT_DUPLICATES = new ArrayList<>(); static { - IntStream.range(0, 5).forEach(i -> DATA_SET_THREE.add( - binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), StringData.fromString("par1")))); } // data set of test_source.data - public static List DATA_SET_FOUR = Arrays.asList( - binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + public static List DATA_SET_SOURCE_INSERT = Arrays.asList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), - binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, + insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, TimestampData.fromEpochMillis(2000), StringData.fromString("par1")), - binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, + insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3000), StringData.fromString("par2")), - binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, + insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, TimestampData.fromEpochMillis(4000), StringData.fromString("par2")), - binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), - binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), - binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), - binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) ); // merged data set of test_source.data and test_source2.data - public static List DATA_SET_FIVE = Arrays.asList( - binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, + public static List DATA_SET_SOURCE_MERGED = Arrays.asList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), - binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, + insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, TimestampData.fromEpochMillis(2000), StringData.fromString("par1")), - binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, + insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, TimestampData.fromEpochMillis(3000), StringData.fromString("par2")), - binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, + insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, TimestampData.fromEpochMillis(4000), StringData.fromString("par2")), - binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), - binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), - binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), - binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, TimestampData.fromEpochMillis(8000), StringData.fromString("par4")), - binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, + insertRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), - binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, + insertRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), - binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, + insertRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) ); + public static List DATA_SET_UPDATE_DELETE = Arrays.asList( + // this is update + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")), + insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + // this is delete + deleteRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, + TimestampData.fromEpochMillis(3), StringData.fromString("par2")), + deleteRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(5), StringData.fromString("par3")), + // delete a record that has no inserts + deleteRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, + TimestampData.fromEpochMillis(6), StringData.fromString("par3")) + ); + /** * Returns string format of a list of RowData. */ @@ -388,11 +406,16 @@ public class TestData { List readBuffer = scanner.getRecords().values().stream() .map(hoodieRecord -> { try { - return filterOutVariables((GenericRecord) hoodieRecord.getData().getInsertValue(schema, new Properties()).get()); + // in case it is a delete + GenericRecord record = (GenericRecord) hoodieRecord.getData() + .getInsertValue(schema, new Properties()) + .orElse(null); + return record == null ? (String) null : filterOutVariables(record); } catch (IOException e) { throw new RuntimeException(e); } }) + .filter(Objects::nonNull) .sorted(Comparator.naturalOrder()) .collect(Collectors.toList()); assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName()))); @@ -437,7 +460,7 @@ public class TestData { return Strings.join(fields, ","); } - private static BinaryRowData binaryRow(Object... fields) { + private static BinaryRowData insertRow(Object... fields) { LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType) .toArray(LogicalType[]::new); assertEquals( @@ -458,4 +481,10 @@ public class TestData { writer.complete(); return row; } + + private static BinaryRowData deleteRow(Object... fields) { + BinaryRowData rowData = insertRow(fields); + rowData.setRowKind(RowKind.DELETE); + return rowData; + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java index ca110db4f..1a4a71d78 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java @@ -95,12 +95,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase { execInsertSql(streamTableEnv, insertInto); List rows = execSelectSql(streamTableEnv, "select * from t1", 10); - assertRowsEquals(rows, TestData.DATA_SET_FOUR); + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); // insert another batch of data execInsertSql(streamTableEnv, insertInto); List rows2 = execSelectSql(streamTableEnv, "select * from t1", 10); - assertRowsEquals(rows2, TestData.DATA_SET_FOUR); + assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT); } @Test @@ -135,7 +135,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { List rows = execSelectSql(streamTableEnv, "select * from t2", 10); // all the data with same keys are appended within one data bucket and one log file, // so when consume, the same keys are merged - assertRowsEquals(rows, TestData.DATA_SET_FIVE); + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_MERGED); } @Test @@ -156,7 +156,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { List rows = CollectionUtil.iterableToList( () -> streamTableEnv.sqlQuery("select * from t1").execute().collect()); - assertRowsEquals(rows, TestData.DATA_SET_FOUR); + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } @Test @@ -182,7 +182,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { List rows = CollectionUtil.iterableToList( () -> batchTableEnv.sqlQuery("select * from t1").execute().collect()); - assertRowsEquals(rows, TestData.DATA_SET_FOUR); + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } private void execInsertSql(TableEnvironment tEnv, String insert) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java index af50cf0dc..48bd350db 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java @@ -101,7 +101,7 @@ public class TestHoodieTableSource { @Test void testGetInputFormat() throws Exception { // write some data to let the TableSchemaResolver get the right instant - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); HoodieTableSource tableSource = new HoodieTableSource( TestConfigurations.TABLE_SCHEMA, diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java index f02a28c0f..4733fa07b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java @@ -71,7 +71,7 @@ public class TestStreamReadMonitoringFunction { @Test public void testConsumeFromLatestCommit() throws Exception { - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { harness.setup(); @@ -95,7 +95,7 @@ public class TestStreamReadMonitoringFunction { sourceContext.reset(latch); // write another instant and validate - TestData.writeData(TestData.DATA_SET_TWO, conf); + TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); assertThat("Should produce the expected splits", @@ -112,8 +112,8 @@ public class TestStreamReadMonitoringFunction { public void testConsumeFromSpecifiedCommit() throws Exception { // write 2 commits first, use the second commit time as the specified start instant, // all the splits should come from the second commit. - TestData.writeData(TestData.DATA_SET_ONE, conf); - TestData.writeData(TestData.DATA_SET_TWO, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); + TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); @@ -141,7 +141,7 @@ public class TestStreamReadMonitoringFunction { @Test public void testCheckpointRestore() throws Exception { - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); OperatorSubtaskState state; @@ -169,7 +169,7 @@ public class TestStreamReadMonitoringFunction { } - TestData.writeData(TestData.DATA_SET_TWO, conf); + TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf); try (AbstractStreamOperatorTestHarness harness = createHarness(function2)) { harness.setup(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index e13f95081..2daa6c38f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -93,7 +93,7 @@ public class TestStreamReadOperator { @Test void testWriteRecords() throws Exception { - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); try (OneInputStreamOperatorTestHarness harness = createReader()) { harness.setup(); harness.open(); @@ -111,9 +111,9 @@ public class TestStreamReadOperator { assertThat("Should process 1 split", processor.runMailboxStep()); } // Assert the output has expected elements. - TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); + TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT); - TestData.writeData(TestData.DATA_SET_TWO, conf); + TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); final List splits2 = generateSplits(func); assertThat("Should have 4 splits", splits2.size(), is(4)); for (MergeOnReadInputSplit split : splits2) { @@ -124,8 +124,8 @@ public class TestStreamReadOperator { assertThat("Should processed 1 split", processor.runMailboxStep()); } // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO - List expected = new ArrayList<>(TestData.DATA_SET_ONE); - expected.addAll(TestData.DATA_SET_TWO); + List expected = new ArrayList<>(TestData.DATA_SET_INSERT); + expected.addAll(TestData.DATA_SET_UPDATE_INSERT); TestData.assertRowDataEquals(harness.extractOutputValues(), expected); } } @@ -134,7 +134,7 @@ public class TestStreamReadOperator { public void testCheckpoint() throws Exception { // Received emitted splits: split1, split2, split3, split4, checkpoint request is triggered // when reading records from split1. - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); long timestamp = 0; try (OneInputStreamOperatorTestHarness harness = createReader()) { harness.setup(); @@ -170,13 +170,13 @@ public class TestStreamReadOperator { assertTrue(processor.runMailboxStep(), "Should have processed the split3"); // Assert the output has expected elements. - TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); + TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT); } } @Test public void testCheckpointRestore() throws Exception { - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); OperatorSubtaskState state; final List splits; diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java index 7774e5611..343f2939b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java @@ -76,18 +76,18 @@ public class TestInputFormat { void testRead(HoodieTableType tableType) throws Exception { beforeEach(tableType); - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); InputFormat inputFormat = this.tableSource.getInputFormat(); List result = readData(inputFormat); String actual = TestData.rowDataToString(result); - String expected = TestData.rowDataToString(TestData.DATA_SET_ONE); + String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT); assertThat(actual, is(expected)); // write another commit to read again - TestData.writeData(TestData.DATA_SET_TWO, conf); + TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); // refresh the input format this.tableSource.reloadActiveTimeline(); @@ -116,19 +116,19 @@ public class TestInputFormat { // write parquet first with compaction conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true); - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); InputFormat inputFormat = this.tableSource.getInputFormat(); List result = readData(inputFormat); String actual = TestData.rowDataToString(result); - String expected = TestData.rowDataToString(TestData.DATA_SET_ONE); + String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT); assertThat(actual, is(expected)); // write another commit using logs and read again conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); - TestData.writeData(TestData.DATA_SET_TWO, conf); + TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf); // refresh the input format this.tableSource.reloadActiveTimeline(); @@ -156,7 +156,7 @@ public class TestInputFormat { void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception { beforeEach(tableType); - TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_INSERT, conf); Map prunedPartitions = new HashMap<>(); prunedPartitions.put("partition", "par1");