[HUDI-1495] Bump Flink version to 1.12.2 (#2718)
FlinkOptions.java:

@@ -21,6 +21,7 @@ package org.apache.hudi.configuration;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.streamer.FlinkStreamerConfig;

@@ -30,10 +31,13 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 
-import java.util.Arrays;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Hoodie Flink config options.

@@ -287,12 +291,6 @@ public class FlinkOptions {
   // Utilities
   // -------------------------------------------------------------------------
 
-  // Remember to update the set when adding new options.
-  public static final List<ConfigOption<?>> OPTIONAL_OPTIONS = Arrays.asList(
-      TABLE_TYPE, OPERATION, PRECOMBINE_FIELD, PAYLOAD_CLASS, INSERT_DROP_DUPS, RETRY_TIMES,
-      RETRY_INTERVAL_MS, IGNORE_FAILED, RECORD_KEY_FIELD, PARTITION_PATH_FIELD, KEYGEN_CLASS
-  );
-
   // Prefix for Hoodie specific properties.
   private static final String PROPERTIES_PREFIX = "properties.";
 

@@ -385,4 +383,32 @@ public class FlinkOptions {
     return !conf.getOptional(option).isPresent()
         || conf.get(option).equals(option.defaultValue());
   }
+
+  /**
+   * Returns all the optional config options.
+   */
+  public static Set<ConfigOption<?>> optionalOptions() {
+    Set<ConfigOption<?>> options = new HashSet<>(allOptions());
+    options.remove(PATH);
+    return options;
+  }
+
+  /**
+   * Returns all the config options.
+   */
+  public static List<ConfigOption<?>> allOptions() {
+    Field[] declaredFields = FlinkOptions.class.getDeclaredFields();
+    List<ConfigOption<?>> options = new ArrayList<>();
+    for (Field field : declaredFields) {
+      if (java.lang.reflect.Modifier.isStatic(field.getModifiers())
+          && field.getType().equals(ConfigOption.class)) {
+        try {
+          options.add((ConfigOption<?>) field.get(ConfigOption.class));
+        } catch (IllegalAccessException e) {
+          throw new HoodieException("Error while fetching static config option", e);
+        }
+      }
+    }
+    return options;
+  }
 }
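With OPTIONAL_OPTIONS gone, options are discovered reflectively: allOptions() walks the static ConfigOption<?> fields of FlinkOptions, and optionalOptions() is everything except PATH, so a newly added option can no longer be forgotten in a hand-maintained list. A minimal, self-contained sketch of the same reflection pattern (MyOptions is hypothetical, not part of this commit):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class MyOptions {
  public static final ConfigOption<String> PATH = ConfigOptions
      .key("path").stringType().noDefaultValue();
  public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
      .key("write.tasks").intType().defaultValue(4);

  public static List<ConfigOption<?>> allOptions() {
    List<ConfigOption<?>> options = new ArrayList<>();
    for (Field field : MyOptions.class.getDeclaredFields()) {
      // only static ConfigOption<?> constants qualify
      if (Modifier.isStatic(field.getModifiers())
          && field.getType().equals(ConfigOption.class)) {
        try {
          // for a static field the instance argument is ignored
          options.add((ConfigOption<?>) field.get(null));
        } catch (IllegalAccessException e) {
          throw new IllegalStateException("Error while fetching static config option", e);
        }
      }
    }
    return options;
  }
}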
StreamWriteOperatorCoordinator.java:

@@ -157,7 +157,7 @@ public class StreamWriteOperatorCoordinator
   }
 
   @Override
-  public void checkpointComplete(long checkpointId) {
+  public void notifyCheckpointComplete(long checkpointId) {
     // start to commit the instant.
     checkAndCommitWithRetry();
     // if async compaction is on, schedule the compaction

@@ -182,7 +182,7 @@ public class StreamWriteOperatorCoordinator
   }
 
   @Override
-  public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception {
+  public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception {
     if (checkpointData != null) {
       // restore when any checkpoint completed
       deserializeCheckpointAndRestore(checkpointData);

@@ -215,6 +215,11 @@ public class StreamWriteOperatorCoordinator
     // no operation
   }
 
+  @Override
+  public void subtaskReset(int i, long l) {
+    // no operation
+  }
+
   // -------------------------------------------------------------------------
   // Utilities
   // -------------------------------------------------------------------------
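The three hunks above track changes to Flink's OperatorCoordinator SPI between 1.11 and 1.12. A paraphrased excerpt of the affected callbacks (not the complete interface, and not code from this commit):

// Paraphrased excerpt; the real interface is
// org.apache.flink.runtime.operators.coordination.OperatorCoordinator.
public interface CoordinatorCallbacks {

  // Flink 1.11 named this checkpointComplete(long).
  void notifyCheckpointComplete(long checkpointId);

  // Flink 1.11 passed only the byte[]; 1.12 adds the checkpoint id.
  void resetToCheckpoint(long checkpointId, byte[] checkpointData) throws Exception;

  // New callback in 1.12, invoked when a subtask is reset to a checkpoint.
  void subtaskReset(int subtask, long checkpointId);
}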
HoodieFlinkStreamerV2.java:

@@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Properties;

@@ -80,7 +80,7 @@ public class HoodieFlinkStreamerV2 {
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
             rowType,
-            new RowDataTypeInfo(rowType),
+            InternalTypeInfo.of(rowType),
             false,
             true,
             TimestampFormat.ISO_8601
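Flink 1.12 replaced RowDataTypeInfo with InternalTypeInfo, constructed through a static factory from a logical type. A small sketch of the replacement call (the row type here is made up for illustration):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class TypeInfoMigration {
  public static void main(String[] args) {
    RowType rowType = RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
    // Flink 1.11: new RowDataTypeInfo(rowType)
    TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
    System.out.println(typeInfo);
  }
}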
HoodieTableFactory.java:

@@ -19,21 +19,20 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.util.AvroSchemaConverter;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.hadoop.fs.Path;

@@ -41,62 +40,57 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * Hoodie data source/sink factory.
  */
-public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSinkFactory<RowData> {
+public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class);
 
   public static final String FACTORY_ID = "hudi";
 
   @Override
-  public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
-    Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
-    TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
-    // enclosing the code within a try catch block so that we can log the error message.
-    // Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory
-    // to create the source/sink and catches all the exceptions then tries the new factory.
-    //
-    // log the error message first so that there is a chance to show the real failure cause.
-    try {
-      Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
-          new ValidationException("Option [path] should not be empty.")));
-      return new HoodieTableSource(
-          schema,
-          path,
-          context.getTable().getPartitionKeys(),
-          conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
-          conf);
-    } catch (Throwable throwable) {
-      LOG.error("Create table source error", throwable);
-      throw new HoodieException(throwable);
-    }
+  public DynamicTableSource createDynamicTableSource(Context context) {
+    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+    helper.validate();
+
+    Configuration conf = (Configuration) helper.getOptions();
+    TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
+
+    Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
+        new ValidationException("Option [path] should not be empty.")));
+    return new HoodieTableSource(
+        schema,
+        path,
+        context.getCatalogTable().getPartitionKeys(),
+        conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
+        conf);
   }
 
   @Override
-  public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
-    Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
-    TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
+  public DynamicTableSink createDynamicTableSink(Context context) {
+    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
+    TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
    return new HoodieTableSink(conf, schema);
   }
 
   @Override
-  public Map<String, String> requiredContext() {
-    Map<String, String> context = new HashMap<>();
-    context.put(FactoryUtil.CONNECTOR.key(), FACTORY_ID);
-    return context;
+  public String factoryIdentifier() {
+    return FACTORY_ID;
   }
 
   @Override
-  public List<String> supportedProperties() {
-    // contains format properties.
-    return Collections.singletonList("*");
+  public Set<ConfigOption<?>> requiredOptions() {
+    return Collections.singleton(FlinkOptions.PATH);
   }
 
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return FlinkOptions.optionalOptions();
+  }
+
   // -------------------------------------------------------------------------
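HoodieTableFactory moves from the legacy TableSourceFactory/TableSinkFactory SPI, with its string property map and supportedProperties() wildcard, to the DynamicTableFactory stack: an identifier, typed required/optional options, and FactoryUtil validation. A minimal sketch of that contract (ExampleTableFactory and its single option are hypothetical, not Hudi code):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.Collections;
import java.util.Set;

public class ExampleTableFactory implements DynamicTableSourceFactory {

  public static final ConfigOption<String> PATH = ConfigOptions
      .key("path").stringType().noDefaultValue();

  @Override
  public String factoryIdentifier() {
    return "example"; // matched against 'connector' = 'example' in the DDL
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.singleton(PATH);
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    return Collections.emptySet();
  }

  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    helper.validate(); // rejects unknown keys and checks required options
    String path = helper.getOptions().get(PATH);
    throw new UnsupportedOperationException("a real factory would build the source from " + path);
  }
}

Such factories are discovered through the Java service loader, so the implementation class must also be listed in META-INF/services/org.apache.flink.table.factories.Factory.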
HoodieTableSink.java:

@@ -34,23 +34,22 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 import java.util.Map;
 
 /**
  * Hoodie table sink.
  */
-public class HoodieTableSink implements AppendStreamTableSink<RowData>, PartitionableTableSink {
+public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
 
   private final Configuration conf;
   private final TableSchema schema;

@@ -61,63 +60,46 @@ public class HoodieTableSink implements AppendStreamTableSink<RowData>, Partitio
   }
 
   @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
-    // Read from kafka source
-    RowType rowType = (RowType) this.schema.toRowDataType().notNull().getLogicalType();
-    int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASKS);
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
+  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+    return (DataStreamSinkProvider) dataStream -> {
+      // Read from kafka source
+      RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
+      int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
+      StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
 
-    DataStream<Object> pipeline = dataStream
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
-        // Key-by partition path, to avoid multiple subtasks write to a partition at the same time
-        .keyBy(HoodieRecord::getPartitionPath)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write")
-        .setParallelism(numWriteTasks);
-    if (StreamerUtil.needsScheduleCompaction(conf)) {
-      return pipeline.transform("compact_plan_generate",
-          TypeInformation.of(CompactionPlanEvent.class),
-          new CompactionPlanOperator(conf))
-          .uid("uid_compact_plan_generate")
-          .setParallelism(1) // plan generate must be singleton
-          .keyBy(event -> event.getOperation().hashCode())
-          .transform("compact_task",
-              TypeInformation.of(CompactionCommitEvent.class),
-              new KeyedProcessOperator<>(new CompactFunction(conf)))
-          .addSink(new CompactionCommitSink(conf))
-          .name("compact_commit")
-          .setParallelism(1); // compaction commit should be singleton
-    } else {
-      return pipeline.addSink(new DummySinkFunction<>())
-          .name("dummy").uid("uid_dummy");
-    }
-  }
-
-  @Override
-  public TableSink<RowData> configure(String[] strings, TypeInformation<?>[] infos) {
-    return this;
-  }
-
-  @Override
-  public TableSchema getTableSchema() {
-    return this.schema;
-  }
-
-  @Override
-  public DataType getConsumedDataType() {
-    return this.schema.toRowDataType().bridgedTo(RowData.class);
-  }
-
-  @Override
-  public void setStaticPartition(Map<String, String> partitions) {
-    // no operation
+      DataStream<Object> pipeline = dataStream
+          .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+          // Key-by partition path, to avoid multiple subtasks write to a partition at the same time
+          .keyBy(HoodieRecord::getPartitionPath)
+          .transform(
+              "bucket_assigner",
+              TypeInformation.of(HoodieRecord.class),
+              new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+          .uid("uid_bucket_assigner")
+          // shuffle by fileId(bucket id)
+          .keyBy(record -> record.getCurrentLocation().getFileId())
+          .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
+          .uid("uid_hoodie_stream_write")
+          .setParallelism(numWriteTasks);
+      if (StreamerUtil.needsScheduleCompaction(conf)) {
+        return pipeline.transform("compact_plan_generate",
+            TypeInformation.of(CompactionPlanEvent.class),
+            new CompactionPlanOperator(conf))
+            .uid("uid_compact_plan_generate")
+            .setParallelism(1) // plan generate must be singleton
+            .keyBy(event -> event.getOperation().hashCode())
+            .transform("compact_task",
+                TypeInformation.of(CompactionCommitEvent.class),
+                new KeyedProcessOperator<>(new CompactFunction(conf)))
+            .addSink(new CompactionCommitSink(conf))
+            .name("compact_commit")
+            .setParallelism(1); // compaction commit should be singleton
+      } else {
+        return pipeline.addSink(new DummySinkFunction<>())
+            .setParallelism(1)
+            .name("dummy").uid("uid_dummy");
+      }
+    };
   }
 
   @VisibleForTesting

@@ -125,6 +107,31 @@ public class HoodieTableSink implements AppendStreamTableSink<RowData>, Partitio
     return this.conf;
   }
 
+  @Override
+  public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+    // ignore RowKind.UPDATE_BEFORE
+    return ChangelogMode.newBuilder()
+        .addContainedKind(RowKind.DELETE)
+        .addContainedKind(RowKind.INSERT)
+        .addContainedKind(RowKind.UPDATE_AFTER)
+        .build();
+  }
+
+  @Override
+  public DynamicTableSink copy() {
+    return new HoodieTableSink(this.conf, this.schema);
+  }
+
+  @Override
+  public String asSummaryString() {
+    return "HoodieTableSink";
+  }
+
+  @Override
+  public void applyStaticPartition(Map<String, String> map) {
+    // no operation
+  }
+
   // Dummy sink function that does nothing.
   private static class DummySinkFunction<T> implements SinkFunction<T> {}
 }
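The rewritten sink keeps its hand-built DataStream pipeline by returning a DataStreamSinkProvider from getSinkRuntimeProvider, and it now declares the row kinds it accepts through getChangelogMode (above: everything except UPDATE_BEFORE). A stripped-down sketch of the same shape (ExampleTableSink is hypothetical; it accepts inserts only and discards them):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;

public class ExampleTableSink implements DynamicTableSink {

  @Override
  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // insert-only is the simplest contract; the Hudi sink builds a custom mode instead
    return ChangelogMode.insertOnly();
  }

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // the lambda receives the upstream DataStream<RowData> and must return the terminal sink;
    // arbitrary transformations may run in between, as the Hudi pipeline above does
    return (DataStreamSinkProvider) (DataStream<RowData> dataStream) ->
        dataStream.addSink(new SinkFunction<RowData>() { });
  }

  @Override
  public DynamicTableSink copy() {
    return new ExampleTableSink();
  }

  @Override
  public String asSummaryString() {
    return "ExampleTableSink";
  }
}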
HoodieTableSource.java:

@@ -57,18 +57,20 @@ import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
-import org.apache.flink.table.sources.FilterableTableSource;
-import org.apache.flink.table.sources.LimitableTableSource;
-import org.apache.flink.table.sources.PartitionableTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;

@@ -83,6 +85,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

@@ -94,11 +97,11 @@ import static org.apache.hudi.table.format.FormatUtils.getParquetConf;
  * Hoodie batch table source that always read the latest snapshot of the underneath table.
  */
 public class HoodieTableSource implements
-    StreamTableSource<RowData>,
-    PartitionableTableSource,
-    ProjectableTableSource<RowData>,
-    LimitableTableSource<RowData>,
-    FilterableTableSource<RowData> {
+    ScanTableSource,
+    SupportsPartitionPushDown,
+    SupportsProjectionPushDown,
+    SupportsLimitPushDown,
+    SupportsFilterPushDown {
   private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSource.class);
 
   private static final int NO_LIMIT_CONSTANT = -1;

@@ -113,9 +116,9 @@ public class HoodieTableSource implements
   private final String defaultPartName;
   private final Configuration conf;
 
-  private final int[] requiredPos;
-  private final long limit;
-  private final List<Expression> filters;
+  private int[] requiredPos;
+  private long limit;
+  private List<Expression> filters;
 
   private List<Map<String, String>> requiredPartitions;
 

@@ -161,94 +164,92 @@ public class HoodieTableSource implements
   }
 
   @Override
-  public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
-    @SuppressWarnings("unchecked")
-    TypeInformation<RowData> typeInfo =
-        (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
-    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
-      StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
-          conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes);
-      OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
-      SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
-          .setParallelism(1)
-          .uid("uid_streaming_source")
-          .transform("split_reader", typeInfo, factory)
-          .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
-          .uid("uid_split_reader");
-      return new DataStreamSource<>(source);
-    } else {
-      InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
-      DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
-      return source.name("streaming_source")
-          .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
-          .uid("uid_streaming_source");
-    }
+  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+    return new DataStreamScanProvider() {
+
+      @Override
+      public boolean isBounded() {
+        return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
+      }
+
+      @Override
+      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+        @SuppressWarnings("unchecked")
+        TypeInformation<RowData> typeInfo =
+            (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+        if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+          StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
+              conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes);
+          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
+          SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
+              .setParallelism(1)
+              .uid("uid_streaming_source")
+              .transform("split_reader", typeInfo, factory)
+              .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
+              .uid("uid_split_reader");
+          return new DataStreamSource<>(source);
+        } else {
+          InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
+          DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
+          return source.name("streaming_source")
+              .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
+              .uid("uid_streaming_source");
        }
+      }
+    };
   }
 
   @Override
-  public boolean isBounded() {
-    return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
+  public ChangelogMode getChangelogMode() {
+    return ChangelogMode.insertOnly();
   }
 
   @Override
-  public TableSource<RowData> applyPredicate(List<Expression> predicates) {
-    return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf,
-        requiredPartitions, requiredPos, limit, new ArrayList<>(predicates));
+  public DynamicTableSource copy() {
+    return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
+        conf, requiredPartitions, requiredPos, limit, filters);
   }
 
   @Override
-  public boolean isFilterPushedDown() {
-    return this.filters != null && this.filters.size() > 0;
+  public String asSummaryString() {
+    return "HudiTableSource";
   }
 
   @Override
-  public boolean isLimitPushedDown() {
-    return this.limit != NO_LIMIT_CONSTANT;
+  public Result applyFilters(List<ResolvedExpression> filters) {
+    this.filters = new ArrayList<>(filters);
+    return Result.of(new ArrayList<>(filters), new ArrayList<>(filters));
   }
 
   @Override
-  public TableSource<RowData> applyLimit(long limit) {
-    return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf,
-        requiredPartitions, requiredPos, limit, filters);
+  public Optional<List<Map<String, String>>> listPartitions() {
+    List<Map<String, String>> partitions = FilePathUtils.getPartitions(path, hadoopConf,
+        partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
+    return Optional.of(partitions);
   }
 
   @Override
-  public List<Map<String, String>> getPartitions() {
-    return FilePathUtils.getPartitions(path, hadoopConf, partitionKeys, defaultPartName,
-        conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
+  public void applyPartitions(List<Map<String, String>> partitions) {
+    this.requiredPartitions = partitions;
   }
 
   @Override
-  public TableSource applyPartitionPruning(List<Map<String, String>> requiredPartitions) {
-    return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf,
-        requiredPartitions, requiredPos, limit, filters);
+  public boolean supportsNestedProjection() {
+    return false;
   }
 
   @Override
-  public TableSource<RowData> projectFields(int[] requiredPos) {
-    return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf,
-        requiredPartitions, requiredPos, limit, filters);
+  public void applyProjection(int[][] projections) {
+    // nested projection is not supported.
+    this.requiredPos = Arrays.stream(projections).mapToInt(array -> array[0]).toArray();
   }
 
   @Override
-  public TableSchema getTableSchema() {
-    return schema;
+  public void applyLimit(long limit) {
+    this.limit = limit;
   }
 
-  @Override
-  public String explainSource() {
-    final String filterString = filters.stream()
-        .map(Expression::asSummaryString)
-        .collect(Collectors.joining(","));
-    return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames())
-        + (requiredPartitions == null ? "" : ", requiredPartition=" + requiredPartitions)
-        + (requiredPos == null ? "" : ", requiredPos=" + Arrays.toString(requiredPos))
-        + (limit == -1 ? "" : ", limit=" + limit)
-        + (filters.size() == 0 ? "" : ", filters=" + filterString);
-  }
-
-  @Override
-  public DataType getProducedDataType() {
+  private DataType getProducedDataType() {
     String[] schemaFieldNames = this.schema.getFieldNames();
     DataType[] schemaTypes = this.schema.getFieldDataTypes();
 

@@ -260,7 +261,7 @@ public class HoodieTableSource implements
 
   private List<Map<String, String>> getOrFetchPartitions() {
     if (requiredPartitions == null) {
-      requiredPartitions = getPartitions();
+      requiredPartitions = listPartitions().orElse(Collections.emptyList());
     }
     return requiredPartitions;
   }
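The Supports*PushDown mixins invert the old pushdown contract: instead of returning a re-configured copy from applyPredicate/projectFields/applyLimit, the planner mutates the source in place through apply* callbacks and clones it with copy() when needed, which is why requiredPos, limit, and filters lose their final modifiers above. A minimal sketch with a single ability (ExampleTableSource is hypothetical and emits nothing):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.data.RowData;

public class ExampleTableSource implements ScanTableSource, SupportsLimitPushDown {

  private long limit = -1; // -1 means "no limit", mirroring NO_LIMIT_CONSTANT above

  @Override
  public void applyLimit(long limit) {
    this.limit = limit; // called by the planner before getScanRuntimeProvider
  }

  @Override
  public ChangelogMode getChangelogMode() {
    return ChangelogMode.insertOnly();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    // by now the pushed-down limit is already folded into the source
    SourceFunction<RowData> source = new SourceFunction<RowData>() {
      @Override
      public void run(SourceContext<RowData> ctx) {
        // emit at most 'limit' rows here
      }

      @Override
      public void cancel() {
      }
    };
    return SourceFunctionProvider.of(source, true /* bounded */);
  }

  @Override
  public DynamicTableSource copy() {
    ExampleTableSource copy = new ExampleTableSource();
    copy.limit = this.limit;
    return copy;
  }

  @Override
  public String asSummaryString() {
    return "ExampleTableSource";
  }
}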
StreamerUtil.java:

@@ -225,8 +225,8 @@ public class StreamerUtil {
     // put all the set up options
     conf.addAllToProperties(properties);
     // put all the default options
-    for (ConfigOption<?> option : FlinkOptions.OPTIONAL_OPTIONS) {
-      if (!conf.contains(option)) {
+    for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
+      if (!conf.contains(option) && option.hasDefaultValue()) {
         properties.put(option.key(), option.defaultValue());
       }
     }
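The hasDefaultValue() guard is needed because the reflective optionalOptions() set now also contains options that have no default value; Properties is a Hashtable, so putting a null default would throw a NullPointerException. The loop in isolation, as a sketch (DefaultsUtil is a hypothetical helper, not Hudi code):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;

import java.util.Properties;

public class DefaultsUtil {

  public static Properties withDefaults(Configuration conf, Iterable<ConfigOption<?>> options) {
    Properties properties = new Properties();
    // explicitly configured values win
    conf.addAllToProperties(properties);
    for (ConfigOption<?> option : options) {
      // only fall back when a default actually exists
      if (!conf.contains(option) && option.hasDefaultValue()) {
        properties.put(option.key(), option.defaultValue());
      }
    }
    return properties;
  }
}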