[HUDI-3474] Add more document to Pipelines for the usage of this tool to build a write pipeline (#4906)
This commit is contained in:
@@ -58,6 +58,31 @@ import org.apache.flink.table.types.logical.RowType;
|
||||
*/
|
||||
public class Pipelines {
|
||||
|
||||
/**
|
||||
* Bulk insert the input dataset at once.
|
||||
*
|
||||
* <p>By default, the input dataset would shuffle by the partition path first then
|
||||
* sort by the partition path before passing around to the write function.
|
||||
* The whole pipeline looks like the following:
|
||||
*
|
||||
* <pre>
|
||||
* | input1 | ===\ /=== |sorter| === | task1 | (p1, p2)
|
||||
* shuffle
|
||||
* | input2 | ===/ \=== |sorter| === | task2 | (p3, p4)
|
||||
*
|
||||
* Note: Both input1 and input2's dataset come from partitions: p1, p2, p3, p4
|
||||
* </pre>
|
||||
*
|
||||
 * <p>The write task switches to a new file handle each time it receives a record
|
||||
 * from a different partition path, so the shuffle and sort help reduce small files.
|
||||
*
|
||||
* <p>The bulk insert should be run in batch execution mode.
|
||||
*
|
||||
* @param conf The configuration
|
||||
* @param rowType The input row type
|
||||
* @param dataStream The input data stream
|
||||
* @return the bulk insert data stream sink
|
||||
*/
|
||||
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
|
||||
WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
|
||||
|
||||
@@ -95,6 +120,27 @@ public class Pipelines {
|
||||
.name("dummy");
|
||||
}
|
||||
|
||||
/**
|
||||
 * Insert the dataset with append mode (no upsert or deduplication).
|
||||
*
|
||||
* <p>The input dataset would be rebalanced among the write tasks:
|
||||
*
|
||||
* <pre>
|
||||
* | input1 | ===\ /=== | task1 | (p1, p2, p3, p4)
|
||||
* shuffle
|
||||
* | input2 | ===/ \=== | task2 | (p1, p2, p3, p4)
|
||||
*
|
||||
* Note: Both input1 and input2's dataset come from partitions: p1, p2, p3, p4
|
||||
* </pre>
|
||||
*
|
||||
 * <p>The write task switches to a new file handle each time it receives a record
|
||||
 * from a different partition path, so there may be many small files.
|
||||
*
|
||||
* @param conf The configuration
|
||||
* @param rowType The input row type
|
||||
* @param dataStream The input data stream
|
||||
* @return the appending data stream sink
|
||||
*/
|
||||
public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
|
||||
WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
|
||||
|
||||
@@ -108,6 +154,8 @@ public class Pipelines {
|
||||
|
||||
/**
|
||||
* Constructs bootstrap pipeline as streaming.
|
||||
* The bootstrap operator loads the existing data index (primary key to file id mapping),
|
||||
 * then sends the indexing data set to the subsequent operator (usually the bucket assign operator).
|
||||
*/
|
||||
public static DataStream<HoodieRecord> bootstrap(
|
||||
Configuration conf,
|
||||
@@ -119,6 +167,8 @@ public class Pipelines {
|
||||
|
||||
/**
|
||||
* Constructs bootstrap pipeline.
|
||||
* The bootstrap operator loads the existing data index (primary key to file id mapping),
|
||||
 * then sends the indexing data set to the subsequent operator (usually the bucket assign operator).
|
||||
*
|
||||
* @param conf The configuration
|
||||
* @param rowType The row type
|
||||
@@ -165,6 +215,11 @@ public class Pipelines {
|
||||
return dataStream1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs bootstrap pipeline for batch execution mode.
|
||||
* The indexing data set is loaded before the actual data write
|
||||
* in order to support batch UPSERT.
|
||||
*/
|
||||
private static DataStream<HoodieRecord> boundedBootstrap(
|
||||
Configuration conf,
|
||||
RowType rowType,
|
||||
@@ -184,10 +239,36 @@ public class Pipelines {
|
||||
.uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms the row data to hoodie records.
|
||||
*/
|
||||
public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
|
||||
return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* The streaming write pipeline.
|
||||
*
|
||||
* <p>The input dataset shuffles by the primary key first then
|
||||
* shuffles by the file group ID before passing around to the write function.
|
||||
* The whole pipeline looks like the following:
|
||||
*
|
||||
* <pre>
|
||||
* | input1 | ===\ /=== | bucket assigner | ===\ /=== | task1 |
|
||||
* shuffle(by PK) shuffle(by bucket ID)
|
||||
* | input2 | ===/ \=== | bucket assigner | ===/ \=== | task2 |
|
||||
*
|
||||
* Note: a file group must be handled by one write task to avoid write conflict.
|
||||
* </pre>
|
||||
*
|
||||
* <p>The bucket assigner assigns the inputs to suitable file groups, the write task caches
|
||||
* and flushes the data set to disk.
|
||||
*
|
||||
* @param conf The configuration
|
||||
* @param defaultParallelism The default parallelism
|
||||
* @param dataStream The input data stream
|
||||
* @return the stream write data stream pipeline
|
||||
*/
|
||||
public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
|
||||
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
|
||||
return dataStream
|
||||
@@ -206,6 +287,26 @@ public class Pipelines {
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||
}
|
||||
|
||||
/**
|
||||
* The compaction tasks pipeline.
|
||||
*
|
||||
* <p>The compaction plan operator monitors the new compaction plan on the timeline
|
||||
* then distributes the sub-plans to the compaction tasks. The compaction task then
|
||||
 * hands over the metadata to the commit task for the compaction transaction commit.
|
||||
* The whole pipeline looks like the following:
|
||||
*
|
||||
* <pre>
|
||||
* /=== | task1 | ===\
|
||||
* | plan generation | ===> re-balance | commit |
|
||||
* \=== | task2 | ===/
|
||||
*
|
||||
 * Note: both the compaction plan generation task and the commit task are singletons.
|
||||
* </pre>
|
||||
*
|
||||
* @param conf The configuration
|
||||
* @param dataStream The input data stream
|
||||
* @return the compaction pipeline
|
||||
*/
|
||||
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
|
||||
return dataStream.transform("compact_plan_generate",
|
||||
TypeInformation.of(CompactionPlanEvent.class),
|
||||
|
||||
Reference in New Issue
Block a user