diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index f97f79422..527cda02d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -58,6 +58,31 @@ import org.apache.flink.table.types.logical.RowType;
*/
public class Pipelines {
+ /**
+ * Bulk insert the input dataset at once.
+ *
+ *
+ * <p>By default, the input dataset would shuffle by the partition path first then
+ * sort by the partition path before passing around to the write function.
+ * The whole pipeline looks like the following:
+ *
+ *
+ * | input1 | ===\ /=== |sorter| === | task1 | (p1, p2)
+ * shuffle
+ * | input2 | ===/ \=== |sorter| === | task2 | (p3, p4)
+ *
+ * Note: Both input1 and input2's dataset come from partitions: p1, p2, p3, p4
+ *
+ *
+ * The write task switches to new file handle each time it receives a record
+ * from a different partition path, the shuffle and sort would reduce small files.
+ *
+ *
+ * <p>The bulk insert should be run in batch execution mode.
+ *
+ * @param conf The configuration
+ * @param rowType The input row type
+ * @param dataStream The input data stream
+ * @return the bulk insert data stream sink
+ */
public static DataStreamSink bulkInsert(Configuration conf, RowType rowType, DataStream dataStream) {
WriteOperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
@@ -95,6 +120,27 @@ public class Pipelines {
.name("dummy");
}
+ /**
+ * Insert the dataset with append mode (no upsert or deduplication).
+ *
+ * The input dataset would be rebalanced among the write tasks:
+ *
+ *
+ * | input1 | ===\ /=== | task1 | (p1, p2, p3, p4)
+ * shuffle
+ * | input2 | ===/ \=== | task2 | (p1, p2, p3, p4)
+ *
+ * Note: Both input1 and input2's dataset come from partitions: p1, p2, p3, p4
+ *
+ *
+ * The write task switches to new file handle each time it receives a record
+ * from a different partition path, so there may be many small files.
+ *
+ * @param conf The configuration
+ * @param rowType The input row type
+ * @param dataStream The input data stream
+ * @return the appending data stream sink
+ */
public static DataStreamSink append(Configuration conf, RowType rowType, DataStream dataStream) {
WriteOperatorFactory operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
@@ -108,6 +154,8 @@ public class Pipelines {
/**
* Constructs bootstrap pipeline as streaming.
+ * The bootstrap operator loads the existing data index (primary key to file id mapping),
+ * then sends the indexing data set to the subsequent operator (usually the bucket assign operator).
*/
public static DataStream bootstrap(
Configuration conf,
@@ -119,6 +167,8 @@ public class Pipelines {
/**
* Constructs bootstrap pipeline.
+ * The bootstrap operator loads the existing data index (primary key to file id mapping),
+ * then sends the indexing data set to the subsequent operator (usually the bucket assign operator).
*
* @param conf The configuration
* @param rowType The row type
@@ -165,6 +215,11 @@ public class Pipelines {
return dataStream1;
}
+ /**
+ * Constructs bootstrap pipeline for batch execution mode.
+ * The indexing data set is loaded before the actual data write
+ * in order to support batch UPSERT.
+ */
private static DataStream boundedBootstrap(
Configuration conf,
RowType rowType,
@@ -184,10 +239,36 @@ public class Pipelines {
.uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
+ /**
+ * Transforms the row data to hoodie records.
+ */
public static DataStream rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream dataStream) {
return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
}
+ /**
+ * The streaming write pipeline.
+ *
+ * The input dataset shuffles by the primary key first then
+ * shuffles by the file group ID before passing around to the write function.
+ * The whole pipeline looks like the following:
+ *
+ *
+ * | input1 | ===\ /=== | bucket assigner | ===\ /=== | task1 |
+ * shuffle(by PK) shuffle(by bucket ID)
+ * | input2 | ===/ \=== | bucket assigner | ===/ \=== | task2 |
+ *
+ * Note: a file group must be handled by one write task to avoid write conflict.
+ *
+ *
+ * The bucket assigner assigns the inputs to suitable file groups, the write task caches
+ * and flushes the data set to disk.
+ *
+ * @param conf The configuration
+ * @param defaultParallelism The default parallelism
+ * @param dataStream The input data stream
+ * @return the stream write data stream pipeline
+ */
public static DataStream hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream dataStream) {
WriteOperatorFactory operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
@@ -206,6 +287,26 @@ public class Pipelines {
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
+ /**
+ * The compaction tasks pipeline.
+ *
+ * The compaction plan operator monitors the new compaction plan on the timeline
+ * then distributes the sub-plans to the compaction tasks. The compaction task then
+ * hands over the metadata to the commit task for the compaction transaction commit.
+ * The whole pipeline looks like the following:
+ *
+ *
+ * /=== | task1 | ===\
+ * | plan generation | ===> re-balance | commit |
+ * \=== | task2 | ===/
+ *
+ * Note: both the compaction plan generation task and the commit task are singleton.
+ *
+ *
+ * @param conf The configuration
+ * @param dataStream The input data stream
+ * @return the compaction pipeline
+ */
public static DataStreamSink compact(Configuration conf, DataStream dataStream) {
return dataStream.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),