1
0

support generan parameter 'sink.parallelism' for flink-hudi (#5405)

Co-authored-by: hehuiyuan1 <hehuiyuan@jd.com>
This commit is contained in:
hehuiyuan
2022-04-24 19:09:39 +08:00
committed by GitHub
parent 5e5c177e4b
commit bda3db078e
2 changed files with 5 additions and 1 deletions

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.factories.FactoryUtil;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -232,6 +233,8 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
.key(HoodieWriteConfig.TBL_NAME.key())
.stringType()

View File

@@ -82,7 +82,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
}
// default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();
int parallelism = conf.getInteger(FlinkOptions.SINK_PARALLELISM,
dataStream.getExecutionConfig().getParallelism());
DataStream<Object> pipeline;
// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream =