[HUDI-2191] Bump flink version to 1.13.1 (#3291)
@@ -41,7 +41,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +85,11 @@ public class StreamWriteOperatorCoordinator
*/
private final Context context;

/**
* Gateways for sending events to sub tasks.
*/
private transient SubtaskGateway[] gateways;

/**
* Write client.
*/
@@ -150,6 +154,7 @@ public class StreamWriteOperatorCoordinator
public void start() throws Exception {
// initialize event buffer
reset();
this.gateways = new SubtaskGateway[this.parallelism];
this.writeClient = StreamerUtil.createWriteClient(conf);
this.tableState = TableState.create(conf);
// init table, create it if not exists.
@@ -257,6 +262,11 @@ public class StreamWriteOperatorCoordinator
// no operation
}

@Override
public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
this.gateways[i] = subtaskGateway;
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -397,13 +407,8 @@ public class StreamWriteOperatorCoordinator
*/
private void sendCommitAckEvents() {
CompletableFuture<?>[] futures = IntStream.range(0, this.parallelism)
.mapToObj(taskID -> {
try {
return this.context.sendEvent(CommitAckEvent.getInstance(), taskID);
} catch (TaskNotRunningException e) {
throw new HoodieException("Error while sending commit ack event to task [" + taskID + "]", e);
}
}).toArray(CompletableFuture<?>[]::new);
.mapToObj(taskID -> this.gateways[taskID].sendEvent(CommitAckEvent.getInstance()))
.toArray(CompletableFuture<?>[]::new);
try {
CompletableFuture.allOf(futures).get();
} catch (Exception e) {
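Note (not part of the diff): in Flink 1.13 an OperatorCoordinator sends operator events through the OperatorCoordinator.SubtaskGateway it receives in subtaskReady(...), instead of Context#sendEvent(OperatorEvent, int), which threw the checked TaskNotRunningException. The gateway call returns a future, so the try/catch above disappears. A minimal sketch of the new shape, reusing the names from the hunks above:

    // cache the gateway for each subtask as it becomes ready
    @Override
    public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
      this.gateways[i] = subtaskGateway;
    }

    // later, fan the ack event out through the cached gateways and wait for all of them
    CompletableFuture<?>[] futures = IntStream.range(0, this.parallelism)
        .mapToObj(taskID -> this.gateways[taskID].sendEvent(CommitAckEvent.getInstance()))
        .toArray(CompletableFuture<?>[]::new);
    CompletableFuture.allOf(futures).get();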
@@ -22,11 +22,10 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

/**
@@ -34,13 +33,12 @@ import java.util.stream.IntStream;
*/
public class SortOperatorGen {
private final int[] sortIndices;
private final LogicalType[] sortTypes;
private final RowType rowType;
private final TableConfig tableConfig = new TableConfig();

public SortOperatorGen(RowType rowType, String[] sortFields) {
this.sortIndices = Arrays.stream(sortFields).mapToInt(rowType::getFieldIndex).toArray();
List<RowType.RowField> fields = rowType.getFields();
this.sortTypes = Arrays.stream(sortIndices).mapToObj(idx -> fields.get(idx).getType()).toArray(LogicalType[]::new);
this.rowType = rowType;
}

public OneInputStreamOperator<RowData, RowData> createSortOperator() {
@@ -51,8 +49,8 @@ public class SortOperatorGen {
}

private SortCodeGenerator createSortCodeGenerator() {
boolean[] padBooleans = new boolean[sortIndices.length];
IntStream.range(0, sortIndices.length).forEach(i -> padBooleans[i] = true);
return new SortCodeGenerator(tableConfig, sortIndices, sortTypes, padBooleans, padBooleans);
SortSpec.SortSpecBuilder builder = SortSpec.builder();
IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true));
return new SortCodeGenerator(tableConfig, rowType, builder.build());
}
}
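Note (not part of the diff): in Flink 1.13 SortCodeGenerator is built from the input RowType plus a SortSpec, replacing the 1.12 constructor that took the sort indices and two boolean arrays (ascending order / nulls last). The addField arguments are roughly (fieldIndex, ascending, nullsLast). A sketch using the fields of the class above:

    SortSpec.SortSpecBuilder builder = SortSpec.builder();
    // one spec entry per sort field, ascending with nulls last, matching the old boolean arrays
    IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true));
    SortCodeGenerator codegen = new SortCodeGenerator(tableConfig, rowType, builder.build());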
@@ -149,7 +149,7 @@ public class WriteMetadataEvent implements OperatorEvent {
ValidationUtils.checkArgument(this.taskID == other.taskID);
// the instant time could be monotonically increasing
this.instantTime = other.instantTime;
this.lastBatch |= other.lastBatch; // true if one of the event isLastBatch true.
this.lastBatch |= other.lastBatch; // true if one of the event lastBatch is true
List<WriteStatus> statusList = new ArrayList<>();
statusList.addAll(this.writeStatuses);
statusList.addAll(other.writeStatuses);
@@ -37,15 +37,16 @@ import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
import org.apache.flink.util.Collector;

import java.util.Objects;
@@ -147,7 +148,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
TypeInformation.of(HoodieRecordGlobalLocation.class));
double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
if (ttl > 0) {
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds((long) ttl)).build());
}
indexState = context.getKeyedStateStore().getState(indexStateDesc);
}
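Note (not part of the diff): the TTL change above swaps Hudi's use of the table-runtime-internal StateTtlConfigUtil helper for Flink's public StateTtlConfig builder, so the index-state descriptor setup now looks roughly like:

    double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000; // days -> millis
    if (ttl > 0) {
      // build the TTL config with the public state API instead of the planner-internal utility
      indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds((long) ttl)).build());
    }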
@@ -39,8 +39,8 @@ import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,9 +54,7 @@ import java.util.Properties;

/**
* An Utility which can incrementally consume data from Kafka and apply it to the target table.
* currently, it only support COW table and insert, upsert operation.
* <p>
* note: HoodieFlinkStreamer is not suitable to initialize on large tables when we have no checkpoint to restore from.
* currently, it only supports COW table and insert, upsert operation.
*/
public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception {
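Note (not part of the diff): the only code change in the import hunk above is the package of TimestampFormat, which lives in org.apache.flink.formats.common as of Flink 1.13 (it sat under org.apache.flink.formats.json in 1.12). Call sites keep the same shape; an illustrative construction, with the argument values and the InternalTypeInfo result type assumed for the example rather than taken from this commit:

    import org.apache.flink.formats.common.TimestampFormat;   // 1.13 location of the enum

    JsonRowDataDeserializationSchema deserializer = new JsonRowDataDeserializationSchema(
        rowType, InternalTypeInfo.of(rowType),
        false /* failOnMissingField */, true /* ignoreParseErrors */, TimestampFormat.ISO_8601);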
@@ -27,17 +27,16 @@ import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +45,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Hoodie data source/sink factory.
@@ -62,7 +60,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
helper.validate();

Configuration conf = (Configuration) helper.getOptions();
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);

@@ -79,7 +77,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
@@ -110,8 +108,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
* @param conf The table options
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, TableSchema schema) {
List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
List<String> fields = schema.getColumnNames();

// validate record key in pk absence.
if (!schema.getPrimaryKey().isPresent()) {
@@ -144,7 +142,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
Configuration conf,
String tableName,
CatalogTable table,
TableSchema schema) {
ResolvedSchema schema) {
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
// hoodie key about options
@@ -154,7 +152,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
// hive options
setupHiveOptions(conf);
// infer avro schema from physical DDL schema
inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
}

/**
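Note (not part of the diff): the recurring schema change in this factory is the move from TableSchema to ResolvedSchema, which Flink 1.13 exposes via context.getCatalogTable().getResolvedSchema(). The accessors used in the hunks above map roughly as follows:

    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
    List<String> fields = schema.getColumnNames();           // was Arrays.asList(schema.getFieldNames())
    boolean hasPk = schema.getPrimaryKey().isPresent();       // same call shape as before
    LogicalType physicalRow = schema.toPhysicalRowDataType().notNull().getLogicalType(); // was toRowDataType()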
@@ -34,9 +34,9 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
@@ -44,14 +44,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.logical.RowType;

import java.util.Map;
@@ -62,15 +62,15 @@ import java.util.Map;
public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

private final Configuration conf;
private final TableSchema schema;
private final ResolvedSchema schema;
private boolean overwrite = false;

public HoodieTableSink(Configuration conf, TableSchema schema) {
public HoodieTableSink(Configuration conf, ResolvedSchema schema) {
this.conf = conf;
this.schema = schema;
}

public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite) {
public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite) {
this.conf = conf;
this.schema = schema;
this.overwrite = overwrite;
@@ -85,7 +85,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType();

// bulk_insert mode
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
@@ -108,7 +108,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
TypeInformation.of(RowData.class),
sortOperatorGen.createSortOperator())
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(),
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
}
@@ -203,21 +203,18 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
}

@Override
public void applyStaticPartition(Map<String, String> partition) {
public void applyStaticPartition(Map<String, String> partitions) {
// #applyOverwrite should have been invoked.
if (this.overwrite) {
final String operationType;
if (partition.size() > 0) {
operationType = WriteOperationType.INSERT_OVERWRITE.value();
} else {
operationType = WriteOperationType.INSERT_OVERWRITE_TABLE.value();
}
this.conf.setString(FlinkOptions.OPERATION, operationType);
if (this.overwrite && partitions.size() > 0) {
this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
}
}

@Override
public void applyOverwrite(boolean b) {
this.overwrite = b;
public void applyOverwrite(boolean overwrite) {
this.overwrite = overwrite;
// set up the operation as INSERT_OVERWRITE_TABLE first,
// if there are explicit partitions, #applyStaticPartition would overwrite the option.
this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value());
}
}
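Note (not part of the diff): besides the ResolvedSchema swap, two things change in this sink. First, the managed-memory weight for the sort operator is set through the Java utility ExecNodeUtil instead of the Scala object ExecNode$:

    // reserve managed memory for the sort operator (configured MB converted to bytes)
    ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);

Second, the INSERT OVERWRITE handling is restructured: applyOverwrite now seeds the operation as INSERT_OVERWRITE_TABLE, and applyStaticPartition narrows it to INSERT_OVERWRITE only when explicit partitions are given, as the comment added in the hunk explains.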
@@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -109,7 +109,7 @@ public class HoodieTableSource implements
private final transient HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;

private final TableSchema schema;
private final ResolvedSchema schema;
private final Path path;
private final List<String> partitionKeys;
private final String defaultPartName;
@@ -122,7 +122,7 @@ public class HoodieTableSource implements
private List<Map<String, String>> requiredPartitions;

public HoodieTableSource(
TableSchema schema,
ResolvedSchema schema,
Path path,
List<String> partitionKeys,
String defaultPartName,
@@ -131,7 +131,7 @@ public class HoodieTableSource implements
}

public HoodieTableSource(
TableSchema schema,
ResolvedSchema schema,
Path path,
List<String> partitionKeys,
String defaultPartName,
@@ -147,7 +147,7 @@ public class HoodieTableSource implements
this.conf = conf;
this.requiredPartitions = requiredPartitions;
this.requiredPos = requiredPos == null
? IntStream.range(0, schema.getFieldCount()).toArray()
? IntStream.range(0, schema.getColumnCount()).toArray()
: requiredPos;
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
this.filters = filters == null ? Collections.emptyList() : filters;
@@ -250,8 +250,8 @@ public class HoodieTableSource implements
}

private DataType getProducedDataType() {
String[] schemaFieldNames = this.schema.getFieldNames();
DataType[] schemaTypes = this.schema.getFieldDataTypes();
String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);

return DataTypes.ROW(Arrays.stream(this.requiredPos)
.mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
@@ -383,8 +383,8 @@ public class HoodieTableSource implements
}
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
this.schema.getFieldNames(),
this.schema.getFieldDataTypes(),
this.schema.getColumnNames().toArray(new String[0]),
this.schema.getColumnDataTypes().toArray(new DataType[0]),
this.requiredPos,
this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
@@ -399,8 +399,8 @@ public class HoodieTableSource implements
case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
this.schema.getFieldNames(),
this.schema.getFieldDataTypes(),
this.schema.getColumnNames().toArray(new String[0]),
this.schema.getColumnDataTypes().toArray(new DataType[0]),
this.requiredPos,
"default",
this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
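Note (not part of the diff): the source side gets the same ResolvedSchema migration; the array-returning TableSchema accessors become list-returning ones, hence the toArray conversions where CopyOnWriteInputFormat still expects arrays. The variable names below are illustrative:

    String[] fieldNames = schema.getColumnNames().toArray(new String[0]);          // was schema.getFieldNames()
    DataType[] fieldTypes = schema.getColumnDataTypes().toArray(new DataType[0]);  // was schema.getFieldDataTypes()
    int[] defaultRequiredPos = IntStream.range(0, schema.getColumnCount()).toArray(); // was getFieldCount()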