diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java index e2f5f7b95..013043384 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java @@ -64,6 +64,8 @@ public class StreamReadOperator extends AbstractStreamOperator private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class); + private static final int MINI_BATCH_SIZE = 1000; + // It's the same thread that runs this operator and checkpoint actions. Use this executor to schedule only // splits for subsequent reading, so that a new checkpoint could be triggered without blocking a long time // for exhausting all scheduled split reading tasks. @@ -74,6 +76,7 @@ public class StreamReadOperator extends AbstractStreamOperator private transient SourceFunction.SourceContext sourceContext; private transient ListState inputSplitsState; + private transient Queue splits; // Splits are read by the same thread that calls #processElement. Each read task is submitted to that thread by adding @@ -146,31 +149,56 @@ public class StreamReadOperator extends AbstractStreamOperator } private void processSplits() throws IOException { - MergeOnReadInputSplit split = splits.poll(); + MergeOnReadInputSplit split = splits.peek(); if (split == null) { currentSplitState = SplitState.IDLE; return; } - // This log is important to indicate the consuming process, there is only one log message for one data bucket. - LOG.info("Processing input split : {}", split); - - try { + // 1. open a fresh new input split and start reading as mini-batch + // 2. if the input split has remaining records to read, switches to another runnable to handle + // 3. if the input split reads to the end, close the format and remove the split from the queue #splits + // 4. 
for each runnable, reads at most #MINI_BATCH_SIZE number of records + if (format.isClosed()) { + // This log is important to indicate the consuming process, + // there is only one log message for one data bucket. + LOG.info("Processing input split : {}", split); format.open(split); - RowData nextElement = null; - while (!format.reachedEnd()) { - nextElement = format.nextRecord(nextElement); - sourceContext.collect(nextElement); - } + } + try { + consumeAsMiniBatch(split); } finally { currentSplitState = SplitState.IDLE; - format.close(); } // Re-schedule to process the next split. enqueueProcessSplits(); } + /** + * Consumes at most {@link #MINI_BATCH_SIZE} number of records + * for the given input split {@code split}. + * + *
<p>
Note: close the input format and remove the input split for the queue {@link #splits} + * if the split reads to the end. + * + * @param split The input split + */ + private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws IOException { + for (int i = 0; i < MINI_BATCH_SIZE; i++) { + if (!format.reachedEnd()) { + sourceContext.collect(format.nextRecord(null)); + split.consume(); + } else { + // close the input format + format.close(); + // remove the split + splits.poll(); + break; + } + } + } + @Override public void processWatermark(Watermark mark) { // we do nothing because we emit our own watermarks if needed. diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 6ef608bc7..4e193fab2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -180,11 +180,9 @@ public class HoodieTableSource implements conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); InputFormat inputFormat = getInputFormat(true); OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); - SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source") - .uid("uid_streaming_source_" + conf.getString(FlinkOptions.TABLE_NAME)) + SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "split_monitor") .setParallelism(1) .transform("split_reader", typeInfo, factory) - .uid("uid_split_reader_" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource<>(source); } else { diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 
4cd45a81d..566d4d318 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -129,6 +129,11 @@ public class MergeOnReadInputFormat */ private boolean emitDelete; + /** + * Flag saying whether the input format has been closed. + */ + private boolean closed = true; + private MergeOnReadInputFormat( Configuration conf, MergeOnReadTableState tableState, @@ -158,6 +163,7 @@ public class MergeOnReadInputFormat @Override public void open(MergeOnReadInputSplit split) throws IOException { this.currentReadCount = 0L; + this.closed = false; this.hadoopConf = StreamerUtil.getHadoopConf(); if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) { if (split.getInstantRange() != null) { @@ -203,6 +209,7 @@ public class MergeOnReadInputFormat + "spark partition Index: " + split.getSplitNumber() + "merge type: " + split.getMergeType()); } + mayShiftInputSplit(split); } @Override @@ -249,12 +256,32 @@ public class MergeOnReadInputFormat this.iterator.close(); } this.iterator = null; + this.closed = true; + } + + public boolean isClosed() { + return this.closed; } // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- + /** + * Shifts the input split by its consumed records number. + * + *
<p>
Note: This action is time-consuming. + */ + private void mayShiftInputSplit(MergeOnReadInputSplit split) throws IOException { + if (split.isConsumed()) { + // if the input split has been consumed before, + // shift the input split with consumed num of records first + for (long i = 0; i < split.getConsumed() && !reachedEnd(); i++) { + nextRecord(null); + } + } + } + private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException { return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java index 0c93eeac2..156622c30 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java @@ -33,6 +33,8 @@ import java.util.List; public class MergeOnReadInputSplit implements InputSplit { private static final long serialVersionUID = 1L; + private static final long NUM_NO_CONSUMPTION = 0L; + private final int splitNum; private final Option basePath; private final Option> logPaths; @@ -42,6 +44,10 @@ public class MergeOnReadInputSplit implements InputSplit { private final String mergeType; private final Option instantRange; + // for streaming reader to record the consumed offset, + // which is the start of next round reading. 
+ private long consumed = NUM_NO_CONSUMPTION; + public MergeOnReadInputSplit( int splitNum, @Nullable String basePath, @@ -94,6 +100,18 @@ public class MergeOnReadInputSplit implements InputSplit { return this.splitNum; } + public void consume() { + this.consumed += 1L; + } + + public long getConsumed() { + return consumed; + } + + public boolean isConsumed() { + return this.consumed != NUM_NO_CONSUMPTION; + } + @Override public String toString() { return "MergeOnReadInputSplit{" @@ -107,5 +125,4 @@ public class MergeOnReadInputSplit implements InputSplit { + ", instantRange=" + instantRange + '}'; } - } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index db7111b1f..a5812aa58 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -128,7 +128,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { .setBoolean("table.dynamic-table-options.enabled", true); // specify the start commit as earliest List rows3 = execSelectSql(streamTableEnv, - "select * from t1/*+options('read.streaming.start-commit'='earliest')*/", 10); + "select * from t1/*+options('read.start-commit'='earliest')*/", 10); assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT); }