diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 8f2269558..a67cdae96 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -182,7 +182,7 @@ public class StreamWriteFunction // it would check the validity. this.onCheckpointing = true; // wait for the buffer data flush out and request a new instant - flushBuffer(true); + flushBuffer(true, false); // signal the task thread to start buffering addToBufferCondition.signal(); } finally { @@ -221,7 +221,7 @@ public class StreamWriteFunction * End input action for batch source. */ public void endInput() { - flushBuffer(true); + flushBuffer(true, true); this.writeClient.cleanHandles(); } @@ -333,13 +333,13 @@ public class StreamWriteFunction private void flushBufferOnCondition(I value) { boolean needFlush = this.detector.detect(value); if (needFlush) { - flushBuffer(false); + flushBuffer(false, false); this.detector.reset(); } } @SuppressWarnings("unchecked, rawtypes") - private void flushBuffer(boolean isFinalBatch) { + private void flushBuffer(boolean isLastBatch, boolean isEndInput) { this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data @@ -364,8 +364,14 @@ public class StreamWriteFunction LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); writeStatus = Collections.emptyList(); } - this.eventGateway.sendEventToCoordinator( - new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus, isFinalBatch)); + final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() + .taskID(taskID) + .instantTime(currentInstant) + .writeStatus(writeStatus) + .isLastBatch(isLastBatch) + .isEndInput(isEndInput) + .build(); + this.eventGateway.sendEventToCoordinator(event); this.buffer.clear(); this.currentInstant = ""; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 942c6a009..9a509fd59 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -102,11 +102,6 @@ public class StreamWriteOperatorCoordinator */ private final int parallelism; - /** - * Whether the coordinator executes with the bounded data set. - */ - private final boolean isBounded; - /** * Whether needs to schedule compaction task on finished checkpoints. */ @@ -117,16 +112,13 @@ public class StreamWriteOperatorCoordinator * * @param conf The config options * @param parallelism The operator task number - * @param isBounded Whether the input data source is bounded */ public StreamWriteOperatorCoordinator( Configuration conf, - int parallelism, - boolean isBounded) { + int parallelism) { this.conf = conf; this.parallelism = parallelism; this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf); - this.isBounded = isBounded; } @Override @@ -143,11 +135,6 @@ public class StreamWriteOperatorCoordinator @Override public void close() { - if (isBounded) { - // start to commit the instant. - checkAndCommitWithRetry(); - // no compaction scheduling for batch mode - } // teardown the resource if (writeClient != null) { writeClient.close(); @@ -216,6 +203,11 @@ public class StreamWriteOperatorCoordinator } else { this.eventBuffer[event.getTaskID()] = event; } + if (event.isEndInput() && checkReady()) { + // start to commit the instant. + doCommit(); + // no compaction scheduling for batch mode + } } @Override @@ -424,12 +416,10 @@ public class StreamWriteOperatorCoordinator public static class Provider implements OperatorCoordinator.Provider { private final OperatorID operatorId; private final Configuration conf; - private final boolean isBounded; - public Provider(OperatorID operatorId, Configuration conf, boolean isBounded) { + public Provider(OperatorID operatorId, Configuration conf) { this.operatorId = operatorId; this.conf = conf; - this.isBounded = isBounded; } @Override @@ -439,7 +429,7 @@ public class StreamWriteOperatorCoordinator @Override public OperatorCoordinator create(Context context) { - return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism(), isBounded); + return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism()); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java index 676390397..7e78d581d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java @@ -39,20 +39,11 @@ public class StreamWriteOperatorFactory private final StreamWriteOperator operator; private final Configuration conf; - private final boolean isBounded; - public StreamWriteOperatorFactory( - Configuration conf) { - this(conf, false); - } - - public StreamWriteOperatorFactory( - Configuration conf, - boolean isBounded) { + public StreamWriteOperatorFactory(Configuration conf) { super(new StreamWriteOperator<>(conf)); this.operator = (StreamWriteOperator) getOperator(); this.conf = conf; - this.isBounded = isBounded; } @Override @@ -70,7 +61,7 @@ public class StreamWriteOperatorFactory @Override public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) { - return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, isBounded); + return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf); } @Override diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java index 66521b101..dd40de2a8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** * An operator event to mark successful checkpoint batch write. @@ -36,13 +37,13 @@ public class BatchWriteSuccessEvent implements OperatorEvent { private final int taskID; private final String instantTime; private boolean isLastBatch; - - public BatchWriteSuccessEvent( - int taskID, - String instantTime, - List writeStatuses) { - this(taskID, instantTime, writeStatuses, false); - } + /** + * Flag saying whether the event comes from the end of input, e.g. the source + * is bounded, there are two cases in which this flag should be set to true: + * 1. batch execution mode + * 2. bounded stream source such as VALUES + */ + private final boolean isEndInput; /** * Creates an event. @@ -55,15 +56,24 @@ public class BatchWriteSuccessEvent implements OperatorEvent { * if true, the whole data set of the checkpoint * has been flushed successfully */ - public BatchWriteSuccessEvent( + private BatchWriteSuccessEvent( int taskID, String instantTime, List writeStatuses, - boolean isLastBatch) { + boolean isLastBatch, + boolean isEndInput) { this.taskID = taskID; this.instantTime = instantTime; this.writeStatuses = new ArrayList<>(writeStatuses); this.isLastBatch = isLastBatch; + this.isEndInput = isEndInput; + } + + /** + * Returns the builder for {@link BatchWriteSuccessEvent}. + */ + public static Builder builder() { + return new Builder(); } public List getWriteStatuses() { @@ -82,6 +92,10 @@ public class BatchWriteSuccessEvent implements OperatorEvent { return isLastBatch; } + public boolean isEndInput() { + return isEndInput; + } + /** * Merges this event with given {@link BatchWriteSuccessEvent} {@code other}. * @@ -101,4 +115,51 @@ public class BatchWriteSuccessEvent implements OperatorEvent { public boolean isReady(String currentInstant) { return isLastBatch && this.instantTime.equals(currentInstant); } + + // ------------------------------------------------------------------------- + // Builder + // ------------------------------------------------------------------------- + + /** + * Builder for {@link BatchWriteSuccessEvent}. + */ + public static class Builder { + private List writeStatus; + private Integer taskID; + private String instantTime; + private boolean isLastBatch = false; + private boolean isEndInput = false; + + public BatchWriteSuccessEvent build() { + Objects.requireNonNull(taskID); + Objects.requireNonNull(instantTime); + Objects.requireNonNull(writeStatus); + return new BatchWriteSuccessEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput); + } + + public Builder taskID(int taskID) { + this.taskID = taskID; + return this; + } + + public Builder instantTime(String instantTime) { + this.instantTime = instantTime; + return this; + } + + public Builder writeStatus(List writeStatus) { + this.writeStatus = writeStatus; + return this; + } + + public Builder isLastBatch(boolean isLastBatch) { + this.isLastBatch = isLastBatch; + return this; + } + + public Builder isEndInput(boolean isEndInput) { + this.isEndInput = isEndInput; + return this; + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index b16cfdb6e..36020f48c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -69,7 +69,7 @@ public class HoodieTableFactory implements TableSourceFactory, TableSin conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys())); TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); inferAvroSchema(conf, tableSchema.toRowDataType().notNull().getLogicalType()); - return new HoodieTableSink(conf, tableSchema, context.isBounded()); + return new HoodieTableSink(conf, tableSchema); } @Override diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index c973f0865..0891f1972 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -54,12 +54,10 @@ public class HoodieTableSink implements AppendStreamTableSink, Partitio private final Configuration conf; private final TableSchema schema; - private final boolean isBounded; - public HoodieTableSink(Configuration conf, TableSchema schema, boolean isBounded) { + public HoodieTableSink(Configuration conf, TableSchema schema) { this.conf = conf; this.schema = schema; - this.isBounded = isBounded; } @Override @@ -67,7 +65,7 @@ public class HoodieTableSink implements AppendStreamTableSink, Partitio // Read from kafka source RowType rowType = (RowType) this.schema.toRowDataType().notNull().getLogicalType(); int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASKS); - StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf, isBounded); + StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); DataStream pipeline = dataStream .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index b68f62e89..bc8f95656 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -58,7 +58,7 @@ public class TestStreamWriteOperatorCoordinator { @BeforeEach public void before() throws Exception { coordinator = new StreamWriteOperatorCoordinator( - TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2, false); + TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2); coordinator.start(); } @@ -75,14 +75,22 @@ public class TestStreamWriteOperatorCoordinator { WriteStatus writeStatus = new WriteStatus(true, 0.1D); writeStatus.setPartitionPath("par1"); writeStatus.setStat(new HoodieWriteStat()); - OperatorEvent event0 = - new BatchWriteSuccessEvent(0, instant, Collections.singletonList(writeStatus), true); + OperatorEvent event0 = BatchWriteSuccessEvent.builder() + .taskID(0) + .instantTime(instant) + .writeStatus(Collections.singletonList(writeStatus)) + .isLastBatch(true) + .build(); WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); writeStatus1.setPartitionPath("par2"); writeStatus1.setStat(new HoodieWriteStat()); - OperatorEvent event1 = - new BatchWriteSuccessEvent(1, instant, Collections.singletonList(writeStatus1), true); + OperatorEvent event1 = BatchWriteSuccessEvent.builder() + .taskID(1) + .instantTime(instant) + .writeStatus(Collections.singletonList(writeStatus1)) + .isLastBatch(true) + .build(); coordinator.handleEventFromOperator(0, event0); coordinator.handleEventFromOperator(1, event1); @@ -115,7 +123,11 @@ public class TestStreamWriteOperatorCoordinator { public void testReceiveInvalidEvent() { CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); - OperatorEvent event = new BatchWriteSuccessEvent(0, "abc", Collections.emptyList()); + OperatorEvent event = BatchWriteSuccessEvent.builder() + .taskID(0) + .instantTime("abc") + .writeStatus(Collections.emptyList()) + .build(); assertThrows(IllegalStateException.class, () -> coordinator.handleEventFromOperator(0, event), "Receive an unexpected event for instant abc from task 0"); @@ -126,7 +138,11 @@ public class TestStreamWriteOperatorCoordinator { final CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); String inflightInstant = coordinator.getInstant(); - OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList()); + OperatorEvent event = BatchWriteSuccessEvent.builder() + .taskID(0) + .instantTime(inflightInstant) + .writeStatus(Collections.emptyList()) + .build(); coordinator.handleEventFromOperator(0, event); assertThrows(HoodieException.class, () -> coordinator.checkpointComplete(1), diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index ee7774d56..b80ac72b5 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -84,7 +84,7 @@ public class StreamWriteFunctionWrapper { this.gateway = new MockOperatorEventGateway(); this.conf = conf; // one function - this.coordinator = new StreamWriteOperatorCoordinator(conf, 1, false); + this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); this.functionInitializationContext = new MockFunctionInitializationContext(); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index eca14e9a2..cf99e8b22 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -36,6 +36,8 @@ import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.File; import java.util.Collection; @@ -149,12 +151,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } - @Test - void testBatchWriteAndRead() { + @ParameterizedTest + @EnumSource(value = ExecMode.class) + void testWriteAndRead(ExecMode execMode) { + TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; Map options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); - batchTableEnv.executeSql(hoodieTableDDL); + tableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 values\n" + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" @@ -165,13 +169,20 @@ public class HoodieDataSourceITCase extends AbstractTestBase { + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')"; - execInsertSql(batchTableEnv, insertInto); + execInsertSql(tableEnv, insertInto); List rows = CollectionUtil.iterableToList( - () -> batchTableEnv.sqlQuery("select * from t1").execute().collect()); + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + private enum ExecMode { + BATCH, STREAM + } + private void execInsertSql(TableEnvironment tEnv, String insert) { TableResult tableResult = tEnv.executeSql(insert); // wait to finish