From a4ee7463aee936b6160e529814016a7cb2f06694 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 25 Feb 2022 19:08:51 +0800 Subject: [PATCH] [HUDI-3474] Add more document to Pipelines for the usage of this tool to build a write pipeline (#4906) --- .../org/apache/hudi/sink/utils/Pipelines.java | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index f97f79422..527cda02d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -58,6 +58,31 @@ import org.apache.flink.table.types.logical.RowType; */ public class Pipelines { + /** + * Bulk insert the input dataset at once. + * + *

By default, the input dataset would shuffle by the partition path first then + * sort by the partition path before passing around to the write function. + * The whole pipeline looks like the following: + * + *

+   *      | input1 | ===\     /=== |sorter| === | task1 | (p1, p2)
+   *                   shuffle
+   *      | input2 | ===/     \=== |sorter| === | task2 | (p3, p4)
+   *
+   *      Note: Both input1 and input2's datasets come from partitions: p1, p2, p3, p4
+   * 
+ * + *

The write task switches to new file handle each time it receives a record + * from the different partition path, the shuffle and sort would reduce small files. + * + *

The bulk insert should be run in batch execution mode. + * + * @param conf The configuration + * @param rowType The input row type + * @param dataStream The input data stream + * @return the bulk insert data stream sink + */ public static DataStreamSink bulkInsert(Configuration conf, RowType rowType, DataStream dataStream) { WriteOperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType); @@ -95,6 +120,27 @@ public class Pipelines { .name("dummy"); } + /** + * Insert the dataset with append mode(no upsert or deduplication). + * + *

The input dataset would be rebalanced among the write tasks: + * + *

+   *      | input1 | ===\     /=== | task1 | (p1, p2, p3, p4)
+   *                   shuffle
+   *      | input2 | ===/     \=== | task2 | (p1, p2, p3, p4)
+   *
+   *      Note: Both input1 and input2's datasets come from partitions: p1, p2, p3, p4
+   * 
+ * + *

The write task switches to new file handle each time it receives a record + * from the different partition path, so there may be many small files. + * + * @param conf The configuration + * @param rowType The input row type + * @param dataStream The input data stream + * @return the appending data stream sink + */ public static DataStreamSink append(Configuration conf, RowType rowType, DataStream dataStream) { WriteOperatorFactory operatorFactory = AppendWriteOperator.getFactory(conf, rowType); @@ -108,6 +154,8 @@ public class Pipelines { /** * Constructs bootstrap pipeline as streaming. + * The bootstrap operator loads the existing data index (primary key to file id mapping), + * then sends the indexing data set to subsequent operator(usually the bucket assign operator). */ public static DataStream bootstrap( Configuration conf, @@ -119,6 +167,8 @@ public class Pipelines { /** * Constructs bootstrap pipeline. + * The bootstrap operator loads the existing data index (primary key to file id mapping), + * then send the indexing data set to subsequent operator(usually the bucket assign operator). * * @param conf The configuration * @param rowType The row type @@ -165,6 +215,11 @@ public class Pipelines { return dataStream1; } + /** + * Constructs bootstrap pipeline for batch execution mode. + * The indexing data set is loaded before the actual data write + * in order to support batch UPSERT. + */ private static DataStream boundedBootstrap( Configuration conf, RowType rowType, @@ -184,10 +239,36 @@ public class Pipelines { .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); } + /** + * Transforms the row data to hoodie records. + */ public static DataStream rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream dataStream) { return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class)); } + /** + * The streaming write pipeline. + * + *

The input dataset shuffles by the primary key first then + * shuffles by the file group ID before passing around to the write function. + * The whole pipeline looks like the following: + * + *

+   *      | input1 | ===\     /=== | bucket assigner | ===\     /=== | task1 |
+   *                   shuffle(by PK)                    shuffle(by bucket ID)
+   *      | input2 | ===/     \=== | bucket assigner | ===/     \=== | task2 |
+   *
+   *      Note: a file group must be handled by one write task to avoid write conflicts.
+   * 
+ * + *

The bucket assigner assigns the inputs to suitable file groups, the write task caches + * and flushes the data set to disk. + * + * @param conf The configuration + * @param defaultParallelism The default parallelism + * @param dataStream The input data stream + * @return the stream write data stream pipeline + */ public static DataStream hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream dataStream) { WriteOperatorFactory operatorFactory = StreamWriteOperator.getFactory(conf); return dataStream @@ -206,6 +287,26 @@ public class Pipelines { .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); } + /** + * The compaction tasks pipeline. + * + *

The compaction plan operator monitors the new compaction plan on the timeline + * then distributes the sub-plans to the compaction tasks. The compaction task then + * handle over the metadata to commit task for compaction transaction commit. + * The whole pipeline looks like the following: + * + *

+   *                                           /=== | task1 | ===\
+   *      | plan generation | ===> re-balance                      | commit |
+   *                                           \=== | task2 | ===/
+   *
+   *      Note: both the compaction plan generation task and the commit task are singletons.
+   * 
+ * + * @param conf The configuration + * @param dataStream The input data stream + * @return the compaction pipeline + */ public static DataStreamSink compact(Configuration conf, DataStream dataStream) { return dataStream.transform("compact_plan_generate", TypeInformation.of(CompactionPlanEvent.class),