
[HUDI-1327] Introduce base implementation of hudi-flink-client (#2176)

wangxianghu
2020-11-18 17:57:11 +08:00
committed by GitHub
parent 430d4b428e
commit 4d05680038
48 changed files with 4597 additions and 38 deletions


@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.operator.InstantGenerateOperator;
import org.apache.hudi.operator.KeyedWriteProcessFunction;
import org.apache.hudi.operator.KeyedWriteProcessOperator;
import org.apache.hudi.sink.CommitSink;
import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
import org.apache.hudi.util.StreamerUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
/**
* A utility that incrementally consumes data from Kafka and applies it to the target table.
* Currently, it only supports COW tables and the insert and upsert operations.
*/
public class HudiFlinkStreamer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
env.enableCheckpointing(cfg.checkpointInterval);
env.getConfig().setGlobalJobParameters(cfg);
// We use the checkpoint to trigger the write operation, including instant generation and committing;
// there can only be one checkpoint at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.disableOperatorChaining();
if (cfg.flinkCheckPointPath != null) {
env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
}
Properties kafkaProps = StreamerUtil.getKafkaProps(cfg);
// Read from kafka source
DataStream<HoodieRecord> inputRecords =
env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), kafkaProps))
.filter(Objects::nonNull)
.map(new JsonStringToHoodieRecordMapFunction(cfg))
.name("kafka_to_hudi_record")
.uid("kafka_to_hudi_record_uid");
// InstantGenerateOperator emits a globally unique instantTime; it must run with parallelism 1
inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
.name("instant_generator")
.uid("instant_generator_id")
.setParallelism(1)
// Key by partition path to avoid multiple subtasks writing to the same partition at the same time
.keyBy(HoodieRecord::getPartitionPath)
// Write operator, where the actual write operation happens
.transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
}), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
.name("write_process")
.uid("write_process_uid")
.setParallelism(env.getParallelism())
// The commit can only be executed once, so run the sink with parallelism 1
.addSink(new CommitSink())
.name("commit_sink")
.uid("commit_sink_uid")
.setParallelism(1);
env.execute(cfg.targetTableName);
}
public static class Config extends Configuration {
@Parameter(names = {"--kafka-topic"}, description = "kafka topic", required = true)
public String kafkaTopic;
@Parameter(names = {"--kafka-group-id"}, description = "kafka consumer group id", required = true)
public String kafkaGroupId;
@Parameter(names = {"--kafka-bootstrap-servers"}, description = "kafka bootstrap.servers", required = true)
public String kafkaBootstrapServers;
@Parameter(names = {"--flink-checkpoint-path"}, description = "flink checkpoint path")
public String flinkCheckPointPath;
@Parameter(names = {"--flink-block-retry-times"}, description = "Times to retry when latest instant has not completed")
public String blockRetryTime = "10";
@Parameter(names = {"--flink-block-retry-interval"}, description = "Seconds between two tries when latest instant has not completed")
public String blockRetryInterval = "1";
@Parameter(names = {"--target-base-path"},
description = "base path for the target hoodie table. "
+ "(Will be created if did not exist first time around. If exists, expected to be a hoodie table)",
required = true)
public String targetBasePath;
@Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
public String targetTableName;
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
public String tableType;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties.")
public String propsFilePath =
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+ " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
public String sourceOrderingField = "ts";
@Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off "
+ "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
public WriteOperationType operation = WriteOperationType.UPSERT;
@Parameter(names = {"--filter-dupes"},
description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
public Boolean filterDupes = false;
@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
public Boolean commitOnErrors = false;
/**
* Flink checkpoint interval.
*/
@Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.")
public Long checkpointInterval = 1000 * 5L;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}
private static class OperationConverter implements IStringConverter<WriteOperationType> {
@Override
public WriteOperationType convert(String value) throws ParameterException {
return WriteOperationType.valueOf(value);
}
}
}
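
For reference, everything the streamer needs is declared as JCommander flags on the Config class above. Below is a minimal launch sketch that invokes the class programmatically with the six required flags; the topic, group id, servers, and paths are illustrative placeholders, not values from this commit, and a reachable Kafka cluster is assumed at runtime.

public class HudiFlinkStreamerLauncher {
  public static void main(String[] args) throws Exception {
    // The six flags marked required = true in Config; all values here are placeholders.
    org.apache.hudi.HudiFlinkStreamer.main(new String[] {
        "--kafka-topic", "hudi_source_topic",
        "--kafka-group-id", "hudi_streamer_group",
        "--kafka-bootstrap-servers", "localhost:9092",
        "--target-base-path", "file:///tmp/hudi/demo_table",
        "--target-table", "demo_table",
        "--table-type", "COPY_ON_WRITE"
    });
  }
}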


@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.exception;
public class HoodieFlinkStreamerException extends HoodieException {
public HoodieFlinkStreamerException(String msg, Throwable e) {
super(msg, e);
}
public HoodieFlinkStreamerException(String msg) {
super(msg);
}
}


@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Operator that generates a globally unique instant; it must run with parallelism 1. Before generating a new
* instant, {@link InstantGenerateOperator} always checks whether the last instant has completed. If it has
* completed, a new instant is generated immediately; otherwise, the operator waits and re-checks the state of the
* last instant until it times out and throws an exception.
*/
public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord> implements OneInputStreamOperator<HoodieRecord, HoodieRecord> {
private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
public static final String NAME = "InstantGenerateOperator";
private HudiFlinkStreamer.Config cfg;
private HoodieFlinkWriteClient writeClient;
private SerializableConfiguration serializableHadoopConf;
private transient FileSystem fs;
private String latestInstant = "";
private List<String> latestInstantList = new ArrayList<>(1);
private transient ListState<String> latestInstantState;
private List<StreamRecord> bufferedRecords = new LinkedList();
private transient ListState<StreamRecord> recordsState;
private Integer retryTimes;
private Integer retryInterval;
@Override
public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
if (streamRecord.getValue() != null) {
bufferedRecords.add(streamRecord);
output.collect(streamRecord);
}
}
@Override
public void open() throws Exception {
super.open();
// get configs from runtimeContext
cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
// retry times
retryTimes = Integer.valueOf(cfg.blockRetryTime);
// retry interval
retryInterval = Integer.valueOf(cfg.blockRetryInterval);
// hadoopConf
serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
// Hadoop FileSystem
fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
// writeClient
writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
// init table, create it if not exists.
initTable();
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId);
// check whether the last instant has completed; if not, wait and retry, and throw an exception once retries are exhausted
if (!StringUtils.isNullOrEmpty(latestInstant)) {
doCheck();
// last instant completed, set it empty
latestInstant = "";
}
// no data, no new instant
if (!bufferedRecords.isEmpty()) {
latestInstant = startNewInstant(checkpointId);
}
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
// instantState
ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
// recordState
ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class);
recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor);
if (context.isRestored()) {
Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
latestInstantIterator.forEachRemaining(x -> latestInstant = x);
LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);
Iterator<StreamRecord> recordIterator = recordsState.get().iterator();
bufferedRecords.clear();
recordIterator.forEachRemaining(x -> bufferedRecords.add(x));
}
}
@Override
public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception {
if (latestInstantList.isEmpty()) {
latestInstantList.add(latestInstant);
} else {
latestInstantList.set(0, latestInstant);
}
latestInstantState.update(latestInstantList);
LOG.info("Update latest instant [{}]", latestInstant);
recordsState.update(bufferedRecords);
LOG.info("Update records state size = [{}]", bufferedRecords.size());
bufferedRecords.clear();
}
/**
* Create a new instant.
*
* @param checkpointId the id of the checkpoint that triggered this instant
* @return the new instant time
*/
private String startNewInstant(long checkpointId) {
String newTime = writeClient.startCommit();
LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId);
return newTime;
}
/**
* Check the status of the last instant.
*/
private void doCheck() throws InterruptedException {
// query the requested and inflight commit/deltacommit instants
String commitType = cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION;
LOG.info("Query latest instant [{}]", latestInstant);
List<String> rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType);
int tryTimes = 0;
while (tryTimes < retryTimes) {
tryTimes++;
StringBuffer sb = new StringBuffer();
if (rollbackPendingCommits.contains(latestInstant)) {
rollbackPendingCommits.forEach(x -> sb.append(x).append(","));
LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb.toString(), tryTimes);
TimeUnit.SECONDS.sleep(retryInterval);
rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType);
} else {
LOG.warn("Latest transaction [{}] is completed! Completed transaction, try times [{}]", latestInstant, tryTimes);
return;
}
}
throw new InterruptedException("Last instant has not completed after " + retryTimes + " tries, stopping the task now");
}
/**
* Create table if not exists.
*/
private void initTable() throws IOException {
if (!fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient.initTableType(new Configuration(serializableHadoopConf.get()), cfg.targetBasePath,
HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, 1);
LOG.info("Table initialized");
} else {
LOG.info("Table already [{}/{}] exists, do nothing here", cfg.targetBasePath, cfg.targetTableName);
}
}
@Override
public void close() throws Exception {
if (writeClient != null) {
writeClient.close();
}
if (fs != null) {
fs.close();
}
}
}


@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
/**
* A {@link KeyedProcessFunction} where the actual write operation happens.
*/
public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
/**
* Record buffer, processed in the snapshotState function.
*/
private List<HoodieRecord> bufferedRecords = new LinkedList<>();
/**
* Flink collector that helps to send data downstream.
*/
private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;
/**
* Id of current subtask.
*/
private int indexOfThisSubtask;
/**
* Instant time this batch belongs to.
*/
private String latestInstant;
/**
* Flag indicating whether this subtask has received records.
*/
private boolean hasRecordsIn;
/**
* Job conf.
*/
private HudiFlinkStreamer.Config cfg;
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext()));
writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(cfg));
}
@Override
public void snapshotState(FunctionSnapshotContext context) {
// get latest requested instant
String commitType = cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION;
List<String> latestInstants = writeClient.getInflightsAndRequestedInstants(commitType);
latestInstant = latestInstants.isEmpty() ? null : latestInstants.get(0);
if (bufferedRecords.size() > 0) {
hasRecordsIn = true;
if (output != null && latestInstant != null) {
String instantTimestamp = latestInstant;
LOG.info("Write records, subtask id = [{}] checkpoint_id = [{}}] instant = [{}], record size = [{}]", indexOfThisSubtask, context.getCheckpointId(), instantTimestamp, bufferedRecords.size());
List<WriteStatus> writeStatus;
switch (cfg.operation) {
case INSERT:
writeStatus = writeClient.insert(bufferedRecords, instantTimestamp);
break;
case UPSERT:
writeStatus = writeClient.upsert(bufferedRecords, instantTimestamp);
break;
default:
throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation);
}
output.collect(new Tuple3<>(instantTimestamp, writeStatus, indexOfThisSubtask));
bufferedRecords.clear();
}
} else {
LOG.info("No data in subtask [{}]", indexOfThisSubtask);
hasRecordsIn = false;
}
}
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) {
// no operation
}
@Override
public void processElement(HoodieRecord hoodieRecord, Context context, Collector<Tuple3<String, List<WriteStatus>, Integer>> collector) {
if (output == null) {
output = collector;
}
// buffer the records
bufferedRecords.add(hoodieRecord);
}
public boolean hasRecordsIn() {
return hasRecordsIn;
}
public String getLatestInstant() {
return latestInstant;
}
@Override
public void close() {
if (writeClient != null) {
writeClient.close();
}
}
}


@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* Operator that mocks empty write results and delivers them downstream when no data flows into a subtask.
*/
public class KeyedWriteProcessOperator extends KeyedProcessOperator<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> {
public static final String NAME = "WriteProcessOperator";
private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessOperator.class);
private KeyedWriteProcessFunction writeProcessFunction;
public KeyedWriteProcessOperator(KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> function) {
super(function);
this.writeProcessFunction = (KeyedWriteProcessFunction) function;
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
// The super.snapshotState(context) call triggers the `writeProcessFunction.snapshotState()` method, which means the
// logic below is executed after `writeProcessFunction.snapshotState()` has run.
// If no data flows into `writeProcessFunction`, it never sends anything downstream. So, in order to make
// sure each subtask sends a write status downstream, this operator's snapshotState() mocks an empty
// write status and sends it downstream when no data flows into a subtask.
super.snapshotState(context);
// make up an empty result and send downstream
if (!writeProcessFunction.hasRecordsIn() && writeProcessFunction.getLatestInstant() != null) {
String instantTime = writeProcessFunction.getLatestInstant();
LOG.info("Mock empty writeStatus, subtaskId = [{}], instant = [{}]", getRuntimeContext().getIndexOfThisSubtask(), instantTime);
output.collect(new StreamRecord<>(new Tuple3<>(instantTime, new ArrayList<WriteStatus>(), getRuntimeContext().getIndexOfThisSubtask())));
}
}
}


@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.schema;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.StreamerUtil;
import java.io.IOException;
import java.util.Collections;
/**
* A simple schema provider that reads schemas from files on DFS.
*/
public class FilebasedSchemaProvider extends SchemaProvider {
/**
* Configs supported.
*/
public static class Config {
private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.file";
private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file";
}
private final FileSystem fs;
private final Schema sourceSchema;
private Schema targetSchema;
public FilebasedSchemaProvider(TypedProperties props) {
super(props);
StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
this.targetSchema =
new Schema.Parser().parse(fs.open(new Path(props.getString(Config.TARGET_SCHEMA_FILE_PROP))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);
}
}
@Override
public Schema getSourceSchema() {
return sourceSchema;
}
@Override
public Schema getTargetSchema() {
if (targetSchema != null) {
return targetSchema;
} else {
return super.getTargetSchema();
}
}
}
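
A short usage sketch for the provider above: the only required property is the source schema file key declared in Config; the target schema key is optional and falls back to the source schema. The local path below is a placeholder.

import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.schema.FilebasedSchemaProvider;

public class SchemaProviderExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Placeholder path to an Avro schema file (.avsc) on the local filesystem.
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", "file:///tmp/source.avsc");
    Schema sourceSchema = new FilebasedSchemaProvider(props).getSourceSchema();
    System.out.println(sourceSchema.toString(true));
  }
}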


@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.schema;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import java.io.Serializable;
/**
* Class to provide schemas for reading data from and writing into a Hoodie table.
*/
public abstract class SchemaProvider implements Serializable {
protected TypedProperties config;
protected SchemaProvider(TypedProperties props) {
this.config = props;
}
public abstract Schema getSourceSchema();
public Schema getTargetSchema() {
// by default, use source schema as target for hoodie table as well
return getSourceSchema();
}
}


@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Function that executes the commit operation. This operation should be executed only once.
*/
public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus>, Integer>> {
private static final Logger LOG = LoggerFactory.getLogger(CommitSink.class);
/**
* Job conf.
*/
private HudiFlinkStreamer.Config cfg;
/**
* Write client.
*/
private transient HoodieFlinkWriteClient writeClient;
/**
* Write result buffer.
*/
private Map<String, List<List<WriteStatus>>> bufferedWriteStatus = new HashMap<>();
/**
* Write parallelism of this job.
*/
private Integer writeParallelSize = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Get configs from runtimeContext
cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
// writeClient
writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
}
@Override
public void invoke(Tuple3<String, List<WriteStatus>, Integer> writeStatues, Context context) {
LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], records size = [{}]", writeStatues.f0, writeStatues.f2, writeStatues.f1.size());
try {
if (bufferedWriteStatus.containsKey(writeStatues.f0)) {
bufferedWriteStatus.get(writeStatues.f0).add(writeStatues.f1);
} else {
List<List<WriteStatus>> oneBatchData = new ArrayList<>(writeParallelSize);
oneBatchData.add(writeStatues.f1);
bufferedWriteStatus.put(writeStatues.f0, oneBatchData);
}
// check and commit
checkAndCommit(writeStatues.f0);
} catch (Exception e) {
throw new HoodieFlinkStreamerException("Invoke sink error", e);
}
}
/**
* Check and commit if all subtasks have completed.
*/
private void checkAndCommit(String instantTime) throws Exception {
if (bufferedWriteStatus.get(instantTime).size() == writeParallelSize) {
LOG.info("Instant [{}] process complete, start commit", instantTime);
doCommit(instantTime);
bufferedWriteStatus.clear();
LOG.info("Instant [{}] commit completed!", instantTime);
} else {
LOG.info("Instant [{}], can not commit yet, subtask completed : [{}/{}]", instantTime, bufferedWriteStatus.get(instantTime).size(), writeParallelSize);
}
}
private void doCommit(String instantTime) {
// get the records to commit
List<WriteStatus> writeResults = bufferedWriteStatus.get(instantTime).stream().flatMap(Collection::stream).collect(Collectors.toList());
// commit and rollback
long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
boolean hasErrors = totalErrorRecords > 0;
if (!hasErrors || cfg.commitOnErrors) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
if (hasErrors) {
LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
}
boolean success = writeClient.commit(instantTime, writeResults, Option.of(checkpointCommitMetadata));
if (success) {
LOG.warn("Commit " + instantTime + " successful!");
} else {
LOG.warn("Commit " + instantTime + " failed!");
throw new HoodieException("Commit " + instantTime + " failed!");
}
} else {
LOG.error("Streamer sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
LOG.error("Printing out the top 100 errors");
writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
LOG.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
}
});
// Rolling back instant
writeClient.rollback(instantTime);
throw new HoodieException("Commit " + instantTime + " failed and rolled-back !");
}
}
}


@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.source;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.util.AvroConvertor;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Function that transforms a JSON string into a {@link HoodieRecord}.
*/
public class JsonStringToHoodieRecordMapFunction implements MapFunction<String, HoodieRecord> {
private static final Logger LOG = LoggerFactory.getLogger(JsonStringToHoodieRecordMapFunction.class);
private final HudiFlinkStreamer.Config cfg;
private TypedProperties props;
private KeyGenerator keyGenerator;
private AvroConvertor avroConvertor;
public JsonStringToHoodieRecordMapFunction(HudiFlinkStreamer.Config cfg) {
this.cfg = cfg;
init();
}
@Override
public HoodieRecord map(String value) throws Exception {
GenericRecord gr = avroConvertor.fromJson(value);
HoodieRecordPayload payload = StreamerUtil.createPayload(cfg.payloadClassName, gr,
(Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
}
private void init() {
this.props = StreamerUtil.getProps(cfg);
avroConvertor = new AvroConvertor(new FilebasedSchemaProvider(props).getSourceSchema());
try {
keyGenerator = StreamerUtil.createKeyGenerator(props);
} catch (IOException e) {
LOG.error("Init keyGenerator failed", e);
// fail fast instead of leaving keyGenerator null, which would NPE later in map()
throw new HoodieFlinkStreamerException("Init keyGenerator failed", e);
}
}
}


@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import com.twitter.bijection.Injection;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.MercifulJsonConverter;
import java.io.Serializable;
/**
* Converts a variety of datum formats into Avro GenericRecords. Uses lazy fields to circumvent issues around
* serializing these objects from the driver to the executors.
*/
public class AvroConvertor implements Serializable {
private static final long serialVersionUID = 1L;
/**
* To be lazily inited on executors.
*/
private transient Schema schema;
private final String schemaStr;
/**
* To be lazily inited on executors.
*/
private transient MercifulJsonConverter jsonConverter;
/**
* To be lazily inited on executors.
*/
private transient Injection<GenericRecord, byte[]> recordInjection;
public AvroConvertor(String schemaStr) {
this.schemaStr = schemaStr;
}
public AvroConvertor(Schema schema) {
this.schemaStr = schema.toString();
this.schema = schema;
}
private void initSchema() {
if (schema == null) {
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(schemaStr);
}
}
private void initJsonConvertor() {
if (jsonConverter == null) {
jsonConverter = new MercifulJsonConverter();
}
}
public GenericRecord fromJson(String json) {
initSchema();
initJsonConvertor();
return jsonConverter.convert(json, schema);
}
}
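
A minimal sketch of the convertor above, using an illustrative one-field record schema; any valid Avro schema string works the same way.

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.util.AvroConvertor;

public class AvroConvertorExample {
  public static void main(String[] args) {
    // Illustrative schema: a record with a single string field.
    String schemaStr = "{\"type\":\"record\",\"name\":\"User\","
        + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
    AvroConvertor convertor = new AvroConvertor(schemaStr);
    GenericRecord record = convertor.fromJson("{\"name\": \"alice\"}");
    System.out.println(record.get("name")); // prints: alice
  }
}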


@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HudiFlinkStreamer;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Properties;
public class StreamerUtil {
private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
public static Properties getKafkaProps(HudiFlinkStreamer.Config cfg) {
Properties result = new Properties();
result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cfg.kafkaBootstrapServers);
result.put(ConsumerConfig.GROUP_ID_CONFIG, cfg.kafkaGroupId);
return result;
}
public static TypedProperties getProps(HudiFlinkStreamer.Config cfg) {
return readConfig(
FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
new Path(cfg.propsFilePath), cfg.configs).getConfig();
}
/**
* Read config from files.
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf;
try {
conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
} catch (Exception e) {
conf = new DFSPropertiesConfiguration();
LOG.warn("Unexpected error read props file at :" + cfgPath, e);
}
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Unexpected error adding config overrides", ioe);
}
return conf;
}
public static Configuration getHadoopConf() {
return new Configuration();
}
public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
checkPropNames.forEach(prop -> {
if (!props.containsKey(prop)) {
throw new HoodieNotSupportedException("Required property " + prop + " is missing");
}
});
}
/**
* Create a key generator class via reflection, passing in any configs needed.
* <p>
* If the key generator class name is configured through the properties, i.e., {@code props}, use the corresponding
* key generator class; otherwise, fall back to {@link SimpleAvroKeyGenerator}.
*/
public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
String keyGeneratorClass = props.getString("hoodie.datasource.write.keygenerator.class",
SimpleAvroKeyGenerator.class.getName());
try {
return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
} catch (Throwable e) {
throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
}
}
/**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
throws IOException {
try {
return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e);
}
}
public static HoodieWriteConfig getHoodieClientConfig(HudiFlinkStreamer.Config cfg) {
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withAutoCommit(false)
.withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
.getConfig());
builder = builder.withSchema(new FilebasedSchemaProvider(getProps(cfg)).getTargetSchema().toString());
HoodieWriteConfig config = builder.build();
return config;
}
}
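
To illustrate the key generator fallback in createKeyGenerator above, here is a sketch that relies on the default SimpleAvroKeyGenerator; the record key and partition path property names are the standard Hudi configs that SimpleAvroKeyGenerator reads, and the field names are placeholders.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.util.StreamerUtil;

public class KeyGeneratorExample {
  public static void main(String[] args) throws Exception {
    TypedProperties props = new TypedProperties();
    // "hoodie.datasource.write.keygenerator.class" is deliberately left unset,
    // so SimpleAvroKeyGenerator is used; the field names below are placeholders.
    props.setProperty("hoodie.datasource.write.recordkey.field", "uuid");
    props.setProperty("hoodie.datasource.write.partitionpath.field", "partition");
    KeyGenerator keyGenerator = StreamerUtil.createKeyGenerator(props);
    System.out.println(keyGenerator.getClass().getName());
  }
}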