
[HUDI-1522] Add a new pipeline for Flink writer (#2430)

* [HUDI-1522] Add a new pipeline for Flink writer
Danny Chan
2021-01-28 08:53:13 +08:00
committed by GitHub
parent 7b2e658ac0
commit bc0325f6ea
40 changed files with 3613 additions and 302 deletions

View File

@@ -1,198 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.operator.InstantGenerateOperator;
import org.apache.hudi.operator.KeyedWriteProcessFunction;
import org.apache.hudi.operator.KeyedWriteProcessOperator;
import org.apache.hudi.sink.CommitSink;
import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* A utility which can incrementally consume data from Kafka and apply it to the target table.
* Currently, it only supports COW tables and the insert and upsert operations.
*/
public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
env.enableCheckpointing(cfg.checkpointInterval);
env.getConfig().setGlobalJobParameters(cfg);
// We use checkpoints to trigger the write operations, including instant generation and committing.
// There can only be one checkpoint at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.disableOperatorChaining();
if (cfg.flinkCheckPointPath != null) {
env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
}
TypedProperties props = StreamerUtil.getProps(cfg);
// add kafka config
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cfg.kafkaBootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, cfg.kafkaGroupId);
// add data source config
props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
// Read from kafka source
DataStream<HoodieRecord> inputRecords =
env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
.filter(Objects::nonNull)
.map(new JsonStringToHoodieRecordMapFunction(props))
.name("kafka_to_hudi_record")
.uid("kafka_to_hudi_record_uid");
// InstantGenerateOperator helps to emit a globally unique instantTime
inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
.name("instant_generator")
.uid("instant_generator_id")
// Key by partition path to avoid multiple subtasks writing to the same partition at the same time
.keyBy(HoodieRecord::getPartitionPath)
// write operator, where the write operation really happens
.transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
}), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
.name("write_process")
.uid("write_process_uid")
.setParallelism(env.getParallelism())
// The commit can only be executed once, so set the sink parallelism to 1
.addSink(new CommitSink())
.name("commit_sink")
.uid("commit_sink_uid")
.setParallelism(1);
env.execute(cfg.targetTableName);
}
public static class Config extends Configuration {
@Parameter(names = {"--kafka-topic"}, description = "kafka topic", required = true)
public String kafkaTopic;
@Parameter(names = {"--kafka-group-id"}, description = "kafka consumer group id", required = true)
public String kafkaGroupId;
@Parameter(names = {"--kafka-bootstrap-servers"}, description = "kafka bootstrap.servers", required = true)
public String kafkaBootstrapServers;
@Parameter(names = {"--flink-checkpoint-path"}, description = "flink checkpoint path")
public String flinkCheckPointPath;
@Parameter(names = {"--flink-block-retry-times"}, description = "Times to retry when latest instant has not completed")
public String blockRetryTime = "10";
@Parameter(names = {"--flink-block-retry-interval"}, description = "Seconds between two tries when latest instant has not completed")
public String blockRetryInterval = "1";
@Parameter(names = {"--target-base-path"},
description = "base path for the target hoodie table. "
+ "(Will be created if did not exist first time around. If exists, expected to be a hoodie table)",
required = true)
public String targetBasePath;
@Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
public String targetTableName;
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
public String tableType;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties.")
public String propsFilePath =
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+ " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
public String sourceOrderingField = "ts";
@Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off "
+ "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
public WriteOperationType operation = WriteOperationType.UPSERT;
@Parameter(names = {"--filter-dupes"},
description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
public Boolean filterDupes = false;
@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
public Boolean commitOnErrors = false;
/**
* Flink checkpoint interval.
*/
@Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.")
public Long checkpointInterval = 1000 * 5L;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}
private static class OperationConverter implements IStringConverter<WriteOperationType> {
@Override
public WriteOperationType convert(String value) throws ParameterException {
return WriteOperationType.valueOf(value);
}
}
}

View File

@@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Hoodie Flink config options.
*
* <p>It has the options for Hoodie table read and write. It also defines some utilities.
*/
public class FlinkOptions {
private FlinkOptions() {
}
// ------------------------------------------------------------------------
// Base Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> PATH = ConfigOptions
.key("path")
.stringType()
.noDefaultValue()
.withDescription("Base path for the target hoodie table."
+ "\nThe path would be created if it does not exist,\n"
+ "otherwise a Hoodie table expects to be initialized successfully");
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> READ_SCHEMA_FILE_PATH = ConfigOptions
.key("read.schema.file.path")
.stringType()
.noDefaultValue()
.withDescription("Avro schema file path, the parsed schema is used for deserializing");
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
.key(HoodieWriteConfig.TABLE_NAME)
.stringType()
.noDefaultValue()
.withDescription("Table name to register to Hive metastore");
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
.key("write.table.type")
.stringType()
.defaultValue(HoodieTableType.COPY_ON_WRITE.name())
.withDescription("Type of table to write, COPY_ON_WRITE (or) MERGE_ON_READ");
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
.stringType()
.defaultValue("upsert")
.withDescription("The write operation, that this write should do");
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
.key("write.precombine.field")
.stringType()
.defaultValue("ts")
.withDescription("Field used in preCombining before actual write. When two records have the same\n"
+ "key value, we will pick the one with the largest value for the precombine field,\n"
+ "determined by Object.compareTo(..)");
public static final ConfigOption<String> PAYLOAD_CLASS = ConfigOptions
.key("write.payload.class")
.stringType()
.defaultValue(OverwriteWithLatestAvroPayload.class.getName())
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");
/**
* Flag to indicate whether to drop duplicates upon insert.
* By default insert will accept duplicates, to gain extra performance.
*/
public static final ConfigOption<Boolean> INSERT_DROP_DUPS = ConfigOptions
.key("write.insert.drop.duplicates")
.booleanType()
.defaultValue(false)
.withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
+ "By default insert will accept duplicates, to gain extra performance");
public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
.key("write.retry.times")
.intType()
.defaultValue(3)
.withDescription("Flag to indicate how many times streaming job should retry for a failed checkpoint batch.\n"
+ "By default 3");
public static final ConfigOption<Long> RETRY_INTERVAL_MS = ConfigOptions
.key("write.retry.interval.ms")
.longType()
.defaultValue(2000L)
.withDescription("Flag to indicate how long (by millisecond) before a retry should issued for failed checkpoint batch.\n"
+ "By default 2000 and it will be doubled by every retry");
public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
.key("write.ignore.failed")
.booleanType()
.defaultValue(true)
.withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
+ "By default true (in favor of streaming progressing over data integrity)");
public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
.stringType()
.defaultValue("uuid")
.withDescription("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
.key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
.stringType()
.defaultValue("partition-path")
.withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+ "Actual value obtained by invoking .toString()");
public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
.key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP)
.stringType()
.defaultValue(SimpleAvroKeyGenerator.class.getName())
.withDescription("Key generator class, that implements will extract the key out of incoming record");
public static final ConfigOption<Integer> WRITE_TASK_PARALLELISM = ConfigOptions
.key("write.task.parallelism")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is 4");
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
// Remember to update this list when adding new options.
public static final List<ConfigOption<?>> OPTIONAL_OPTIONS = Arrays.asList(
TABLE_TYPE, OPERATION, PRECOMBINE_FIELD, PAYLOAD_CLASS, INSERT_DROP_DUPS, RETRY_TIMES,
RETRY_INTERVAL_MS, IGNORE_FAILED, RECORD_KEY_FIELD, PARTITION_PATH_FIELD, KEYGEN_CLASS
);
// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";
/**
* Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties
* in the properties file (set by `--props` option) and cmd line options
* (set by `--hoodie-conf` option).
*/
@SuppressWarnings("unchecked, rawtypes")
public static org.apache.flink.configuration.Configuration fromStreamerConfig(FlinkStreamerConfig config) {
Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config));
org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
conf.setString(FlinkOptions.PATH, config.targetBasePath);
conf.setString(READ_SCHEMA_FILE_PATH, config.readSchemaFilePath);
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works the same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
conf.setInteger(FlinkOptions.WRITE_TASK_PARALLELISM, config.writeTaskNum);
return conf;
}
/**
* Collects the config options that start with 'properties.' into a 'key'='value' list.
*/
public static Map<String, String> getHoodieProperties(Map<String, String> options) {
final Map<String, String> hoodieProperties = new HashMap<>();
if (hasPropertyOptions(options)) {
options.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.forEach(key -> {
final String value = options.get(key);
final String subKey = key.substring((PROPERTIES_PREFIX).length());
hoodieProperties.put(subKey, value);
});
}
return hoodieProperties;
}
/**
* Collects all the config options, the 'properties.' prefix would be removed if the option key starts with it.
*/
public static Configuration flatOptions(Configuration conf) {
final Map<String, String> propsMap = new HashMap<>();
conf.toMap().forEach((key, value) -> {
final String subKey = key.startsWith(PROPERTIES_PREFIX)
? key.substring((PROPERTIES_PREFIX).length())
: key;
propsMap.put(subKey, value);
});
return fromMap(propsMap);
}
private static boolean hasPropertyOptions(Map<String, String> options) {
return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
/** Creates a new configuration that is initialized with the options of the given map. */
private static Configuration fromMap(Map<String, String> map) {
final Configuration configuration = new Configuration();
map.forEach(configuration::setString);
return configuration;
}
}
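
As a rough illustration of how these options are meant to be consumed, the following hedged sketch builds a Flink Configuration by hand and runs it through the utilities above (the table path and the forwarded Hoodie property are placeholders, not values from this commit):

import org.apache.hudi.operator.FlinkOptions;

import org.apache.flink.configuration.Configuration;

import java.util.Map;

public class FlinkOptionsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Required base path of the target Hoodie table (placeholder path).
    conf.setString(FlinkOptions.PATH, "file:///tmp/hoodie_sink");
    conf.setString(FlinkOptions.TABLE_NAME, "demo_table");
    conf.setString(FlinkOptions.OPERATION, "upsert");
    // Keys with the "properties." prefix are treated as raw Hoodie client properties.
    conf.setString("properties.hoodie.insert.shuffle.parallelism", "4");

    // flatOptions strips the "properties." prefix so downstream code sees plain Hoodie keys.
    Configuration flat = FlinkOptions.flatOptions(conf);
    // getHoodieProperties collects only the prefixed options into a key/value map.
    Map<String, String> hoodieProps = FlinkOptions.getHoodieProperties(conf.toMap());

    System.out.println(flat.toMap());
    System.out.println(hoodieProps);
  }
}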

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.operator;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -66,7 +66,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
public static final String NAME = "InstantGenerateOperator";
private HoodieFlinkStreamer.Config cfg;
private FlinkStreamerConfig cfg;
private HoodieFlinkWriteClient writeClient;
private SerializableConfiguration serializableHadoopConf;
private transient FileSystem fs;
@@ -94,13 +94,13 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
public void open() throws Exception {
super.open();
// get configs from runtimeContext
cfg = (HoodieFlinkStreamer.Config) runtimeContext.getExecutionConfig().getGlobalJobParameters();
cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
// retry times
retryTimes = Integer.valueOf(cfg.blockRetryTime);
retryTimes = Integer.valueOf(cfg.instantRetryTimes);
// retry interval
retryInterval = Integer.valueOf(cfg.blockRetryInterval);
retryInterval = Integer.valueOf(cfg.instantRetryInterval);
// hadoopConf
serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.operator;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -77,7 +77,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
/**
* Job conf.
*/
private HoodieFlinkStreamer.Config cfg;
private FlinkStreamerConfig cfg;
/**
* Write Client.
@@ -90,7 +90,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext()));

View File

@@ -0,0 +1,313 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
/**
* Sink function to write the data to the underlying filesystem.
*
* <p><h2>Work Flow</h2>
*
* <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
* It flushes(write) the records batch when a Flink checkpoint starts. After a batch has been written successfully,
* the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
*
* <p><h2>Exactly-once Semantics</h2>
*
* <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
* starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
* start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
* The function process thread then block data buffering and the checkpoint thread starts flushing the existing data buffer.
* When the existing data buffer write successfully, the process thread unblock and start buffering again for the next round checkpoint.
* Because any checkpoint failures would trigger the write rollback, it implements the exactly-once semantics.
*
* <p><h2>Fault Tolerance</h2>
*
* <p>The operator coordinator checks the validity for the last instant when it starts a new one. The operator rolls back
* the written data and throws when any error occurs. This means any checkpoint or task failure would trigger a failover.
* The operator coordinator would try several times when committing the writestatus.
*
* <p>Note: The function task requires the input stream be partitioned by the partition fields to avoid different write tasks
* write to the same file group that conflict. The general case for partition path is a datetime field,
* so the sink task is very possible to have IO bottleneck, the more flexible solution is to shuffle the
* data by the file group IDs.
*
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
/**
* Write buffer for a checkpoint.
*/
private transient List<HoodieRecord> buffer;
/**
* The buffer lock to control data buffering/flushing.
*/
private transient ReentrantLock bufferLock;
/**
* The condition to decide whether to add new records into the buffer.
*/
private transient Condition addToBufferCondition;
/**
* Flag saying whether there is an on-going checkpoint.
*/
private volatile boolean onCheckpointing = false;
/**
* Config options.
*/
private final Configuration config;
/**
* Id of current subtask.
*/
private int taskID;
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
/**
* HoodieKey generator.
*/
private transient KeyGenerator keyGenerator;
/**
* Row type of the input.
*/
private final RowType rowType;
/**
* Avro schema of the input.
*/
private final Schema avroSchema;
private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
/**
* The REQUESTED instant we write the data.
*/
private volatile String currentInstant;
/**
* Gateway to send operator events to the operator coordinator.
*/
private transient OperatorEventGateway eventGateway;
/**
* Constructs a StreamWriteFunction.
*
* @param rowType The input row type
* @param config The config options
*/
public StreamWriteFunction(RowType rowType, Configuration config) {
this.rowType = rowType;
this.avroSchema = StreamerUtil.getSourceSchema(config);
this.config = config;
}
@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
this.converter = RowDataToAvroConverters.createConverter(this.rowType);
initBuffer();
initWriteClient();
initWriteFunction();
}
@Override
public void initializeState(FunctionInitializationContext context) {
// no operation
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
bufferLock.lock();
try {
// Based on the fact that the coordinator starts the checkpoint first,
// it has already checked the validity of the instant.
this.onCheckpointing = true;
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
Preconditions.checkNotNull(this.currentInstant,
"No inflight instant when flushing data");
// wait for the buffered data to flush out and request a new instant
flushBuffer();
// signal the task thread to start buffering
addToBufferCondition.signal();
} finally {
this.onCheckpointing = false;
bufferLock.unlock();
}
}
@Override
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) throws Exception {
bufferLock.lock();
try {
if (onCheckpointing) {
addToBufferCondition.await();
}
this.buffer.add(toHoodieRecord(value));
} finally {
bufferLock.unlock();
}
}
@Override
public void close() {
if (this.writeClient != null) {
this.writeClient.close();
}
}
// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
@VisibleForTesting
@SuppressWarnings("rawtypes")
public List<HoodieRecord> getBuffer() {
return buffer;
}
@VisibleForTesting
@SuppressWarnings("rawtypes")
public HoodieFlinkWriteClient getWriteClient() {
return writeClient;
}
public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
this.eventGateway = operatorEventGateway;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private void initBuffer() {
this.buffer = new ArrayList<>();
this.bufferLock = new ReentrantLock();
this.addToBufferCondition = this.bufferLock.newCondition();
}
private void initWriteClient() {
HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
new FlinkTaskContextSupplier(getRuntimeContext()));
writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.config));
}
private void initWriteFunction() {
final String writeOperation = this.config.get(FlinkOptions.OPERATION);
switch (WriteOperationType.fromValue(writeOperation)) {
case INSERT:
this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
break;
case UPSERT:
this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
break;
default:
throw new RuntimeException("Unsupported write operation : " + writeOperation);
}
}
/**
* Converts the given record to a {@link HoodieRecord}.
*
* @param record The input record
* @return HoodieRecord based on the configuration
* @throws IOException if error occurs
*/
@SuppressWarnings("rawtypes")
private HoodieRecord toHoodieRecord(I record) throws IOException {
boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
|| WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
HoodieRecordPayload payload = shouldCombine
? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
: StreamerUtil.createPayload(payloadClazz, gr);
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
}
private void flushBuffer() {
final List<WriteStatus> writeStatus;
if (buffer.size() > 0) {
writeStatus = writeFunction.apply(buffer, currentInstant);
buffer.clear();
} else {
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
writeStatus = Collections.emptyList();
}
this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus));
this.currentInstant = "";
}
}
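
The buffering handshake between the processing thread and the checkpoint thread described in the class comment can be illustrated in isolation. Below is a simplified, self-contained sketch of the same lock/condition pattern; the names are illustrative, not the class's actual fields:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified illustration of the buffer/flush handshake used by StreamWriteFunction.
public class BufferHandshakeSketch {
  private final List<String> buffer = new ArrayList<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notCheckpointing = lock.newCondition();
  private volatile boolean onCheckpointing = false;

  // Called by the task thread for every incoming record.
  public void add(String record) throws InterruptedException {
    lock.lock();
    try {
      while (onCheckpointing) {
        // Block buffering while the checkpoint thread flushes the batch.
        notCheckpointing.await();
      }
      buffer.add(record);
    } finally {
      lock.unlock();
    }
  }

  // Called by the checkpoint thread when a checkpoint starts.
  public void flushOnCheckpoint() {
    lock.lock();
    try {
      onCheckpointing = true;
      // In the real function this is where the batch is written to Hudi.
      System.out.println("flushing " + buffer.size() + " records");
      buffer.clear();
    } finally {
      onCheckpointing = false;
      // Wake the task thread so it can start buffering for the next checkpoint round.
      notCheckpointing.signal();
      lock.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    BufferHandshakeSketch sketch = new BufferHandshakeSketch();
    sketch.add("record-1");
    sketch.add("record-2");
    sketch.flushOnCheckpoint();  // would normally be driven by snapshotState()
  }
}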

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.table.types.logical.RowType;
/**
* Operator for {@link StreamSink}.
*
* @param <I> The input type
*/
public class StreamWriteOperator<I>
extends KeyedProcessOperator<Object, I, Object>
implements OperatorEventHandler {
private final StreamWriteFunction<Object, I, Object> sinkFunction;
public StreamWriteOperator(RowType rowType, Configuration conf) {
super(new StreamWriteFunction<>(rowType, conf));
this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
}
@Override
public void handleOperatorEvent(OperatorEvent operatorEvent) {
// do nothing
}
void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
sinkFunction.setOperatorEventGateway(operatorEventGateway);
}
}

View File

@@ -0,0 +1,419 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* {@link OperatorCoordinator} for {@link StreamWriteFunction}.
*
* <p>This coordinator starts a new instant when a new checkpoint starts. It commits the instant when all the
* operator tasks write the buffer successfully for a round of checkpoint.
*
* <p>If there is no data for a round of checkpointing, it rolls back the metadata.
*
* @see StreamWriteFunction for the work flow and semantics
*/
public class StreamWriteOperatorCoordinator
implements OperatorCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class);
/**
* Config options.
*/
private final Configuration conf;
/**
* Write client.
*/
private transient HoodieFlinkWriteClient writeClient;
private long inFlightCheckpoint = -1;
/**
* Current REQUESTED instant, for validation.
*/
private String inFlightInstant = "";
/**
* Event buffer for one round of checkpointing. When all the elements are non-null and have the same
* write instant, the instant succeeds and we can commit it.
*/
private transient BatchWriteSuccessEvent[] eventBuffer;
/**
* Task number of the operator.
*/
private final int parallelism;
/**
* Constructs a StreamWriteOperatorCoordinator.
*
* @param conf The config options
* @param parallelism The operator task number
*/
public StreamWriteOperatorCoordinator(
Configuration conf,
int parallelism) {
this.conf = conf;
this.parallelism = parallelism;
}
@Override
public void start() throws Exception {
// initialize event buffer
reset();
// writeClient
initWriteClient();
// init the table, creating it if it does not exist.
initTable();
}
@Override
public void close() {
if (writeClient != null) {
writeClient.close();
}
this.eventBuffer = null;
}
@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
try {
final String errMsg = "A new checkpoint starts while the last checkpoint buffer"
+ " data has not finish writing, roll back the last write and throw";
checkAndForceCommit(errMsg);
this.inFlightInstant = this.writeClient.startCommit();
this.inFlightCheckpoint = checkpointId;
LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId);
result.complete(writeCheckpointBytes());
} catch (Throwable throwable) {
// when a checkpoint fails, throw directly.
result.completeExceptionally(
new CompletionException(
String.format("Failed to checkpoint Instant %s for source %s",
this.inFlightInstant, this.getClass().getSimpleName()), throwable));
}
}
@Override
public void checkpointComplete(long checkpointId) {
// start to commit the instant.
checkAndCommitWithRetry();
}
public void notifyCheckpointAborted(long checkpointId) {
Preconditions.checkState(inFlightCheckpoint == checkpointId,
"The aborted checkpoint should always be the last checkpoint");
checkAndForceCommit("The last checkpoint was aborted, roll back the last write and throw");
}
@Override
public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception {
if (checkpointData != null) {
// restore when any checkpoint completed
deserializeCheckpointAndRestore(checkpointData);
}
}
@Override
public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
// validate the event and buffer it for this round of checkpointing
Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
"The coordinator can only handle BatchWriteSuccessEvent");
BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
Preconditions.checkState(event.getInstantTime().equals(this.inFlightInstant),
String.format("Receive an unexpected event for instant %s from task %d",
event.getInstantTime(), event.getTaskID()));
this.eventBuffer[event.getTaskID()] = event;
}
@Override
public void subtaskFailed(int i, @Nullable Throwable throwable) {
// no operation
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@SuppressWarnings("rawtypes")
private void initWriteClient() {
writeClient = new HoodieFlinkWriteClient(
new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)),
StreamerUtil.getHoodieClientConfig(this.conf),
true);
}
private void initTable() throws IOException {
final String basePath = this.conf.getString(FlinkOptions.PATH);
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
// Hadoop FileSystem
try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
HoodieTableMetaClient.initTableType(
hadoopConf,
basePath,
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)),
this.conf.getString(FlinkOptions.TABLE_NAME),
"archived",
this.conf.getString(FlinkOptions.PAYLOAD_CLASS),
1);
LOG.info("Table initialized");
} else {
LOG.info("Table [{}/{}] already exists, no need to initialize the table",
basePath, this.conf.getString(FlinkOptions.TABLE_NAME));
}
}
}
static byte[] readBytes(DataInputStream in, int size) throws IOException {
byte[] bytes = new byte[size];
in.readFully(bytes);
return bytes;
}
/**
* Serialize the coordinator state. The current implementation may not be super efficient,
* but it should not matter that much because most of the state should be rather small.
* Large states themselves may already be a problem regardless of how the serialization
* is implemented.
*
* @return A byte array containing the serialized state of the source coordinator.
* @throws IOException When something goes wrong in serialization.
*/
private byte[] writeCheckpointBytes() throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
out.writeLong(this.inFlightCheckpoint);
byte[] serializedInstant = this.inFlightInstant.getBytes();
out.writeInt(serializedInstant.length);
out.write(serializedInstant);
out.flush();
return baos.toByteArray();
}
}
/**
* Restore the state of this source coordinator from the state bytes.
*
* @param bytes The checkpoint bytes that was returned from {@link #writeCheckpointBytes()}
* @throws Exception When the deserialization failed.
*/
private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception {
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
DataInputStream in = new DataInputViewStreamWrapper(bais)) {
long checkpointID = in.readLong();
int serializedInstantSize = in.readInt();
byte[] serializedInstant = readBytes(in, serializedInstantSize);
this.inFlightCheckpoint = checkpointID;
this.inFlightInstant = new String(serializedInstant);
}
}
private void reset() {
this.inFlightInstant = "";
this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
}
private void checkAndForceCommit(String errMsg) {
if (!checkReady()) {
// forced but still has inflight instant
String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
if (inflightInstant != null) {
assert inflightInstant.equals(this.inFlightInstant);
writeClient.rollback(this.inFlightInstant);
throw new HoodieException(errMsg);
}
if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
// The last checkpoint finished successfully.
return;
}
}
doCommit();
}
private void checkAndCommitWithRetry() {
int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES);
if (retryTimes < 0) {
retryTimes = 1;
}
long retryIntervalMillis = this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS);
int tryTimes = 0;
while (tryTimes++ < retryTimes) {
try {
if (!checkReady()) {
// Do not throw if the retry times expire but the event buffer is still not ready,
// because we have a forced check when the next checkpoint starts.
sleepFor(retryIntervalMillis);
continue;
}
doCommit();
return;
} catch (Throwable throwable) {
String cause = throwable.getCause() == null ? "" : throwable.getCause().toString();
LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.inFlightInstant, tryTimes, cause);
if (tryTimes == retryTimes) {
throw new HoodieException(throwable);
}
sleepFor(retryIntervalMillis);
}
}
}
/**
* Sleep {@code intervalMillis} milliseconds in current thread.
*/
private void sleepFor(long intervalMillis) {
try {
TimeUnit.MILLISECONDS.sleep(intervalMillis);
} catch (InterruptedException e) {
LOG.error("Thread interrupted while waiting to retry the instant commits");
throw new HoodieException(e);
}
}
/** Checks whether the buffer is ready to commit. */
private boolean checkReady() {
return Arrays.stream(eventBuffer).allMatch(event ->
event != null && event.getInstantTime().equals(this.inFlightInstant));
}
/** Performs the actual commit action. */
private void doCommit() {
List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
.map(BatchWriteSuccessEvent::getWriteStatuses)
.flatMap(Collection::stream)
.collect(Collectors.toList());
if (writeResults.size() == 0) {
// No data has been written, clear the metadata file
this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
reset();
return;
}
// commit or rollback
long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
boolean hasErrors = totalErrorRecords > 0;
if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
if (hasErrors) {
LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
}
boolean success = writeClient.commit(this.inFlightInstant, writeResults, Option.of(checkpointCommitMetadata));
if (success) {
reset();
LOG.info("Commit instant [{}] success!", this.inFlightInstant);
} else {
throw new HoodieException(String.format("Commit instant [%s] failed!", this.inFlightInstant));
}
} else {
LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
LOG.error("The first 100 error messages");
writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
LOG.error("Global error for partition path {} and fileID {}: {}",
ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
if (ws.getErrors().size() > 0) {
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
}
});
// Rolls back instant
writeClient.rollback(this.inFlightInstant);
throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.inFlightInstant));
}
}
@VisibleForTesting
public BatchWriteSuccessEvent[] getEventBuffer() {
return eventBuffer;
}
@VisibleForTesting
public String getInFlightInstant() {
return inFlightInstant;
}
/**
* Provider for {@link StreamWriteOperatorCoordinator}.
*/
public static class Provider implements OperatorCoordinator.Provider {
private final OperatorID operatorId;
private final Configuration conf;
private final int numTasks;
public Provider(OperatorID operatorId, Configuration conf, int numTasks) {
this.operatorId = operatorId;
this.conf = conf;
this.numTasks = numTasks;
}
public OperatorID getOperatorId() {
return this.operatorId;
}
public OperatorCoordinator create(Context context) {
return new StreamWriteOperatorCoordinator(this.conf, this.numTasks);
}
}
}
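
To make the checkpoint/commit handshake concrete, here is a hedged sketch of the expected call order against the coordinator's public API; the table path and schema file are placeholders, and the empty WriteStatus list makes the coordinator clear the pending instant instead of committing:

import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;

import org.apache.flink.configuration.Configuration;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class CoordinatorProtocolSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hoodie_sink");                // placeholder table path
    conf.setString(FlinkOptions.TABLE_NAME, "demo_table");
    conf.setString(FlinkOptions.READ_SCHEMA_FILE_PATH, "file:///tmp/schema.avsc"); // placeholder schema file

    int parallelism = 1;
    StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, parallelism);
    coordinator.start();                       // initializes the table and the write client

    // 1. A checkpoint starts: the coordinator creates a new REQUESTED instant.
    CompletableFuture<byte[]> state = new CompletableFuture<>();
    coordinator.checkpointCoordinator(1L, state);
    String instant = coordinator.getInFlightInstant();

    // 2. Each write task reports its batch through an operator event.
    coordinator.handleEventFromOperator(0,
        new BatchWriteSuccessEvent(0, instant, Collections.emptyList()));

    // 3. The checkpoint completes: the coordinator commits, or (as here, since no
    //    data was written) clears the pending instant.
    coordinator.checkpointComplete(1L);

    coordinator.close();
  }
}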

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.types.logical.RowType;
/**
* Factory class for {@link StreamWriteOperator}.
*/
public class StreamWriteOperatorFactory<I>
extends SimpleUdfStreamOperatorFactory<Object>
implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
private static final long serialVersionUID = 1L;
private final StreamWriteOperator<I> operator;
private final Configuration conf;
private final int numTasks;
public StreamWriteOperatorFactory(
RowType rowType,
Configuration conf,
int numTasks) {
super(new StreamWriteOperator<>(rowType, conf));
this.operator = (StreamWriteOperator<I>) getOperator();
this.conf = conf;
this.numTasks = numTasks;
}
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID));
this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
this.operator.setProcessingTimeService(this.processingTimeService);
eventDispatcher.registerEventHandler(operatorID, operator);
return (T) operator;
}
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks);
}
@Override
public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
super.setProcessingTimeService(processingTimeService);
}
}
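
For context, a hedged sketch of how this factory might be wired into a job, assuming a DataStream<RowData> source keyed by its partition-path field; the key extractor and path below are illustrative, and the actual pipeline added elsewhere in this commit may differ:

import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.operator.StreamWriteOperatorFactory;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

public class StreamWritePipelineSketch {
  public static void sink(DataStream<RowData> input, RowType rowType) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hoodie_sink");  // placeholder table path
    conf.setString(FlinkOptions.TABLE_NAME, "demo_table");

    int writeTasks = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
    StreamWriteOperatorFactory<RowData> factory =
        new StreamWriteOperatorFactory<>(rowType, conf, writeTasks);

    input
        // The write function requires the stream to be keyed by partition path
        // so that only one task writes a given partition (see StreamWriteFunction docs).
        .keyBy(row -> row.getString(0).toString())  // illustrative partition-path extractor
        .transform("hoodie_stream_write", TypeInformation.of(Object.class), factory)
        .uid("hoodie_stream_write_uid")
        .setParallelism(writeTasks);
  }
}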

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator.event;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.hudi.client.WriteStatus;
import java.util.List;
/**
* An operator event to mark a successful checkpoint batch write.
*/
public class BatchWriteSuccessEvent implements OperatorEvent {
private static final long serialVersionUID = 1L;
private final List<WriteStatus> writeStatuses;
private final int taskID;
private final String instantTime;
public BatchWriteSuccessEvent(
int taskID,
String instantTime,
List<WriteStatus> writeStatuses) {
this.taskID = taskID;
this.instantTime = instantTime;
this.writeStatuses = writeStatuses;
}
public List<WriteStatus> getWriteStatuses() {
return writeStatuses;
}
public int getTaskID() {
return taskID;
}
public String getInstantTime() {
return instantTime;
}
}

View File

@@ -21,9 +21,11 @@ package org.apache.hudi.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,16 +45,13 @@ public class FilebasedSchemaProvider extends SchemaProvider {
private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file";
}
private final FileSystem fs;
private final Schema sourceSchema;
private Schema targetSchema;
public FilebasedSchemaProvider(TypedProperties props) {
super(props);
StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
@@ -64,6 +63,16 @@ public class FilebasedSchemaProvider extends SchemaProvider {
}
}
public FilebasedSchemaProvider(Configuration conf) {
final String readSchemaPath = conf.getString(FlinkOptions.READ_SCHEMA_FILE_PATH);
final FileSystem fs = FSUtils.getFs(readSchemaPath, StreamerUtil.getHadoopConf());
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(readSchemaPath)));
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);
}
}
@Override
public Schema getSourceSchema() {
return sourceSchema;

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.Schema;
import java.io.Serializable;
@@ -29,12 +27,6 @@ import java.io.Serializable;
*/
public abstract class SchemaProvider implements Serializable {
protected TypedProperties config;
protected SchemaProvider(TypedProperties props) {
this.config = props;
}
public abstract Schema getSourceSchema();
public Schema getTargetSchema() {

View File

@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -51,7 +51,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
/**
* Job conf.
*/
private HoodieFlinkStreamer.Config cfg;
private FlinkStreamerConfig cfg;
/**
* Write client.
@@ -72,7 +72,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Get configs from runtimeContext
cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();

View File

@@ -22,10 +22,10 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;

View File

@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.streamer;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* Configurations for Hoodie Flink streamer.
*/
public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--kafka-topic"}, description = "Kafka topic name.", required = true)
public String kafkaTopic;
@Parameter(names = {"--kafka-group-id"}, description = "Kafka consumer group id.", required = true)
public String kafkaGroupId;
@Parameter(names = {"--kafka-bootstrap-servers"}, description = "Kafka bootstrap.servers.", required = true)
public String kafkaBootstrapServers;
@Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.")
public String flinkCheckPointPath;
@Parameter(names = {"--instant-retry-times"}, description = "Times to retry when latest instant has not completed.")
public String instantRetryTimes = "10";
@Parameter(names = {"--instant-retry-interval"}, description = "Seconds between two tries when latest instant has not completed.")
public String instantRetryInterval = "1";
@Parameter(names = {"--target-base-path"},
description = "Base path for the target hoodie table. "
+ "(Will be created if did not exist first time around. If exists, expected to be a hoodie table).",
required = true)
public String targetBasePath;
@Parameter(names = {"--read-schema-path"},
description = "Avro schema file path, the parsed schema is used for deserializing.",
required = true)
public String readSchemaFilePath;
@Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.", required = true)
public String targetTableName;
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
public String tableType;
@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties.")
public String propsFilePath =
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--record-key-field"}, description = "Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`. By default `uuid`.")
public String recordKeyField = "uuid";
@Parameter(names = {"--partition-path-field"}, description = "Partition path field. Value to be used at \n"
+ "the `partitionPath` component of `HoodieKey`. Actual value obtained by invoking .toString(). By default `partitionpath`.")
public String partitionPathField = "partitionpath";
@Parameter(names = {"--keygen-class"}, description = "Key generator class, that implements will extract the key out of incoming record.\n"
+ "By default `SimpleAvroKeyGenerator`.")
public String keygenClass = SimpleAvroKeyGenerator.class.getName();
@Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+ " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record.")
public String sourceOrderingField = "ts";
@Parameter(names = {"--payload-class"}, description = "Subclass of HoodieRecordPayload, that works off "
+ "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value.")
public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed).", converter = OperationConverter.class)
public WriteOperationType operation = WriteOperationType.UPSERT;
@Parameter(names = {"--filter-dupes"},
description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert.")
public Boolean filterDupes = false;
@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
public Boolean commitOnErrors = false;
/**
* Flink checkpoint interval.
*/
@Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.")
public Long checkpointInterval = 1000 * 5L;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
public Integer writeTaskNum = 4;
}
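For orientation, a hedged sketch of how these flags end up bound to the config object through JCommander, mirroring what the streamer main methods do; every topic, server and path value below is a placeholder, and only the required flags are passed:

import com.beust.jcommander.JCommander;
import org.apache.hudi.streamer.FlinkStreamerConfig;

public class FlinkStreamerConfigSketch {
  public static void main(String[] args) {
    String[] demoArgs = {
        "--kafka-topic", "hudi_source_topic",           // placeholder topic
        "--kafka-group-id", "hudi_streamer_group",      // placeholder group id
        "--kafka-bootstrap-servers", "localhost:9092",  // placeholder brokers
        "--target-base-path", "file:///tmp/hudi_table",
        "--read-schema-path", "file:///tmp/source-schema.avsc",
        "--target-table", "hudi_demo",
        "--table-type", "COPY_ON_WRITE"
    };
    FlinkStreamerConfig cfg = new FlinkStreamerConfig();
    new JCommander(cfg, null, demoArgs);
    System.out.println("Writing to " + cfg.targetBasePath + " with op " + cfg.operation);
  }
}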

View File

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.streamer;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.operator.InstantGenerateOperator;
import org.apache.hudi.operator.KeyedWriteProcessFunction;
import org.apache.hudi.operator.KeyedWriteProcessOperator;
import org.apache.hudi.sink.CommitSink;
import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.List;
import java.util.Objects;
/**
 * A utility which can incrementally consume data from Kafka and apply it to the target table.
 * Currently, it only supports the COW table type and the insert and upsert operations.
*/
public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
env.enableCheckpointing(cfg.checkpointInterval);
env.getConfig().setGlobalJobParameters(cfg);
    // We use the checkpoint to trigger the write operations, including instant generation and committing,
    // so there can only be one checkpoint at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
if (cfg.flinkCheckPointPath != null) {
env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
}
TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
// add data source config
props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
// Read from kafka source
DataStream<HoodieRecord> inputRecords =
env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
.filter(Objects::nonNull)
.map(new JsonStringToHoodieRecordMapFunction(props))
.name("kafka_to_hudi_record")
.uid("kafka_to_hudi_record_uid");
inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
.name("instant_generator")
.uid("instant_generator_id")
        // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
.keyBy(HoodieRecord::getPartitionPath)
// write operator, where the write operation really happens
.transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
}), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
.name("write_process")
.uid("write_process_uid")
.setParallelism(env.getParallelism())
        // Commit can only be executed once, so set the sink parallelism to 1
.addSink(new CommitSink())
.name("commit_sink")
.uid("commit_sink_uid")
.setParallelism(1);
env.execute(cfg.targetTableName);
}
}
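Given the defaults above (`uuid` record key, `partitionpath` partition path, `ts` ordering field), a hedged sketch of what one JSON message consumed from the Kafka topic might look like; the actual field set is dictated by the Avro schema passed via --read-schema-path, so the extra `name` field here is purely illustrative:

// Hypothetical JSON payload for a single Kafka record; field names follow the
// FlinkStreamerConfig defaults for record key, partition path and ordering field.
String sampleMessage = "{"
    + "\"uuid\": \"id-0001\","
    + "\"name\": \"hudi\","
    + "\"ts\": 1611800000000,"
    + "\"partitionpath\": \"par1\""
    + "}";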

View File

@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.streamer;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.operator.StreamWriteOperatorFactory;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Properties;
/**
 * A utility which can incrementally consume data from Kafka and apply it to the target table.
 * Currently, it only supports the COW table type and the insert and upsert operations.
*/
public class HoodieFlinkStreamerV2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
env.enableCheckpointing(cfg.checkpointInterval);
env.getConfig().setGlobalJobParameters(cfg);
    // We use the checkpoint to trigger the write operations, including instant generation and committing,
    // so there can only be one checkpoint at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
if (cfg.flinkCheckPointPath != null) {
env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
}
Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg);
// Read from kafka source
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
.getLogicalType();
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
StreamWriteOperatorFactory<RowData> operatorFactory =
new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask);
int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex);
final RowData.FieldGetter partitionFieldGetter =
RowData.createFieldGetter(partitionFieldType, partitionFieldIndex);
DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
new RowDataTypeInfo(rowType),
false,
true,
TimestampFormat.ISO_8601
), kafkaProps))
.name("kafka_source")
.uid("uid_kafka_source")
        // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
.keyBy(partitionFieldGetter::getFieldOrNull)
.transform("hoodie_stream_write", null, operatorFactory)
.uid("uid_hoodie_stream_write")
.setParallelism(numWriteTask); // should make it configurable
env.addOperator(dataStream.getTransformation());
env.execute(cfg.targetTableName);
}
}
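To see what the RowData path consumes, here is a hedged, standalone sketch of the same JsonRowDataDeserializationSchema set-up over a tiny hand-written row type; the field names and the JSON payload are invented for illustration, and deserialize is called directly here while in the pipeline Flink manages the schema's lifecycle:

import java.nio.charset.StandardCharsets;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.logical.RowType;

public class JsonToRowDataSketch {
  public static void main(String[] args) throws Exception {
    // Hand-written row type standing in for the schema derived from --read-schema-path.
    RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.BIGINT()),
        DataTypes.FIELD("partitionpath", DataTypes.STRING()))
        .getLogicalType();
    JsonRowDataDeserializationSchema schema = new JsonRowDataDeserializationSchema(
        rowType, new RowDataTypeInfo(rowType), false, true, TimestampFormat.ISO_8601);
    byte[] message = "{\"uuid\":\"id-0001\",\"ts\":1611800000000,\"partitionpath\":\"par1\"}"
        .getBytes(StandardCharsets.UTF_8);
    RowData row = schema.deserialize(message);
    System.out.println("partition path = " + row.getString(2));
  }
}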

View File

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.streamer;
import org.apache.hudi.common.model.WriteOperationType;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.ParameterException;
/**
* Converter that converts a string into enum WriteOperationType.
*/
public class OperationConverter implements IStringConverter<WriteOperationType> {
@Override
public WriteOperationType convert(String value) throws ParameterException {
return WriteOperationType.valueOf(value);
}
}
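A minimal sketch of the converter in isolation (JCommander normally invokes it for the --op flag); note that an unknown value surfaces as the IllegalArgumentException thrown by WriteOperationType.valueOf:

OperationConverter converter = new OperationConverter();
WriteOperationType op = converter.convert("UPSERT");   // -> WriteOperationType.UPSERT
// converter.convert("NOT_AN_OPERATION") would throw IllegalArgumentException.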

View File

@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.TypeInformationRawType;
import java.util.List;
/**
* Converts an Avro schema into Flink's type information. It uses {@link org.apache.flink.api.java.typeutils.RowTypeInfo} for
* representing objects and converts Avro types into types that are compatible with Flink's Table &
* SQL API.
*
* <p>Note: Changes in this class need to be kept in sync with the corresponding runtime classes
* {@link org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@link org.apache.flink.formats.avro.AvroRowSerializationSchema}.
*
* <p><p>NOTE: reference from Flink release 1.12.0, should remove when Flink version upgrade to that.
*/
public class AvroSchemaConverter {
/**
* Converts an Avro schema {@code schema} into a nested row structure with deterministic field order and
* data types that are compatible with Flink's Table & SQL API.
*
* @param schema Avro schema definition
* @return data type matching the schema
*/
public static DataType convertToDataType(Schema schema) {
switch (schema.getType()) {
case RECORD:
final List<Schema.Field> schemaFields = schema.getFields();
final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()];
for (int i = 0; i < schemaFields.size(); i++) {
final Schema.Field field = schemaFields.get(i);
fields[i] = DataTypes.FIELD(field.name(), convertToDataType(field.schema()));
}
return DataTypes.ROW(fields).notNull();
case ENUM:
return DataTypes.STRING().notNull();
case ARRAY:
return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull();
case MAP:
return DataTypes.MAP(
DataTypes.STRING().notNull(),
convertToDataType(schema.getValueType()))
.notNull();
case UNION:
final Schema actualSchema;
final boolean nullable;
if (schema.getTypes().size() == 2
&& schema.getTypes().get(0).getType() == Schema.Type.NULL) {
actualSchema = schema.getTypes().get(1);
nullable = true;
} else if (schema.getTypes().size() == 2
&& schema.getTypes().get(1).getType() == Schema.Type.NULL) {
actualSchema = schema.getTypes().get(0);
nullable = true;
} else if (schema.getTypes().size() == 1) {
actualSchema = schema.getTypes().get(0);
nullable = false;
} else {
// use Kryo for serialization
return new AtomicDataType(
new TypeInformationRawType<>(false, Types.GENERIC(Object.class)));
}
DataType converted = convertToDataType(actualSchema);
return nullable ? converted.nullable() : converted;
case FIXED:
// logical decimal type
if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
final LogicalTypes.Decimal decimalType =
(LogicalTypes.Decimal) schema.getLogicalType();
return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
.notNull();
}
// convert fixed size binary data to primitive byte arrays
return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
case STRING:
// convert Avro's Utf8/CharSequence to String
return DataTypes.STRING().notNull();
case BYTES:
// logical decimal type
if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
final LogicalTypes.Decimal decimalType =
(LogicalTypes.Decimal) schema.getLogicalType();
return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
.notNull();
}
return DataTypes.BYTES().notNull();
case INT:
// logical date and time type
final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
if (logicalType == LogicalTypes.date()) {
return DataTypes.DATE().notNull();
} else if (logicalType == LogicalTypes.timeMillis()) {
return DataTypes.TIME(3).notNull();
}
return DataTypes.INT().notNull();
case LONG:
// logical timestamp type
if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
return DataTypes.TIMESTAMP(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
return DataTypes.TIMESTAMP(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
return DataTypes.TIME(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
return DataTypes.TIME(6).notNull();
}
return DataTypes.BIGINT().notNull();
case FLOAT:
return DataTypes.FLOAT().notNull();
case DOUBLE:
return DataTypes.DOUBLE().notNull();
case BOOLEAN:
return DataTypes.BOOLEAN().notNull();
case NULL:
return DataTypes.NULL();
default:
throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
}
}
}
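A hedged sketch of the converter on a tiny hand-written Avro schema, showing how a ["null", ...] union comes back as a nullable Flink type while plain fields stay NOT NULL; the schema below is invented for illustration:

import org.apache.avro.Schema;
import org.apache.flink.table.types.DataType;
import org.apache.hudi.util.AvroSchemaConverter;

public class AvroSchemaConverterSketch {
  public static void main(String[] args) {
    String avroSchema = "{"
        + "\"type\":\"record\",\"name\":\"demo\",\"fields\":["
        + "{\"name\":\"uuid\",\"type\":\"string\"},"
        + "{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
        + "{\"name\":\"partitionpath\",\"type\":[\"null\",\"string\"],\"default\":null}"
        + "]}";
    DataType rowDataType = AvroSchemaConverter.convertToDataType(new Schema.Parser().parse(avroSchema));
    // Expected along the lines of:
    // ROW<`uuid` STRING NOT NULL, `ts` TIMESTAMP(3) NOT NULL, `partitionpath` STRING> NOT NULL
    System.out.println(rowDataType);
  }
}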

View File

@@ -0,0 +1,309 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;
/**
* Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}.
*
* <p>NOTE: reference from Flink release 1.12.0, should remove when Flink version upgrade to that.
*/
@Internal
public class RowDataToAvroConverters {
// --------------------------------------------------------------------------------
// Runtime Converters
// --------------------------------------------------------------------------------
/**
* Runtime converter that converts objects of Flink Table & SQL internal data structures to
* corresponding Avro data structures.
*/
@FunctionalInterface
public interface RowDataToAvroConverter extends Serializable {
Object convert(Schema schema, Object object);
}
// --------------------------------------------------------------------------------
// IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
// necessary because the maven shade plugin cannot relocate classes in
// SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for
// sql-client uber jars.
// --------------------------------------------------------------------------------
/**
   * Creates a runtime converter according to the given logical type that converts objects of
* Flink Table & SQL internal data structures to corresponding Avro data structures.
*/
public static RowDataToAvroConverter createConverter(LogicalType type) {
final RowDataToAvroConverter converter;
switch (type.getTypeRoot()) {
case NULL:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return null;
}
};
break;
case TINYINT:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ((Byte) object).intValue();
}
};
break;
case SMALLINT:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ((Short) object).intValue();
}
};
break;
case BOOLEAN: // boolean
case INTEGER: // int
case INTERVAL_YEAR_MONTH: // long
case BIGINT: // long
case INTERVAL_DAY_TIME: // long
case FLOAT: // float
case DOUBLE: // double
case TIME_WITHOUT_TIME_ZONE: // int
case DATE: // int
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return object;
}
};
break;
case CHAR:
case VARCHAR:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return new Utf8(object.toString());
}
};
break;
case BINARY:
case VARBINARY:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ByteBuffer.wrap((byte[]) object);
}
};
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
}
};
break;
case DECIMAL:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
}
};
break;
case ARRAY:
converter = createArrayConverter((ArrayType) type);
break;
case ROW:
converter = createRowConverter((RowType) type);
break;
case MAP:
case MULTISET:
converter = createMapConverter(type);
break;
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
// wrap into nullable converter
return new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
if (object == null) {
return null;
}
// get actual schema if it is a nullable schema
Schema actualSchema;
if (schema.getType() == Schema.Type.UNION) {
List<Schema> types = schema.getTypes();
int size = types.size();
if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
actualSchema = types.get(0);
} else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
actualSchema = types.get(1);
} else {
throw new IllegalArgumentException(
"The Avro schema is not a nullable type: " + schema.toString());
}
} else {
actualSchema = schema;
}
return converter.convert(actualSchema, object);
}
};
}
private static RowDataToAvroConverter createRowConverter(RowType rowType) {
final RowDataToAvroConverter[] fieldConverters =
rowType.getChildren().stream()
.map(RowDataToAvroConverters::createConverter)
.toArray(RowDataToAvroConverter[]::new);
final LogicalType[] fieldTypes =
rowType.getFields().stream()
.map(RowType.RowField::getType)
.toArray(LogicalType[]::new);
final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
}
final int length = rowType.getFieldCount();
return new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
final RowData row = (RowData) object;
final List<Schema.Field> fields = schema.getFields();
final GenericRecord record = new GenericData.Record(schema);
for (int i = 0; i < length; ++i) {
final Schema.Field schemaField = fields.get(i);
Object avroObject =
fieldConverters[i].convert(
schemaField.schema(), fieldGetters[i].getFieldOrNull(row));
record.put(i, avroObject);
}
return record;
}
};
}
private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) {
LogicalType elementType = arrayType.getElementType();
final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType());
return new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
final Schema elementSchema = schema.getElementType();
ArrayData arrayData = (ArrayData) object;
List<Object> list = new ArrayList<>();
for (int i = 0; i < arrayData.size(); ++i) {
list.add(
elementConverter.convert(
elementSchema, elementGetter.getElementOrNull(arrayData, i)));
}
return list;
}
};
}
private static RowDataToAvroConverter createMapConverter(LogicalType type) {
LogicalType valueType = extractValueTypeToAvroMap(type);
final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
final RowDataToAvroConverter valueConverter = createConverter(valueType);
return new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
final Schema valueSchema = schema.getValueType();
final MapData mapData = (MapData) object;
final ArrayData keyArray = mapData.keyArray();
final ArrayData valueArray = mapData.valueArray();
final Map<Object, Object> map = new HashMap<>(mapData.size());
for (int i = 0; i < mapData.size(); ++i) {
final String key = keyArray.getString(i).toString();
final Object value =
valueConverter.convert(
valueSchema, valueGetter.getElementOrNull(valueArray, i));
map.put(key, value);
}
return map;
}
};
}
}
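A hedged round-trip sketch: build the runtime converter for a two-field row and turn a GenericRowData into an Avro GenericRecord; the Avro schema is hand-written so that its fields line up index-by-index with the row type, and all values are made up:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.util.RowDataToAvroConverters;

public class RowDataToAvroSketch {
  public static void main(String[] args) {
    RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.BIGINT()))
        .getLogicalType();
    // Nullable unions so the wrapper converter can unwrap them field by field.
    Schema avroSchema = new Schema.Parser().parse("{"
        + "\"type\":\"record\",\"name\":\"demo\",\"fields\":["
        + "{\"name\":\"uuid\",\"type\":[\"string\",\"null\"]},"
        + "{\"name\":\"ts\",\"type\":[\"long\",\"null\"]}]}");
    RowDataToAvroConverters.RowDataToAvroConverter converter =
        RowDataToAvroConverters.createConverter(rowType);
    GenericRowData row = GenericRowData.of(StringData.fromString("id-0001"), 1611800000000L);
    GenericRecord record = (GenericRecord) converter.convert(avroSchema, row);
    System.out.println(record); // {"uuid": "id-0001", "ts": 1611800000000}
  }
}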

View File

@@ -18,47 +18,71 @@
package org.apache.hudi.util;
import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Properties;
/**
* Utilities for Flink stream read and write.
*/
public class StreamerUtil {
private static Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
public static TypedProperties getProps(HoodieFlinkStreamer.Config cfg) {
public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
TypedProperties properties = getProps(config);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.kafkaGroupId);
return properties;
}
public static TypedProperties getProps(FlinkStreamerConfig cfg) {
return readConfig(
FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
new Path(cfg.propsFilePath), cfg.configs).getConfig();
}
public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
return new FilebasedSchemaProvider(FlinkOptions.fromStreamerConfig(cfg)).getSourceSchema();
}
public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
return new FilebasedSchemaProvider(conf).getSourceSchema();
}
/**
* Read conig from files.
* Read config from properties file (`--props` option) and cmd line (`--hoodie-conf` option).
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf;
@@ -81,16 +105,50 @@ public class StreamerUtil {
return conf;
}
public static Configuration getHadoopConf() {
return new Configuration();
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
// create hadoop configuration with hadoop conf directory configured.
org.apache.hadoop.conf.Configuration hadoopConf = null;
for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
if (hadoopConf != null) {
break;
}
}
if (hadoopConf == null) {
hadoopConf = new org.apache.hadoop.conf.Configuration();
}
return hadoopConf;
}
public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
checkPropNames.forEach(prop -> {
if (!props.containsKey(prop)) {
throw new HoodieNotSupportedException("Required property " + prop + " is missing");
/**
* Returns a new Hadoop Configuration object using the path to the hadoop conf configured.
*
* @param hadoopConfDir Hadoop conf directory path.
* @return A Hadoop configuration instance.
*/
private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
if (new File(hadoopConfDir).exists()) {
org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
File coreSite = new File(hadoopConfDir, "core-site.xml");
if (coreSite.exists()) {
hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
}
});
File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
if (hdfsSite.exists()) {
hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
}
File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
if (yarnSite.exists()) {
hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
}
// Add mapred-site.xml. We need to read configurations like compression codec.
File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
if (mapredSite.exists()) {
hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath()));
}
return hadoopConfiguration;
}
return null;
}
/**
@@ -109,6 +167,21 @@ public class StreamerUtil {
}
}
/**
* Create a key generator class via reflection, passing in any configs needed.
* <p>
   * If the key generator class is configured through the given Flink configuration, use that class; otherwise, use the default key generator class
   * specified in {@link FlinkOptions}.
*/
public static KeyGenerator createKeyGenerator(Configuration conf) throws IOException {
String keyGeneratorClass = conf.getString(FlinkOptions.KEYGEN_CLASS);
try {
return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, flinkConf2TypedProperties(conf));
} catch (Throwable e) {
throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
}
}
/**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/
@@ -122,21 +195,59 @@ public class StreamerUtil {
}
}
public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
.build())
.forTable(cfg.targetTableName)
.withAutoCommit(false)
.withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
.getConfig());
builder = builder.withSchema(new FilebasedSchemaProvider(getProps(cfg)).getTargetSchema().toString());
HoodieWriteConfig config = builder.build();
return config;
/**
   * Create a payload class via reflection, without an ordering/precombine value.
*/
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
throws IOException {
try {
return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[] {Option.class}, Option.of(record));
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e);
}
}
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.FLINK)
.withPath(conf.getString(FlinkOptions.PATH))
.combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
.build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withAutoCommit(false)
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
builder = builder.withSchema(getSourceSchema(conf).toString());
return builder.build();
}
/**
   * Converts the given {@link Configuration} to {@link TypedProperties}.
* The default values are also set up.
*
* @param conf The flink configuration
* @return a TypedProperties instance
*/
public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
Properties properties = new Properties();
    // put all the explicitly set options
conf.addAllToProperties(properties);
// put all the default options
for (ConfigOption<?> option : FlinkOptions.OPTIONAL_OPTIONS) {
if (!conf.contains(option)) {
properties.put(option.key(), option.defaultValue());
}
}
return new TypedProperties(properties);
}
public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
checkPropNames.forEach(prop ->
Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
}
}
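A hedged sketch of flinkConf2TypedProperties: explicitly set options are copied into the TypedProperties and every FlinkOptions entry in OPTIONAL_OPTIONS that was left unset falls back to its default. FlinkOptions.TABLE_NAME is assumed here to be the string option backing conf.getString(FlinkOptions.TABLE_NAME) above, and the table name is arbitrary:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

public class FlinkConfToPropsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.TABLE_NAME, "hudi_demo"); // arbitrary table name
    TypedProperties props = StreamerUtil.flinkConf2TypedProperties(conf);
    // The explicitly set option is carried over...
    System.out.println(props.getProperty(FlinkOptions.TABLE_NAME.key()));
    // ...and unset OPTIONAL_OPTIONS entries show up with their defaults.
    System.out.println(props.size() + " properties in total");
  }
}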