1
0

[HUDI-2485] Consume as mini-batch for flink stream reader (#3710)

This commit is contained in:
Danny Chan
2021-09-24 23:44:01 +08:00
committed by GitHub
parent 440525ccbb
commit 31a301f0aa
5 changed files with 86 additions and 16 deletions

View File

@@ -64,6 +64,8 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class); private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);
private static final int MINI_BATCH_SIZE = 1000;
// It's the same thread that runs this operator and checkpoint actions. Use this executor to schedule only // It's the same thread that runs this operator and checkpoint actions. Use this executor to schedule only
// splits for subsequent reading, so that a new checkpoint could be triggered without blocking a long time // splits for subsequent reading, so that a new checkpoint could be triggered without blocking a long time
// for exhausting all scheduled split reading tasks. // for exhausting all scheduled split reading tasks.
@@ -74,6 +76,7 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
private transient SourceFunction.SourceContext<RowData> sourceContext; private transient SourceFunction.SourceContext<RowData> sourceContext;
private transient ListState<MergeOnReadInputSplit> inputSplitsState; private transient ListState<MergeOnReadInputSplit> inputSplitsState;
private transient Queue<MergeOnReadInputSplit> splits; private transient Queue<MergeOnReadInputSplit> splits;
// Splits are read by the same thread that calls #processElement. Each read task is submitted to that thread by adding // Splits are read by the same thread that calls #processElement. Each read task is submitted to that thread by adding
@@ -146,31 +149,56 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
} }
private void processSplits() throws IOException { private void processSplits() throws IOException {
MergeOnReadInputSplit split = splits.poll(); MergeOnReadInputSplit split = splits.peek();
if (split == null) { if (split == null) {
currentSplitState = SplitState.IDLE; currentSplitState = SplitState.IDLE;
return; return;
} }
// This log is important to indicate the consuming process, there is only one log message for one data bucket. // 1. open a fresh new input split and start reading as mini-batch
LOG.info("Processing input split : {}", split); // 2. if the input split has remaining records to read, switches to another runnable to handle
// 3. if the input split reads to the end, close the format and remove the split from the queue #splits
try { // 4. for each runnable, reads at most #MINI_BATCH_SIZE number of records
if (format.isClosed()) {
// This log is important to indicate the consuming process,
// there is only one log message for one data bucket.
LOG.info("Processing input split : {}", split);
format.open(split); format.open(split);
RowData nextElement = null; }
while (!format.reachedEnd()) { try {
nextElement = format.nextRecord(nextElement); consumeAsMiniBatch(split);
sourceContext.collect(nextElement);
}
} finally { } finally {
currentSplitState = SplitState.IDLE; currentSplitState = SplitState.IDLE;
format.close();
} }
// Re-schedule to process the next split. // Re-schedule to process the next split.
enqueueProcessSplits(); enqueueProcessSplits();
} }
/**
* Consumes at most {@link #MINI_BATCH_SIZE} number of records
* for the given input split {@code split}.
*
* <p>Note: close the input format and remove the input split for the queue {@link #splits}
* if the split reads to the end.
*
* @param split The input split
*/
private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws IOException {
for (int i = 0; i < MINI_BATCH_SIZE; i++) {
if (!format.reachedEnd()) {
sourceContext.collect(format.nextRecord(null));
split.consume();
} else {
// close the input format
format.close();
// remove the split
splits.poll();
break;
}
}
}
@Override @Override
public void processWatermark(Watermark mark) { public void processWatermark(Watermark mark) {
// we do nothing because we emit our own watermarks if needed. // we do nothing because we emit our own watermarks if needed.

View File

@@ -180,11 +180,9 @@ public class HoodieTableSource implements
conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true); InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source") SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "split_monitor")
.uid("uid_streaming_source_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(1) .setParallelism(1)
.transform("split_reader", typeInfo, factory) .transform("split_reader", typeInfo, factory)
.uid("uid_split_reader_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source); return new DataStreamSource<>(source);
} else { } else {

View File

@@ -129,6 +129,11 @@ public class MergeOnReadInputFormat
*/ */
private boolean emitDelete; private boolean emitDelete;
/**
* Flag saying whether the input format has been closed.
*/
private boolean closed = true;
private MergeOnReadInputFormat( private MergeOnReadInputFormat(
Configuration conf, Configuration conf,
MergeOnReadTableState tableState, MergeOnReadTableState tableState,
@@ -158,6 +163,7 @@ public class MergeOnReadInputFormat
@Override @Override
public void open(MergeOnReadInputSplit split) throws IOException { public void open(MergeOnReadInputSplit split) throws IOException {
this.currentReadCount = 0L; this.currentReadCount = 0L;
this.closed = false;
this.hadoopConf = StreamerUtil.getHadoopConf(); this.hadoopConf = StreamerUtil.getHadoopConf();
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) { if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
if (split.getInstantRange() != null) { if (split.getInstantRange() != null) {
@@ -203,6 +209,7 @@ public class MergeOnReadInputFormat
+ "spark partition Index: " + split.getSplitNumber() + "spark partition Index: " + split.getSplitNumber()
+ "merge type: " + split.getMergeType()); + "merge type: " + split.getMergeType());
} }
mayShiftInputSplit(split);
} }
@Override @Override
@@ -249,12 +256,32 @@ public class MergeOnReadInputFormat
this.iterator.close(); this.iterator.close();
} }
this.iterator = null; this.iterator = null;
this.closed = true;
}
public boolean isClosed() {
return this.closed;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Utilities // Utilities
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/**
* Shifts the input split by its consumed records number.
*
* <p>Note: This action is time-consuming.
*/
private void mayShiftInputSplit(MergeOnReadInputSplit split) throws IOException {
if (split.isConsumed()) {
// if the input split has been consumed before,
// shift the input split with consumed num of records first
for (long i = 0; i < split.getConsumed() && !reachedEnd(); i++) {
nextRecord(null);
}
}
}
private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException { private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException {
return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray());
} }

