[HUDI-1692] Bounded source for stream writer (#2674)
Supports bounded source such as VALUES for stream mode writer.
This commit is contained in:
@@ -182,7 +182,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
// it would check the validity.
|
||||
this.onCheckpointing = true;
|
||||
// wait for the buffer data flush out and request a new instant
|
||||
flushBuffer(true);
|
||||
flushBuffer(true, false);
|
||||
// signal the task thread to start buffering
|
||||
addToBufferCondition.signal();
|
||||
} finally {
|
||||
@@ -221,7 +221,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
* End input action for batch source.
|
||||
*/
|
||||
public void endInput() {
|
||||
flushBuffer(true);
|
||||
flushBuffer(true, true);
|
||||
this.writeClient.cleanHandles();
|
||||
}
|
||||
|
||||
@@ -333,13 +333,13 @@ public class StreamWriteFunction<K, I, O>
|
||||
private void flushBufferOnCondition(I value) {
|
||||
boolean needFlush = this.detector.detect(value);
|
||||
if (needFlush) {
|
||||
flushBuffer(false);
|
||||
flushBuffer(false, false);
|
||||
this.detector.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked, rawtypes")
|
||||
private void flushBuffer(boolean isFinalBatch) {
|
||||
private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
|
||||
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
|
||||
if (this.currentInstant == null) {
|
||||
// in case there are empty checkpoints that has no input data
|
||||
@@ -364,8 +364,14 @@ public class StreamWriteFunction<K, I, O>
|
||||
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
|
||||
writeStatus = Collections.emptyList();
|
||||
}
|
||||
this.eventGateway.sendEventToCoordinator(
|
||||
new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus, isFinalBatch));
|
||||
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
|
||||
.taskID(taskID)
|
||||
.instantTime(currentInstant)
|
||||
.writeStatus(writeStatus)
|
||||
.isLastBatch(isLastBatch)
|
||||
.isEndInput(isEndInput)
|
||||
.build();
|
||||
this.eventGateway.sendEventToCoordinator(event);
|
||||
this.buffer.clear();
|
||||
this.currentInstant = "";
|
||||
}
|
||||
|
||||
@@ -102,11 +102,6 @@ public class StreamWriteOperatorCoordinator
|
||||
*/
|
||||
private final int parallelism;
|
||||
|
||||
/**
|
||||
* Whether the coordinator executes with the bounded data set.
|
||||
*/
|
||||
private final boolean isBounded;
|
||||
|
||||
/**
|
||||
* Whether needs to schedule compaction task on finished checkpoints.
|
||||
*/
|
||||
@@ -117,16 +112,13 @@ public class StreamWriteOperatorCoordinator
|
||||
*
|
||||
* @param conf The config options
|
||||
* @param parallelism The operator task number
|
||||
* @param isBounded Whether the input data source is bounded
|
||||
*/
|
||||
public StreamWriteOperatorCoordinator(
|
||||
Configuration conf,
|
||||
int parallelism,
|
||||
boolean isBounded) {
|
||||
int parallelism) {
|
||||
this.conf = conf;
|
||||
this.parallelism = parallelism;
|
||||
this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
|
||||
this.isBounded = isBounded;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -143,11 +135,6 @@ public class StreamWriteOperatorCoordinator
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (isBounded) {
|
||||
// start to commit the instant.
|
||||
checkAndCommitWithRetry();
|
||||
// no compaction scheduling for batch mode
|
||||
}
|
||||
// teardown the resource
|
||||
if (writeClient != null) {
|
||||
writeClient.close();
|
||||
@@ -216,6 +203,11 @@ public class StreamWriteOperatorCoordinator
|
||||
} else {
|
||||
this.eventBuffer[event.getTaskID()] = event;
|
||||
}
|
||||
if (event.isEndInput() && checkReady()) {
|
||||
// start to commit the instant.
|
||||
doCommit();
|
||||
// no compaction scheduling for batch mode
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -424,12 +416,10 @@ public class StreamWriteOperatorCoordinator
|
||||
public static class Provider implements OperatorCoordinator.Provider {
|
||||
private final OperatorID operatorId;
|
||||
private final Configuration conf;
|
||||
private final boolean isBounded;
|
||||
|
||||
public Provider(OperatorID operatorId, Configuration conf, boolean isBounded) {
|
||||
public Provider(OperatorID operatorId, Configuration conf) {
|
||||
this.operatorId = operatorId;
|
||||
this.conf = conf;
|
||||
this.isBounded = isBounded;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -439,7 +429,7 @@ public class StreamWriteOperatorCoordinator
|
||||
|
||||
@Override
|
||||
public OperatorCoordinator create(Context context) {
|
||||
return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism(), isBounded);
|
||||
return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,20 +39,11 @@ public class StreamWriteOperatorFactory<I>
|
||||
|
||||
private final StreamWriteOperator<I> operator;
|
||||
private final Configuration conf;
|
||||
private final boolean isBounded;
|
||||
|
||||
public StreamWriteOperatorFactory(
|
||||
Configuration conf) {
|
||||
this(conf, false);
|
||||
}
|
||||
|
||||
public StreamWriteOperatorFactory(
|
||||
Configuration conf,
|
||||
boolean isBounded) {
|
||||
public StreamWriteOperatorFactory(Configuration conf) {
|
||||
super(new StreamWriteOperator<>(conf));
|
||||
this.operator = (StreamWriteOperator<I>) getOperator();
|
||||
this.conf = conf;
|
||||
this.isBounded = isBounded;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -70,7 +61,7 @@ public class StreamWriteOperatorFactory<I>
|
||||
|
||||
@Override
|
||||
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
|
||||
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, isBounded);
|
||||
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* An operator event to mark successful checkpoint batch write.
|
||||
@@ -36,13 +37,13 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
|
||||
private final int taskID;
|
||||
private final String instantTime;
|
||||
private boolean isLastBatch;
|
||||
|
||||
public BatchWriteSuccessEvent(
|
||||
int taskID,
|
||||
String instantTime,
|
||||
List<WriteStatus> writeStatuses) {
|
||||
this(taskID, instantTime, writeStatuses, false);
|
||||
}
|
||||
/**
|
||||
* Flag saying whether the event comes from the end of input, e.g. the source
|
||||
* is bounded, there are two cases in which this flag should be set to true:
|
||||
* 1. batch execution mode
|
||||
* 2. bounded stream source such as VALUES
|
||||
*/
|
||||
private final boolean isEndInput;
|
||||
|
||||
/**
|
||||
* Creates an event.
|
||||
@@ -55,15 +56,24 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
|
||||
* if true, the whole data set of the checkpoint
|
||||
* has been flushed successfully
|
||||
*/
|
||||
public BatchWriteSuccessEvent(
|
||||
private BatchWriteSuccessEvent(
|
||||
int taskID,
|
||||
String instantTime,
|
||||
List<WriteStatus> writeStatuses,
|
||||
boolean isLastBatch) {
|
||||
boolean isLastBatch,
|
||||
boolean isEndInput) {
|
||||
this.taskID = taskID;
|
||||
this.instantTime = instantTime;
|
||||
this.writeStatuses = new ArrayList<>(writeStatuses);
|
||||
this.isLastBatch = isLastBatch;
|
||||
this.isEndInput = isEndInput;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the builder for {@link BatchWriteSuccessEvent}.
|
||||
*/
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public List<WriteStatus> getWriteStatuses() {
|
||||
@@ -82,6 +92,10 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
|
||||
return isLastBatch;
|
||||
}
|
||||
|
||||
public boolean isEndInput() {
|
||||
return isEndInput;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges this event with given {@link BatchWriteSuccessEvent} {@code other}.
|
||||
*
|
||||
@@ -101,4 +115,51 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
|
||||
public boolean isReady(String currentInstant) {
|
||||
return isLastBatch && this.instantTime.equals(currentInstant);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Builder
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Builder for {@link BatchWriteSuccessEvent}.
|
||||
*/
|
||||
public static class Builder {
|
||||
private List<WriteStatus> writeStatus;
|
||||
private Integer taskID;
|
||||
private String instantTime;
|
||||
private boolean isLastBatch = false;
|
||||
private boolean isEndInput = false;
|
||||
|
||||
public BatchWriteSuccessEvent build() {
|
||||
Objects.requireNonNull(taskID);
|
||||
Objects.requireNonNull(instantTime);
|
||||
Objects.requireNonNull(writeStatus);
|
||||
return new BatchWriteSuccessEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput);
|
||||
}
|
||||
|
||||
public Builder taskID(int taskID) {
|
||||
this.taskID = taskID;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder instantTime(String instantTime) {
|
||||
this.instantTime = instantTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder writeStatus(List<WriteStatus> writeStatus) {
|
||||
this.writeStatus = writeStatus;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder isLastBatch(boolean isLastBatch) {
|
||||
this.isLastBatch = isLastBatch;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder isEndInput(boolean isEndInput) {
|
||||
this.isEndInput = isEndInput;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSin
|
||||
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys()));
|
||||
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
|
||||
inferAvroSchema(conf, tableSchema.toRowDataType().notNull().getLogicalType());
|
||||
return new HoodieTableSink(conf, tableSchema, context.isBounded());
|
||||
return new HoodieTableSink(conf, tableSchema);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -54,12 +54,10 @@ public class HoodieTableSink implements AppendStreamTableSink<RowData>, Partitio
|
||||
|
||||
private final Configuration conf;
|
||||
private final TableSchema schema;
|
||||
private final boolean isBounded;
|
||||
|
||||
public HoodieTableSink(Configuration conf, TableSchema schema, boolean isBounded) {
|
||||
public HoodieTableSink(Configuration conf, TableSchema schema) {
|
||||
this.conf = conf;
|
||||
this.schema = schema;
|
||||
this.isBounded = isBounded;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -67,7 +65,7 @@ public class HoodieTableSink implements AppendStreamTableSink<RowData>, Partitio
|
||||
// Read from kafka source
|
||||
RowType rowType = (RowType) this.schema.toRowDataType().notNull().getLogicalType();
|
||||
int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASKS);
|
||||
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf, isBounded);
|
||||
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
|
||||
|
||||
DataStream<Object> pipeline = dataStream
|
||||
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
|
||||
|
||||
@@ -58,7 +58,7 @@ public class TestStreamWriteOperatorCoordinator {
|
||||
@BeforeEach
|
||||
public void before() throws Exception {
|
||||
coordinator = new StreamWriteOperatorCoordinator(
|
||||
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2, false);
|
||||
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2);
|
||||
coordinator.start();
|
||||
}
|
||||
|
||||
@@ -75,14 +75,22 @@ public class TestStreamWriteOperatorCoordinator {
|
||||
WriteStatus writeStatus = new WriteStatus(true, 0.1D);
|
||||
writeStatus.setPartitionPath("par1");
|
||||
writeStatus.setStat(new HoodieWriteStat());
|
||||
OperatorEvent event0 =
|
||||
new BatchWriteSuccessEvent(0, instant, Collections.singletonList(writeStatus), true);
|
||||
OperatorEvent event0 = BatchWriteSuccessEvent.builder()
|
||||
.taskID(0)
|
||||
.instantTime(instant)
|
||||
.writeStatus(Collections.singletonList(writeStatus))
|
||||
.isLastBatch(true)
|
||||
.build();
|
||||
|
||||
WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
|
||||
writeStatus1.setPartitionPath("par2");
|
||||
writeStatus1.setStat(new HoodieWriteStat());
|
||||
OperatorEvent event1 =
|
||||
new BatchWriteSuccessEvent(1, instant, Collections.singletonList(writeStatus1), true);
|
||||
OperatorEvent event1 = BatchWriteSuccessEvent.builder()
|
||||
.taskID(1)
|
||||
.instantTime(instant)
|
||||
.writeStatus(Collections.singletonList(writeStatus1))
|
||||
.isLastBatch(true)
|
||||
.build();
|
||||
coordinator.handleEventFromOperator(0, event0);
|
||||
coordinator.handleEventFromOperator(1, event1);
|
||||
|
||||
@@ -115,7 +123,11 @@ public class TestStreamWriteOperatorCoordinator {
|
||||
public void testReceiveInvalidEvent() {
|
||||
CompletableFuture<byte[]> future = new CompletableFuture<>();
|
||||
coordinator.checkpointCoordinator(1, future);
|
||||
OperatorEvent event = new BatchWriteSuccessEvent(0, "abc", Collections.emptyList());
|
||||
OperatorEvent event = BatchWriteSuccessEvent.builder()
|
||||
.taskID(0)
|
||||
.instantTime("abc")
|
||||
.writeStatus(Collections.emptyList())
|
||||
.build();
|
||||
assertThrows(IllegalStateException.class,
|
||||
() -> coordinator.handleEventFromOperator(0, event),
|
||||
"Receive an unexpected event for instant abc from task 0");
|
||||
@@ -126,7 +138,11 @@ public class TestStreamWriteOperatorCoordinator {
|
||||
final CompletableFuture<byte[]> future = new CompletableFuture<>();
|
||||
coordinator.checkpointCoordinator(1, future);
|
||||
String inflightInstant = coordinator.getInstant();
|
||||
OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList());
|
||||
OperatorEvent event = BatchWriteSuccessEvent.builder()
|
||||
.taskID(0)
|
||||
.instantTime(inflightInstant)
|
||||
.writeStatus(Collections.emptyList())
|
||||
.build();
|
||||
coordinator.handleEventFromOperator(0, event);
|
||||
assertThrows(HoodieException.class,
|
||||
() -> coordinator.checkpointComplete(1),
|
||||
|
||||
@@ -84,7 +84,7 @@ public class StreamWriteFunctionWrapper<I> {
|
||||
this.gateway = new MockOperatorEventGateway();
|
||||
this.conf = conf;
|
||||
// one function
|
||||
this.coordinator = new StreamWriteOperatorCoordinator(conf, 1, false);
|
||||
this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
|
||||
this.functionInitializationContext = new MockFunctionInitializationContext();
|
||||
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
|
||||
}
|
||||
|
||||
@@ -36,6 +36,8 @@ import org.apache.flink.util.CollectionUtil;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Collection;
|
||||
@@ -149,12 +151,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBatchWriteAndRead() {
|
||||
@ParameterizedTest
|
||||
@EnumSource(value = ExecMode.class)
|
||||
void testWriteAndRead(ExecMode execMode) {
|
||||
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
|
||||
Map<String, String> options = new HashMap<>();
|
||||
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
|
||||
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
||||
batchTableEnv.executeSql(hoodieTableDDL);
|
||||
tableEnv.executeSql(hoodieTableDDL);
|
||||
String insertInto = "insert into t1 values\n"
|
||||
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
|
||||
+ "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
|
||||
@@ -165,13 +169,20 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
+ "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
|
||||
+ "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
|
||||
|
||||
execInsertSql(batchTableEnv, insertInto);
|
||||
execInsertSql(tableEnv, insertInto);
|
||||
|
||||
List<Row> rows = CollectionUtil.iterableToList(
|
||||
() -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
|
||||
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
|
||||
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Utilities
|
||||
// -------------------------------------------------------------------------
|
||||
private enum ExecMode {
|
||||
BATCH, STREAM
|
||||
}
|
||||
|
||||
private void execInsertSql(TableEnvironment tEnv, String insert) {
|
||||
TableResult tableResult = tEnv.executeSql(insert);
|
||||
// wait to finish
|
||||
|
||||
Reference in New Issue
Block a user