diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index d3de247ce..1171a54cd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -31,11 +31,9 @@ import org.apache.hudi.sink.partitioner.BucketAssigner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SmallFile; -import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,11 +96,6 @@ public class WriteProfile { */ protected AbstractTableFileSystemView fsView; - /** - * Hadoop configuration. - */ - private final Configuration hadoopConf; - /** * Metadata cache to reduce IO of metadata files. */ @@ -114,7 +107,6 @@ public class WriteProfile { this.smallFilesMap = new HashMap<>(); this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize(); this.table = HoodieFlinkTable.create(config, context); - this.hadoopConf = StreamerUtil.getHadoopConf(); this.metadataCache = new HashMap<>(); // profile the record statistics on construction recordProfile(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 5b25311ec..d8588f8cf 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -19,93 +19,44 @@ package org.apache.hudi.sink; import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.event.WriteMetadataEvent; -import org.apache.hudi.sink.utils.InsertFunctionWrapper; -import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; +import org.apache.hudi.sink.utils.TestWriteBase; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; -import org.apache.hudi.utils.TestUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.table.data.RowData; -import org.hamcrest.MatcherAssert; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; 
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for stream write. */ -public class TestWriteCopyOnWrite { - - protected static final Map EXPECTED1 = new HashMap<>(); - - protected static final Map EXPECTED2 = new HashMap<>(); - - protected static final Map EXPECTED3 = new HashMap<>(); - - static { - EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]"); - EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]"); - EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]"); - EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); - - EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]"); - EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]"); - EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, " - + "id9,par3,id9,Jane,19,6,par3]"); - EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, " - + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); - - EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]"); - } +public class TestWriteCopyOnWrite extends TestWriteBase { protected Configuration conf; - protected StreamWriteFunctionWrapper funcWrapper; - @TempDir File tempFile; @BeforeEach - public void before() throws Exception { + public void before() { conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name()); setUp(conf); - this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); } /** @@ -115,353 +66,148 @@ public class TestWriteCopyOnWrite { // for sub-class extension } - @AfterEach - public void after() throws Exception { - funcWrapper.close(); - } - @Test public void testCheckpoint() throws Exception { - // open the function and ingest data - funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - - // no checkpoint, so the coordinator does not accept any events - assertTrue( - funcWrapper.getEventBuffer().length == 1 - && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty"); - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - - String instant = lastPendingInstant(); - - final OperatorEvent nextEvent = funcWrapper.getNextEvent(); - MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - List writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses(); - assertNotNull(writeStatuses); - MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files - assertThat(writeStatuses.stream() - .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder()) - .collect(Collectors.joining(",")), - is("par1,par2,par3,par4")); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The 
coordinator missed the event"); - - checkInstantState(REQUESTED, instant); - funcWrapper.checkpointComplete(1); - // the coordinator checkpoint commits the inflight instant. - checkInstantState(HoodieInstant.State.COMPLETED, instant); - - // checkpoint for next round, no data input, so after the checkpoint, - // there should not be REQUESTED Instant - // this triggers the data write and event send - funcWrapper.checkpointFunction(2); - - String instant2 = lastPendingInstant(); - assertNotEquals(instant, instant2); - - final OperatorEvent nextEvent2 = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent2, instanceOf(WriteMetadataEvent.class)); - List writeStatuses2 = ((WriteMetadataEvent) nextEvent2).getWriteStatuses(); - assertNotNull(writeStatuses2); - assertThat(writeStatuses2.size(), is(0)); // write empty statuses - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent2); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - funcWrapper.checkpointComplete(2); - // started a new instant already - checkInflightInstant(); - checkInstantState(HoodieInstant.State.COMPLETED, instant); + preparePipeline() + .consume(TestData.DATA_SET_INSERT) + // no checkpoint, so the coordinator does not accept any events + .emptyEventBuffer() + .checkpoint(1) + .assertNextEvent(4, "par1,par2,par3,par4") + .checkpointComplete(1) + // checkpoint for next round, no data input, so after the checkpoint, + // there should not be REQUESTED Instant + // this triggers the data write and event send + .checkpoint(2) + .assertEmptyEvent() + .emptyCheckpoint(2) + .end(); } @Test public void testCheckpointFails() throws Exception { // reset the config option conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - // open the function and ingest data - funcWrapper.openFunction(); - // no data written and triggers checkpoint fails, - // then we should revert the start instant - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - - String instant = lastPendingInstant(); - assertNotNull(instant); - - final OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - List writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses(); - assertNotNull(writeStatuses); - assertThat(writeStatuses.size(), is(0)); // no data write - - // fails the checkpoint - funcWrapper.checkpointFails(1); - assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(), - "The last checkpoint was aborted, ignore the events"); - - // the instant metadata should be reused - checkInstantState(REQUESTED, instant); - checkInstantState(HoodieInstant.State.COMPLETED, null); - - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - - // this returns early because there is no inflight instant - assertDoesNotThrow(() -> funcWrapper.checkpointFunction(2), - "The stream writer reuse the last instant time when waiting for the last instant commit timeout"); - // do not send the write event and fails the checkpoint, - // behaves like the last checkpoint is successful. 
- funcWrapper.checkpointFails(2); + preparePipeline(conf) + // no data is written and the checkpoint fails, + // then we should revert the start instant + .checkpoint(1) + .assertEmptyEvent() + .checkpointFails(1) + .consume(TestData.DATA_SET_INSERT) + .checkpointNotThrow(2, + "The stream writer reuses the last instant time when waiting for the last instant commit timeout") + // do not send the write event and fail the checkpoint, + // behaving as if the last checkpoint was successful. + .checkpointFails(2) + .end(); } @Test public void testSubtaskFails() throws Exception { // open the function and ingest data - funcWrapper.openFunction(); - // no data written and triggers checkpoint fails, - // then we should revert the start instant - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - funcWrapper.getNextEvent(); - - String instant1 = lastPendingInstant(); - assertNotNull(instant1); - - // fails the subtask - funcWrapper.subTaskFails(0); - - String instant2 = lastPendingInstant(); - assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting new instant"); - - checkInstantState(HoodieInstant.State.COMPLETED, null); + preparePipeline() + .checkpoint(1) + .assertEmptyEvent() + .subTaskFails(0) + .noCompleteInstant() + .end(); } @Test public void testInsert() throws Exception { // open the function and ingest data - funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - - assertEmptyDataFiles(); - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - - String instant = lastPendingInstant(); - - final OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - checkInstantState(REQUESTED, instant); - funcWrapper.checkpointComplete(1); - checkWrittenData(tempFile, EXPECTED1); - // the coordinator checkpoint commits the inflight instant.
- checkInstantState(HoodieInstant.State.COMPLETED, instant); - checkWrittenData(tempFile, EXPECTED1); + preparePipeline() + .consume(TestData.DATA_SET_INSERT) + .assertEmptyDataFiles() + .checkpoint(1) + .assertNextEvent() + .checkpointComplete(1) + .checkWrittenData(EXPECTED1) + .end(); } @Test public void testInsertDuplicates() throws Exception { // reset the config option conf.setBoolean(FlinkOptions.PRE_COMBINE, true); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // open the function and ingest data - funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - funcWrapper.invoke(rowData); - } - - assertEmptyDataFiles(); - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - - OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - funcWrapper.checkpointComplete(1); - - checkWrittenData(tempFile, EXPECTED3, 1); - - // insert duplicates again - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - funcWrapper.invoke(rowData); - } - - funcWrapper.checkpointFunction(2); - - nextEvent = funcWrapper.getNextEvent(); - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - funcWrapper.checkpointComplete(2); - - checkWrittenData(tempFile, EXPECTED3, 1); + preparePipeline(conf) + .consume(TestData.DATA_SET_INSERT_DUPLICATES) + .assertEmptyDataFiles() + .checkpoint(1) + .assertNextEvent() + .checkpointComplete(1) + .checkWrittenData(EXPECTED3, 1) + // insert duplicates again + .consume(TestData.DATA_SET_INSERT_DUPLICATES) + .checkpoint(2) + .assertNextEvent() + .checkpointComplete(2) + .checkWrittenData(EXPECTED3, 1) + .end(); } @Test public void testUpsert() throws Exception { // open the function and ingest data - funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - - assertEmptyDataFiles(); - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - - OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - funcWrapper.checkpointComplete(1); - - // upsert another data buffer - for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) { - funcWrapper.invoke(rowData); - } - // the data is not flushed yet - checkWrittenData(tempFile, EXPECTED1); - // this triggers the data write and event send - funcWrapper.checkpointFunction(2); - - String instant = lastPendingInstant(); - - nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - checkInstantState(REQUESTED, instant); - funcWrapper.checkpointComplete(2); - // the coordinator checkpoint commits the inflight instant. 
- checkInstantState(HoodieInstant.State.COMPLETED, instant); - checkWrittenData(tempFile, EXPECTED2); + preparePipeline() + .consume(TestData.DATA_SET_INSERT) + .assertEmptyDataFiles() + .checkpoint(1) + .assertNextEvent() + .checkpointComplete(1) + // upsert another data buffer + .consume(TestData.DATA_SET_UPDATE_INSERT) + // the data is not flushed yet + .checkWrittenData(EXPECTED1) + .checkpoint(2) + .assertNextEvent() + .checkpointComplete(2) + .checkWrittenData(EXPECTED2) + .end(); } @Test public void testUpsertWithDelete() throws Exception { // open the function and ingest data - funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - - assertEmptyDataFiles(); - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - - OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - funcWrapper.checkpointComplete(1); - - // upsert another data buffer - for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) { - funcWrapper.invoke(rowData); - } - // the data is not flushed yet - checkWrittenData(tempFile, EXPECTED1); - // this triggers the data write and event send - funcWrapper.checkpointFunction(2); - - String instant = lastPendingInstant(); - - nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - checkInstantState(REQUESTED, instant); - funcWrapper.checkpointComplete(2); - // the coordinator checkpoint commits the inflight instant. - checkInstantState(HoodieInstant.State.COMPLETED, instant); - - Map expected = getUpsertWithDeleteExpected(); - checkWrittenData(tempFile, expected); + preparePipeline() + .consume(TestData.DATA_SET_INSERT) + .assertEmptyDataFiles() + .checkpoint(1) + .assertNextEvent() + .checkpointComplete(1) + .consume(TestData.DATA_SET_UPDATE_DELETE) + .checkWrittenData(EXPECTED1) + .checkpoint(2) + .assertNextEvent() + .checkpointComplete(2) + .checkWrittenData(getUpsertWithDeleteExpected()) + .end(); } @Test public void testInsertWithMiniBatches() throws Exception { // reset the config option conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // open the function and ingest data - funcWrapper.openFunction(); - // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. 
- // so 3 records expect to trigger a mini-batch write - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - funcWrapper.invoke(rowData); - } - - Map> dataBuffer = funcWrapper.getDataBuffer(); - assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("2 records expect to flush out as a mini-batch", - dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(2)); - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - dataBuffer = funcWrapper.getDataBuffer(); - assertThat("All data should be flushed out", dataBuffer.size(), is(0)); - - final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first - final OperatorEvent event2 = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, event1); - funcWrapper.getCoordinator().handleEventFromOperator(0, event2); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - String instant = lastPendingInstant(); - - funcWrapper.checkpointComplete(1); Map expected = getMiniBatchExpected(); - checkWrittenData(tempFile, expected, 1); - // started a new instant already - checkInflightInstant(); - checkInstantState(HoodieInstant.State.COMPLETED, instant); - - // insert duplicates again - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - funcWrapper.invoke(rowData); - } - - funcWrapper.checkpointFunction(2); - - final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first - final OperatorEvent event4 = funcWrapper.getNextEvent(); - funcWrapper.getCoordinator().handleEventFromOperator(0, event3); - funcWrapper.getCoordinator().handleEventFromOperator(0, event4); - funcWrapper.checkpointComplete(2); - - // Same the original base file content. - checkWrittenData(tempFile, expected, 1); + preparePipeline(conf) + // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. + // so 3 records expect to trigger a mini-batch write + .consume(TestData.DATA_SET_INSERT_DUPLICATES) + .assertDataBuffer(1, 2) + .checkpoint(1) + .allDataFlushed() + .handleEvents(2) + .checkpointComplete(1) + .checkWrittenData(expected, 1) + .consume(TestData.DATA_SET_INSERT_DUPLICATES) + .checkpoint(2) + .handleEvents(2) + .checkpointComplete(2) + .checkWrittenData(expected, 1) + .end(); } @Test @@ -469,129 +215,43 @@ public class TestWriteCopyOnWrite { // reset the config option conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size conf.setBoolean(FlinkOptions.PRE_COMBINE, true); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // open the function and ingest data - funcWrapper.openFunction(); - // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. 
- // so 3 records expect to trigger a mini-batch write - for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { - funcWrapper.invoke(rowData); - } - - Map> dataBuffer = funcWrapper.getDataBuffer(); - assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("2 records expect to flush out as a mini-batch", - dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(2)); - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - dataBuffer = funcWrapper.getDataBuffer(); - assertThat("All data should be flushed out", dataBuffer.size(), is(0)); - - final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first - final OperatorEvent event2 = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, event1); - funcWrapper.getCoordinator().handleEventFromOperator(0, event2); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - String instant = lastPendingInstant(); - - funcWrapper.checkpointComplete(1); Map expected = new HashMap<>(); expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]"); - checkWrittenData(tempFile, expected, 1); - - // started a new instant already - checkInflightInstant(); - checkInstantState(HoodieInstant.State.COMPLETED, instant); - - // insert duplicates again - for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { - funcWrapper.invoke(rowData); - } - - funcWrapper.checkpointFunction(2); - - final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first - final OperatorEvent event4 = funcWrapper.getNextEvent(); - funcWrapper.getCoordinator().handleEventFromOperator(0, event3); - funcWrapper.getCoordinator().handleEventFromOperator(0, event4); - funcWrapper.checkpointComplete(2); - - // Same the original base file content. - checkWrittenData(tempFile, expected, 1); + preparePipeline(conf) + // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. + // so 3 records expect to trigger a mini-batch write + .consume(TestData.DATA_SET_INSERT_SAME_KEY) + .assertDataBuffer(1, 2) + .checkpoint(1) + .allDataFlushed() + .handleEvents(2) + .checkpointComplete(1) + .checkWrittenData(expected, 1) + .consume(TestData.DATA_SET_INSERT_SAME_KEY) + .checkpoint(2) + .handleEvents(2) + .checkpointComplete(2) + .checkWrittenData(expected, 1) + .end(); } @Test public void testInsertAppendMode() throws Exception { - InsertFunctionWrapper funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // open the function and ingest data - funcWrapper.openFunction(); - // Each record is 208 bytes. 
so 4 records expect to trigger a mini-batch write - for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { - funcWrapper.invoke(rowData); - } - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - assertNull(funcWrapper.getWriterHelper()); - - final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first - assertThat("The operator expect to send an event", event1, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, event1); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - String instant = lastPendingInstant(); - - funcWrapper.checkpointComplete(1); - - Map expected = new HashMap<>(); - - expected.put("par1", "[" - + "id1,par1,id1,Danny,23,0,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,2,par1, " - + "id1,par1,id1,Danny,23,3,par1, " - + "id1,par1,id1,Danny,23,4,par1]"); - - TestData.checkWrittenAllData(tempFile, expected, 1); - - // started a new instant already - checkInflightInstant(); - checkInstantState(HoodieInstant.State.COMPLETED, instant); - - // insert duplicates again - for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { - funcWrapper.invoke(rowData); - } - - funcWrapper.checkpointFunction(2); - - final OperatorEvent event2 = funcWrapper.getNextEvent(); // remove the first event first - funcWrapper.getCoordinator().handleEventFromOperator(0, event2); - funcWrapper.checkpointComplete(2); - - // same with the original base file content. - expected.put("par1", "[" - + "id1,par1,id1,Danny,23,0,par1, " - + "id1,par1,id1,Danny,23,0,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,2,par1, " - + "id1,par1,id1,Danny,23,2,par1, " - + "id1,par1,id1,Danny,23,3,par1, " - + "id1,par1,id1,Danny,23,3,par1, " - + "id1,par1,id1,Danny,23,4,par1, " - + "id1,par1,id1,Danny,23,4,par1]"); - TestData.checkWrittenAllData(tempFile, expected, 1); + prepareInsertPipeline() + // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write + .consume(TestData.DATA_SET_INSERT_SAME_KEY) + .checkpoint(1) + .assertNextEvent() + .checkpointComplete(1) + .checkWrittenAllData(EXPECTED4, 1) + .consume(TestData.DATA_SET_INSERT_SAME_KEY) + .checkpoint(2) + .assertNextEvent() + .checkpointComplete(2) + .checkWrittenFullData(EXPECTED5) + .end(); } /** @@ -604,145 +264,54 @@ public class TestWriteCopyOnWrite { conf.setString(FlinkOptions.OPERATION, "insert"); conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true); conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - // open the function and ingest data - funcWrapper.openFunction(); - // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. - // so 3 records expect to trigger a mini-batch write - // flush the max size bucket once at a time. 
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { - funcWrapper.invoke(rowData); - } - - Map> dataBuffer = funcWrapper.getDataBuffer(); - assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("2 records expect to flush out as a mini-batch", - dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(2)); - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - dataBuffer = funcWrapper.getDataBuffer(); - assertThat("All data should be flushed out", dataBuffer.size(), is(0)); - - for (int i = 0; i < 2; i++) { - final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); - funcWrapper.getCoordinator().handleEventFromOperator(0, event); - } - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - String instant = lastPendingInstant(); - - funcWrapper.checkpointComplete(1); - - Map expected = new HashMap<>(); - - expected.put("par1", "[" - + "id1,par1,id1,Danny,23,0,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,2,par1, " - + "id1,par1,id1,Danny,23,3,par1, " - + "id1,par1,id1,Danny,23,4,par1]"); - TestData.checkWrittenData(tempFile, expected, 1); - - // started a new instant already - checkInflightInstant(); - checkInstantState(HoodieInstant.State.COMPLETED, instant); - - // insert duplicates again - for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { - funcWrapper.invoke(rowData); - } - - funcWrapper.checkpointFunction(2); - - for (int i = 0; i < 2; i++) { - final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - funcWrapper.getCoordinator().handleEventFromOperator(0, event); - } - - funcWrapper.checkpointComplete(2); - - // same with the original base file content. - Map> expected2 = new HashMap<>(); - expected2.put("par1", Arrays.asList( - "id1,par1,id1,Danny,23,0,par1", - "id1,par1,id1,Danny,23,0,par1", - "id1,par1,id1,Danny,23,1,par1", - "id1,par1,id1,Danny,23,1,par1", - "id1,par1,id1,Danny,23,2,par1", - "id1,par1,id1,Danny,23,2,par1", - "id1,par1,id1,Danny,23,3,par1", - "id1,par1,id1,Danny,23,3,par1", - "id1,par1,id1,Danny,23,4,par1", - "id1,par1,id1,Danny,23,4,par1")); - - // Same the original base file content. - TestData.checkWrittenFullData(tempFile, expected2); + TestWriteMergeOnRead.TestHarness.instance() + // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. + // so 3 records expect to trigger a mini-batch write + // flush the max size bucket once at a time. + .preparePipeline(tempFile, conf) + .consume(TestData.DATA_SET_INSERT_SAME_KEY) + .assertDataBuffer(1, 2) + .checkpoint(1) + .allDataFlushed() + .handleEvents(2) + .checkpointComplete(1) + .checkWrittenData(EXPECTED4, 1) + // insert duplicates again + .consume(TestData.DATA_SET_INSERT_SAME_KEY) + .checkpoint(2) + .handleEvents(2) + .checkpointComplete(2) + .checkWrittenFullData(EXPECTED5) + .end(); } @Test public void testInsertWithSmallBufferSize() throws Exception { // reset the config option conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // open the function and ingest data - funcWrapper.openFunction(); - // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. 
- // so 3 records expect to trigger a mini-batch write - // flush the max size bucket once at a time. - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - funcWrapper.invoke(rowData); - } - - Map> dataBuffer = funcWrapper.getDataBuffer(); - assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("2 records expect to flush out as a mini-batch", - dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(2)); - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - dataBuffer = funcWrapper.getDataBuffer(); - assertThat("All data should be flushed out", dataBuffer.size(), is(0)); - - for (int i = 0; i < 2; i++) { - final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); - funcWrapper.getCoordinator().handleEventFromOperator(0, event); - } - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - String instant = lastPendingInstant(); - - funcWrapper.checkpointComplete(1); Map expected = getMiniBatchExpected(); - checkWrittenData(tempFile, expected, 1); - // started a new instant already - checkInflightInstant(); - checkInstantState(HoodieInstant.State.COMPLETED, instant); - - // insert duplicates again - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - funcWrapper.invoke(rowData); - } - - funcWrapper.checkpointFunction(2); - - for (int i = 0; i < 2; i++) { - final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - funcWrapper.getCoordinator().handleEventFromOperator(0, event); - } - - funcWrapper.checkpointComplete(2); - - // Same the original base file content. - checkWrittenData(tempFile, expected, 1); + preparePipeline(conf) + // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. + // so 3 records are expected to trigger a mini-batch write + // flush the max size bucket once at a time. + .consume(TestData.DATA_SET_INSERT_DUPLICATES) + .assertDataBuffer(1, 2) + .checkpoint(1) + .allDataFlushed() + .handleEvents(2) + .checkpointComplete(1) + .checkWrittenData(expected, 1) + // insert duplicates again + .consume(TestData.DATA_SET_INSERT_DUPLICATES) + .checkpoint(2) + .handleEvents(2) + .checkpointComplete(2) + // Same as the original base file content.
+ .checkWrittenData(expected, 1) + .end(); } protected Map getMiniBatchExpected() { @@ -772,71 +341,38 @@ public class TestWriteCopyOnWrite { @Test public void testIndexStateBootstrap() throws Exception { // open the function and ingest data - funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - - assertEmptyDataFiles(); - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - - OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - funcWrapper.checkpointComplete(1); - - // the data is not flushed yet - checkWrittenData(tempFile, EXPECTED1); + preparePipeline() + .consume(TestData.DATA_SET_INSERT) + .assertEmptyDataFiles() + .checkpoint(1) + .assertNextEvent() + .checkpointComplete(1) + .checkWrittenData(EXPECTED1, 4) + .end(); // reset the config option conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // upsert another data buffer - funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) { - funcWrapper.invoke(rowData); - } - - checkIndexLoaded( - new HoodieKey("id1", "par1"), - new HoodieKey("id2", "par1"), - new HoodieKey("id3", "par2"), - new HoodieKey("id4", "par2"), - new HoodieKey("id5", "par3"), - new HoodieKey("id6", "par3"), - new HoodieKey("id7", "par4"), - new HoodieKey("id8", "par4"), - new HoodieKey("id9", "par3"), - new HoodieKey("id10", "par4"), - new HoodieKey("id11", "par4")); - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - - assertTrue(funcWrapper.isAlreadyBootstrap()); - - String instant = lastPendingInstant(); - - nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); - - Map expected = getExpectedBeforeCheckpointComplete(); - checkWrittenData(tempFile, expected); - - funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - checkInstantState(REQUESTED, instant); - - funcWrapper.checkpointComplete(1); - // the coordinator checkpoint commits the inflight instant. 
- checkInstantState(HoodieInstant.State.COMPLETED, instant); - checkWrittenData(tempFile, EXPECTED2); + preparePipeline(conf) + .consume(TestData.DATA_SET_UPDATE_INSERT) + .checkIndexLoaded( + new HoodieKey("id1", "par1"), + new HoodieKey("id2", "par1"), + new HoodieKey("id3", "par2"), + new HoodieKey("id4", "par2"), + new HoodieKey("id5", "par3"), + new HoodieKey("id6", "par3"), + new HoodieKey("id7", "par4"), + new HoodieKey("id8", "par4"), + new HoodieKey("id9", "par3"), + new HoodieKey("id10", "par4"), + new HoodieKey("id11", "par4")) + .checkpoint(1) + .assertBootstrapped() + .assertNextEvent() + .checkWrittenData(getExpectedBeforeCheckpointComplete()) + .checkpointComplete(1) + .checkWrittenData(EXPECTED2) + .end(); } @Test @@ -844,46 +380,18 @@ public class TestWriteCopyOnWrite { // reset the config option conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L); conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // open the function and ingest data - - funcWrapper.openFunction(); - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - - // no checkpoint, so the coordinator does not accept any events - assertTrue( - funcWrapper.getEventBuffer().length == 1 - && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty"); - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit"); - - for (int i = 0; i < 4; i++) { - final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); - funcWrapper.getCoordinator().handleEventFromOperator(0, event); - } - - funcWrapper.checkpointComplete(1); - - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - - assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit"); - - // checkpoint for the next round - funcWrapper.checkpointFunction(2); - - assertDoesNotThrow(() -> { - for (RowData rowData : TestData.DATA_SET_INSERT) { - funcWrapper.invoke(rowData); - } - }, "The stream writer reuse the last instant time when waiting for the last instant commit timeout"); + preparePipeline(conf) + .consume(TestData.DATA_SET_INSERT) + .emptyEventBuffer() + .checkpoint(1) + .assertConfirming() + .handleEvents(4) + .checkpointComplete(1) + .consume(TestData.DATA_SET_INSERT) + .assertNotConfirming() + .checkpoint(2) + .assertConsumeDoesNotThrow(TestData.DATA_SET_INSERT) + .end(); } @Test @@ -903,59 +411,19 @@ public class TestWriteCopyOnWrite { // Utilities // ------------------------------------------------------------------------- - private void checkInflightInstant() { - final String instant = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath()); - assertNotNull(instant); + private TestHarness preparePipeline() throws Exception { + return TestHarness.instance().preparePipeline(tempFile, conf); } - private void checkInstantState(HoodieInstant.State state, String instantStr) { - final String instant; - switch (state) { - case REQUESTED: - instant = lastPendingInstant(); - break; - case COMPLETED: - instant = lastCompleteInstant(); - break; - default: - throw new AssertionError("Unexpected state"); - } - assertThat(instant, 
is(instantStr)); + private TestHarness preparePipeline(Configuration conf) throws Exception { + return TestHarness.instance().preparePipeline(tempFile, conf); } - protected String lastPendingInstant() { - return TestUtils.getLastPendingInstant(tempFile.getAbsolutePath()); - } - - protected String lastCompleteInstant() { - return TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); + protected TestHarness prepareInsertPipeline() throws Exception { + return TestHarness.instance().preparePipeline(tempFile, conf, true); } protected HoodieTableType getTableType() { return HoodieTableType.COPY_ON_WRITE; } - - protected void checkWrittenData(File baseFile, Map expected) throws Exception { - checkWrittenData(baseFile, expected, 4); - } - - protected void checkWrittenData(File baseFile, Map expected, int partitions) throws Exception { - TestData.checkWrittenData(baseFile, expected, partitions); - } - - /** - * Asserts the data files are empty. - */ - protected void assertEmptyDataFiles() { - File[] dataFiles = tempFile.listFiles(file -> !file.getName().startsWith(".")); - assertNotNull(dataFiles); - assertThat(dataFiles.length, is(0)); - } - - private void checkIndexLoaded(HoodieKey... keys) { - for (HoodieKey key : keys) { - assertTrue(funcWrapper.isKeyInState(key), - "Key: " + key + " assumes to be in the index state"); - } - } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 064857ae6..a35a0ac8d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -18,25 +18,11 @@ package org.apache.hudi.sink; -import org.apache.hudi.client.FlinkTaskContextSupplier; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.table.HoodieFlinkTable; -import org.apache.hudi.util.StreamerUtil; -import org.apache.hudi.utils.TestData; -import org.apache.avro.Schema; import org.apache.flink.configuration.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.junit.jupiter.api.BeforeEach; -import java.io.File; import java.util.HashMap; import java.util.Map; @@ -44,19 +30,6 @@ import java.util.Map; * Test cases for delta stream write. */ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { - private FileSystem fs; - private HoodieWriteConfig writeConfig; - private HoodieFlinkEngineContext context; - - @BeforeEach - public void before() throws Exception { - super.before(); - fs = FSUtils.getFs(tempFile.getAbsolutePath(), new org.apache.hadoop.conf.Configuration()); - writeConfig = StreamerUtil.getHoodieClientConfig(conf); - context = new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), - new FlinkTaskContextSupplier(null)); - } @Override protected void setUp(Configuration conf) { @@ -68,14 +41,6 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { // insert clustering is only valid for cow table. 
} - @Override - protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception { - HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient(); - Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); - String latestInstant = lastCompleteInstant(); - TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema); - } - @Override protected Map<String, String> getExpectedBeforeCheckpointComplete() { return EXPECTED1; diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index 7530c8991..704d94cab 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -20,7 +20,6 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.utils.TestUtils; import org.apache.flink.configuration.Configuration; @@ -59,9 +58,4 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite { protected HoodieTableType getTableType() { return HoodieTableType.MERGE_ON_READ; } - - @Override - protected String lastCompleteInstant() { - return TestUtils.getLastDeltaCompleteInstant(tempFile.getAbsolutePath()); - } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java index 4dc197c5c..642a407c1 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -47,7 +47,7 @@ import java.util.concurrent.CompletableFuture; * * @param <I> Input type */ -public class InsertFunctionWrapper<I> { +public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> { private final Configuration conf; private final RowType rowType; @@ -115,6 +115,11 @@ public class InsertFunctionWrapper<I> { return coordinator; } + @Override + public void close() throws Exception { + this.coordinator.close(); + } + public BulkInsertWriterHelper getWriterHelper() { return this.writeFunction.getWriterHelper(); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index c65224a6e..54a142a25 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -66,7 +66,7 @@ import java.util.concurrent.CompletableFuture; * * @param <I> Input type */ -public class StreamWriteFunctionWrapper<I> { +public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> { private final Configuration conf; private final IOManager ioManager; diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java new file mode 100644 index 000000000..d2fe81965 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.event.WriteMetadataEvent; + +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +import java.util.List; +import java.util.Map; + +/** + * Define the common interfaces for test function wrappers. + */ +public interface TestFunctionWrapper<I> { + /** + * Open all the functions within this wrapper. + */ + void openFunction() throws Exception; + + /** + * Process the given input record {@code record}. + */ + void invoke(I record) throws Exception; + + /** + * Returns the event buffer sent by the write tasks. + */ + WriteMetadataEvent[] getEventBuffer(); + + /** + * Returns the next event. + */ + OperatorEvent getNextEvent(); + + /** + * Snapshot all the functions in the wrapper. + */ + void checkpointFunction(long checkpointId) throws Exception; + + /** + * Mark the checkpoint with id {@code checkpointId} as successful. + */ + void checkpointComplete(long checkpointId); + + /** + * Returns the operator coordinator. + */ + StreamWriteOperatorCoordinator getCoordinator(); + + /** + * Returns the data buffer of the write task. + */ + default Map<String, List<HoodieRecord>> getDataBuffer() { + throw new UnsupportedOperationException(); + } + + /** + * Mark the checkpoint with id {@code checkpointId} as failed. + */ + default void checkpointFails(long checkpointId) { + throw new UnsupportedOperationException(); + } + + /** + * Returns the context of the coordinator. + */ + default MockOperatorCoordinatorContext getCoordinatorContext() { + throw new UnsupportedOperationException(); + } + + /** + * Mark the sub-task with id {@code taskId} as failed. + */ + default void subTaskFails(int taskId) throws Exception { + throw new UnsupportedOperationException(); + } + + /** + * Returns whether the given key {@code key} is in the state store. + */ + default boolean isKeyInState(HoodieKey key) { + throw new UnsupportedOperationException(); + } + + /** + * Returns whether the bootstrap function has already bootstrapped. + */ + default boolean isAlreadyBootstrap() throws Exception { + throw new UnsupportedOperationException(); + } + + /** + * Returns whether the write task is confirming. + */ + default boolean isConforming() { + throw new UnsupportedOperationException(); + } + + /** + * Close this function wrapper.
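+ * + * <p>Implementations are expected to release the operator coordinator and any functions the wrapper opened, as {@code InsertFunctionWrapper#close()} above does with its coordinator.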
+ */ + void close() throws Exception; +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java new file mode 100644 index 000000000..e3b1226a6 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestData; +import org.apache.hudi.utils.TestUtils; + +import org.apache.avro.Schema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.table.data.RowData; +import org.apache.hadoop.fs.FileSystem; +import org.hamcrest.MatcherAssert; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Base class for write test cases. 
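+ *
+ * <p>A typical subclass case drives the fluent {@code TestHarness} end to end; a minimal
+ * insert round (a sketch composed purely from the stages defined in this class) reads:
+ * <pre>{@code
+ *   TestHarness.instance().preparePipeline(tempFile, conf)
+ *       .consume(TestData.DATA_SET_INSERT)  // buffer the input rows
+ *       .checkpoint(1)                      // flush the buffer and emit the write metadata event
+ *       .assertNextEvent()                  // hand the event over to the coordinator
+ *       .checkpointComplete(1)              // the coordinator commits the instant
+ *       .checkWrittenData(EXPECTED1)        // verify the written base files
+ *       .end();
+ * }</pre>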
+ */ +public class TestWriteBase { + protected static final Map<String, String> EXPECTED1 = new HashMap<>(); + + protected static final Map<String, String> EXPECTED2 = new HashMap<>(); + + protected static final Map<String, String> EXPECTED3 = new HashMap<>(); + + protected static final Map<String, String> EXPECTED4 = new HashMap<>(); + + protected static final Map<String, List<String>> EXPECTED5 = new HashMap<>(); + + static { + EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]"); + EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]"); + EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]"); + EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); + + EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]"); + EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]"); + EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, " + + "id9,par3,id9,Jane,19,6,par3]"); + EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, " + + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); + + EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]"); + + EXPECTED4.put("par1", "[" + + "id1,par1,id1,Danny,23,0,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,2,par1, " + + "id1,par1,id1,Danny,23,3,par1, " + + "id1,par1,id1,Danny,23,4,par1]"); + + EXPECTED5.put("par1", Arrays.asList( + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,4,par1", + "id1,par1,id1,Danny,23,4,par1")); + } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * Utilities to compose the test stages. + */ + public static class TestHarness { + public static TestHarness instance() { + return new TestHarness(); + } + + private File baseFile; + private String basePath; + private Configuration conf; + private TestFunctionWrapper<RowData> pipeline; + + private String lastPending; + private String lastComplete; + + public TestHarness preparePipeline(File basePath, Configuration conf) throws Exception { + preparePipeline(basePath, conf, false); + return this; + } + + public TestHarness preparePipeline(File basePath, Configuration conf, boolean append) throws Exception { + this.baseFile = basePath; + this.basePath = this.baseFile.getAbsolutePath(); + this.conf = conf; + this.pipeline = append + ? new InsertFunctionWrapper<>(this.basePath, conf) + : new StreamWriteFunctionWrapper<>(this.basePath, conf); + // open the function and ingest data + this.pipeline.openFunction(); + return this; + } + + public TestHarness consume(List<RowData> inputs) throws Exception { + for (RowData rowData : inputs) { + this.pipeline.invoke(rowData); + } + return this; + } + + public TestHarness assertConsumeDoesNotThrow(List<RowData> inputs) { + assertDoesNotThrow(() -> { + consume(inputs); + }, "The stream writer reuses the last instant time when waiting for the last instant commit timeout"); + return this; + } + + /** + * Assert the event buffer is empty.
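+ * + * <p>The coordinator keeps one buffer slot per write task, so the single-task pipelines in these tests hold exactly one {@code null} entry before any event arrives.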
+ */ + public TestHarness emptyEventBuffer() { + assertTrue( + this.pipeline.getEventBuffer().length == 1 + && this.pipeline.getEventBuffer()[0] == null, + "The coordinator events buffer is expected to be empty"); + return this; + } + + /** + * Assert the next event exists and hand it over to the coordinator. + */ + public TestHarness assertNextEvent() { + final OperatorEvent nextEvent = this.pipeline.getNextEvent(); + MatcherAssert.assertThat("The operator is expected to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); + this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event"); + return this; + } + + /** + * Assert the next event exists and hand it over to the coordinator. + * + * @param numWriteStatus The expected number of write statuses reported by the event + * @param partitions The written partitions reported by the event + */ + public TestHarness assertNextEvent(int numWriteStatus, String partitions) { + final OperatorEvent nextEvent = this.pipeline.getNextEvent(); + MatcherAssert.assertThat("The operator is expected to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); + List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses(); + assertNotNull(writeStatuses); + MatcherAssert.assertThat(writeStatuses.size(), is(numWriteStatus)); + assertThat(writeStatuses.stream() + .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder()) + .collect(Collectors.joining(",")), + is(partitions)); + this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event"); + return this; + } + + /** + * Assert the next event exists and hand it over to the coordinator. + * + * <p>

Validates that the write metadata reported by the event is empty. + */ + public TestHarness assertEmptyEvent() { + final OperatorEvent nextEvent = this.pipeline.getNextEvent(); + MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); + List writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses(); + assertNotNull(writeStatuses); + MatcherAssert.assertThat(writeStatuses.size(), is(0)); + this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event"); + return this; + } + + /** + * Assert the data buffer with given number of buckets and records. + */ + public TestHarness assertDataBuffer(int numBuckets, int numRecords) { + Map> dataBuffer = this.pipeline.getDataBuffer(); + assertThat("Should have " + numBuckets + " data bucket", dataBuffer.size(), is(numBuckets)); + assertThat(numRecords + " records expect to flush out as a mini-batch", + dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), + is(numRecords)); + return this; + } + + /** + * Checkpoints the pipeline, which triggers the data write and event send. + */ + public TestHarness checkpoint(long checkpointId) throws Exception { + this.pipeline.checkpointFunction(checkpointId); + return this; + } + + public TestHarness allDataFlushed() { + Map> dataBuffer = this.pipeline.getDataBuffer(); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + return this; + } + + /** + * Handle the next {@code numEvents} events and handle over them to the coordinator. + */ + public TestHarness handleEvents(int numEvents) { + for (int i = 0; i < numEvents; i++) { + final OperatorEvent event = this.pipeline.getNextEvent(); // remove the first event first + assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); + this.pipeline.getCoordinator().handleEventFromOperator(0, event); + } + assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event"); + return this; + } + + /** + * Mark the checkpoint with id {@code checkpointId} as finished. + */ + public TestHarness checkpointComplete(long checkpointId) { + this.lastPending = lastPendingInstant(); + this.pipeline.checkpointComplete(checkpointId); + // started a new instant already + checkInflightInstant(); + checkInstantState(HoodieInstant.State.COMPLETED, lastPending); + this.lastComplete = lastPending; + this.lastPending = lastPendingInstant(); // refresh last pending instant + return this; + } + + /** + * Mark the checkpoint finished with empty write metadata. + */ + public TestHarness emptyCheckpoint(long checkpointId) { + String lastPending = lastPendingInstant(); + this.pipeline.checkpointComplete(checkpointId); + // last pending instant was reused + assertEquals(this.lastPending, lastPending); + checkInstantState(HoodieInstant.State.COMPLETED, lastComplete); + return this; + } + + /** + * Mark the checkpoint with id {@code checkpointId} as failed. 
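+    // Rough lifecycle that the checkpoint stages assert (illustrative summary):
+    //   consume(...)          buffers records against the current pending instant;
+    //   checkpoint(n)         flushes the buffers and emits a WriteMetadataEvent;
+    //   checkpointComplete(n) commits the instant (COMPLETED) and starts a new
+    //                         pending instant;
+    //   checkpointFails(n)    (below) aborts the metadata, leaving no completed instant.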
+
+    /**
+     * Mark the checkpoint with id {@code checkpointId} as failed.
+     */
+    public TestHarness checkpointFails(long checkpointId) {
+      this.pipeline.checkpointFails(checkpointId);
+      assertFalse(this.pipeline.getCoordinatorContext().isJobFailed(),
+          "The last checkpoint was aborted, the events should be ignored");
+      // no complete instant
+      checkInstantState(HoodieInstant.State.COMPLETED, null);
+      return this;
+    }
+
+    public TestHarness checkpointNotThrow(long checkpointId, String message) {
+      // this returns early because there is no inflight instant
+      assertDoesNotThrow(() -> checkpoint(checkpointId), message);
+      return this;
+    }
+
+    /**
+     * Mark the subtask with id {@code taskId} as failed.
+     */
+    public TestHarness subTaskFails(int taskId) throws Exception {
+      // fails the subtask
+      String instant1 = lastPendingInstant();
+      this.pipeline.subTaskFails(taskId);
+
+      String instant2 = lastPendingInstant();
+      assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting a new instant");
+      return this;
+    }
+
+    /**
+     * Asserts that there is no completed instant on the timeline.
+     */
+    public TestHarness noCompleteInstant() {
+      // no complete instant
+      checkInstantState(HoodieInstant.State.COMPLETED, null);
+      return this;
+    }
+
+    /**
+     * Asserts that no data files have been written under the base path.
+     */
+    public TestHarness assertEmptyDataFiles() {
+      File[] dataFiles = baseFile.listFiles(file -> !file.getName().startsWith("."));
+      assertNotNull(dataFiles);
+      assertThat(dataFiles.length, is(0));
+      return this;
+    }
+
+    public TestHarness checkWrittenData(Map<String, String> expected) throws Exception {
+      checkWrittenData(expected, 4);
+      return this;
+    }
+
+    /**
+     * Checks the written data against the expected results: COW tables (or tables with
+     * async compaction enabled) read the base files directly, MOR tables merge in the logs.
+     */
+    public TestHarness checkWrittenData(
+        Map<String, String> expected,
+        int partitions) throws Exception {
+      if (OptionsResolver.isCowTable(conf) || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+        TestData.checkWrittenData(this.baseFile, expected, partitions);
+      } else {
+        checkWrittenDataMor(baseFile, expected, partitions);
+      }
+      return this;
+    }
+
+    private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
+      HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath);
+      Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      String latestInstant = lastCompleteInstant();
+      FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration());
+      TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
+    }
+
+    public TestHarness checkWrittenFullData(Map<String, List<String>> expected) throws IOException {
+      TestData.checkWrittenFullData(this.baseFile, expected);
+      return this;
+    }
+
+    public TestHarness checkWrittenAllData(Map<String, String> expected, int partitions) throws IOException {
+      TestData.checkWrittenAllData(baseFile, expected, partitions);
+      return this;
+    }
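+    // Illustrative usage of the index assertion below; the key values are assumed
+    // from the shared test data set (HoodieKey(recordKey, partitionPath)):
+    //
+    //   harness.checkIndexLoaded(new HoodieKey("id1", "par1"));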
+    public TestHarness checkIndexLoaded(HoodieKey... keys) {
+      for (HoodieKey key : keys) {
+        assertTrue(this.pipeline.isKeyInState(key),
+            "Key: " + key + " is expected to be in the index state");
+      }
+      return this;
+    }
+
+    public TestHarness assertBootstrapped() throws Exception {
+      assertTrue(this.pipeline.isAlreadyBootstrap());
+      return this;
+    }
+
+    public TestHarness assertConfirming() {
+      assertTrue(this.pipeline.isConforming(),
+          "The write function should be waiting for the instant to commit");
+      return this;
+    }
+
+    public TestHarness assertNotConfirming() {
+      assertFalse(this.pipeline.isConforming(),
+          "The write function should finish waiting for the instant to commit");
+      return this;
+    }
+
+    public void end() throws Exception {
+      this.pipeline.close();
+    }
+
+    private String lastPendingInstant() {
+      return TestUtils.getLastPendingInstant(basePath);
+    }
+
+    private void checkInflightInstant() {
+      final String instant = TestUtils.getLastPendingInstant(basePath);
+      assertNotNull(instant);
+    }
+
+    private void checkInstantState(HoodieInstant.State state, String instantStr) {
+      final String instant;
+      switch (state) {
+        case REQUESTED:
+          instant = lastPendingInstant();
+          break;
+        case COMPLETED:
+          instant = lastCompleteInstant();
+          break;
+        default:
+          throw new AssertionError("Unexpected state");
+      }
+      assertThat(instant, is(instantStr));
+    }
+
+    protected String lastCompleteInstant() {
+      return OptionsResolver.isMorTable(conf)
+          ? TestUtils.getLastDeltaCompleteInstant(basePath)
+          : TestUtils.getLastCompleteInstant(basePath);
+    }
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b0f7b5f08..e8e177b82 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -18,7 +18,6 @@
 package org.apache.hudi.utils;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -543,17 +542,15 @@
     // 1. init flink table
     HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
-    FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
-    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(supplier);
-    HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);
+    HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
 
     // 2. check each partition data
     expected.forEach((partition, partitionDataSet) -> {
       List<String> readBuffer = new ArrayList<>();
-      table.getFileSystemView().getAllFileGroups(partition)
-          .forEach(v -> v.getLatestDataFile().ifPresent(baseFile -> {
+      table.getBaseFileOnlyView().getLatestBaseFiles(partition)
+          .forEach(baseFile -> {
             String path = baseFile.getPath();
             try {
               ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path)).build();
@@ -565,7 +562,7 @@ public class TestData {
             } catch (IOException e) {
               throw new RuntimeException(e);
             }
-          }));
+          });
       assertTrue(partitionDataSet.size() == readBuffer.size()
           && partitionDataSet.containsAll(readBuffer));