View File

@@ -33,6 +33,8 @@ import java.util.List;
public class MergeOnReadInputSplit implements InputSplit { public class MergeOnReadInputSplit implements InputSplit {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static final long NUM_NO_CONSUMPTION = 0L;
private final int splitNum; private final int splitNum;
private final Option<String> basePath; private final Option<String> basePath;
private final Option<List<String>> logPaths; private final Option<List<String>> logPaths;
@@ -42,6 +44,10 @@ public class MergeOnReadInputSplit implements InputSplit {
private final String mergeType; private final String mergeType;
private final Option<InstantRange> instantRange; private final Option<InstantRange> instantRange;
// for streaming reader to record the consumed offset,
// which is the start of next round reading.
private long consumed = NUM_NO_CONSUMPTION;
public MergeOnReadInputSplit( public MergeOnReadInputSplit(
int splitNum, int splitNum,
@Nullable String basePath, @Nullable String basePath,
@@ -94,6 +100,18 @@ public class MergeOnReadInputSplit implements InputSplit {
return this.splitNum; return this.splitNum;
} }
public void consume() {
this.consumed += 1L;
}
public long getConsumed() {
return consumed;
}
public boolean isConsumed() {
return this.consumed != NUM_NO_CONSUMPTION;
}
@Override @Override
public String toString() { public String toString() {
return "MergeOnReadInputSplit{" return "MergeOnReadInputSplit{"
@@ -107,5 +125,4 @@ public class MergeOnReadInputSplit implements InputSplit {
+ ", instantRange=" + instantRange + ", instantRange=" + instantRange
+ '}'; + '}';
} }
} }

View File

@@ -128,7 +128,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
.setBoolean("table.dynamic-table-options.enabled", true); .setBoolean("table.dynamic-table-options.enabled", true);
// specify the start commit as earliest // specify the start commit as earliest
List<Row> rows3 = execSelectSql(streamTableEnv, List<Row> rows3 = execSelectSql(streamTableEnv,
"select * from t1/*+options('read.streaming.start-commit'='earliest')*/", 10); "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT); assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT);
} }