1
0

[HUDI-1821] Remove legacy code for Flink writer (#2868)

This commit is contained in:
hiscat
2021-05-07 10:58:49 +08:00
committed by GitHub
parent 0284cdecce
commit 0a5863939b
10 changed files with 36 additions and 1146 deletions

View File

@@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.exception;
/**
 * Exception thrown by the Flink streamer pipeline.
 *
 * <p>Wraps lower-level failures (write, commit or initialization errors) so they
 * surface to Flink as a {@link HoodieException} subtype.
 */
public class HoodieFlinkStreamerException extends HoodieException {

  /**
   * Creates an exception with a message only.
   *
   * @param msg description of the failure
   */
  public HoodieFlinkStreamerException(String msg) {
    super(msg);
  }

  /**
   * Creates an exception with a message and the underlying cause.
   *
   * @param msg description of the failure
   * @param e   the cause, preserved for the stack trace
   */
  public HoodieFlinkStreamerException(String msg, Throwable e) {
    super(msg, e);
  }
}

View File

@@ -1,153 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Function that executes the commit operation; it should run with parallelism 1 so
 * that each instant is committed exactly once.
 *
 * <p>Upstream write subtasks each send a {@code Tuple3<instantTime, writeStatuses,
 * subtaskId>}; results are buffered per instant and the commit fires once results
 * from all {@code writeParallelSize} subtasks have arrived for that instant.
 */
public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus>, Integer>> {
  private static final Logger LOG = LoggerFactory.getLogger(CommitSink.class);

  /**
   * Job conf.
   */
  private FlinkStreamerConfig cfg;

  /**
   * Write client.
   */
  private transient HoodieFlinkWriteClient writeClient;

  /**
   * Write result buffer: instant time -> one WriteStatus list per upstream subtask.
   */
  private Map<String, List<List<WriteStatus>>> bufferedWriteStatus = new HashMap<>();

  /**
   * Parallelism of the upstream write operator; a commit triggers once this many
   * subtask results have been buffered for an instant.
   */
  private Integer writeParallelSize = 0;

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // Get configs from runtimeContext
    cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
    // writeClient
    writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
  }

  @Override
  public void invoke(Tuple3<String, List<WriteStatus>, Integer> writeStatuses, Context context) {
    LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], WriteStatus size = [{}]", writeStatuses.f0, writeStatuses.f2, writeStatuses.f1.size());
    try {
      // Buffer this subtask's write results under their instant time.
      bufferedWriteStatus
          .computeIfAbsent(writeStatuses.f0, k -> new ArrayList<>(writeParallelSize))
          .add(writeStatuses.f1);
      // check and commit
      checkAndCommit(writeStatuses.f0);
    } catch (Exception e) {
      throw new HoodieFlinkStreamerException("Invoke sink error", e);
    }
  }

  /**
   * Check and commit if all subtasks have reported results for the given instant.
   *
   * @param instantTime the instant whose buffered results are checked
   * @throws Exception when the commit fails
   */
  private void checkAndCommit(String instantTime) throws Exception {
    if (bufferedWriteStatus.get(instantTime).size() == writeParallelSize) {
      LOG.info("Instant [{}] process complete, start commit", instantTime);
      doCommit(instantTime);
      // Only drop this instant's buffer: clearing the whole map could discard
      // results already buffered for a different in-flight instant.
      bufferedWriteStatus.remove(instantTime);
      LOG.info("Instant [{}] commit completed!", instantTime);
    } else {
      LOG.info("Instant [{}], can not commit yet, subtask completed : [{}/{}]", instantTime, bufferedWriteStatus.get(instantTime).size(), writeParallelSize);
    }
  }

  /**
   * Commit the buffered results for the instant, or roll the instant back when
   * errors are present and {@code commitOnErrors} is not set.
   *
   * @param instantTime the instant to commit
   */
  private void doCommit(String instantTime) {
    // get the records to commit
    List<WriteStatus> writeResults = bufferedWriteStatus.get(instantTime).stream().flatMap(Collection::stream).collect(Collectors.toList());
    // commit and rollback
    long totalErrorRecords = writeResults.stream().mapToLong(WriteStatus::getTotalErrorRecords).sum();
    long totalRecords = writeResults.stream().mapToLong(WriteStatus::getTotalRecords).sum();
    boolean hasErrors = totalErrorRecords > 0;
    if (!hasErrors || cfg.commitOnErrors) {
      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
      if (hasErrors) {
        LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
            + totalErrorRecords + "/" + totalRecords);
      }
      boolean success = writeClient.commit(instantTime, writeResults, Option.of(checkpointCommitMetadata));
      if (success) {
        // Success is the expected outcome; log at INFO rather than WARN.
        LOG.info("Commit " + instantTime + " successful!");
      } else {
        LOG.warn("Commit " + instantTime + " failed!");
        throw new HoodieException("Commit " + instantTime + " failed!");
      }
    } else {
      LOG.error("Streamer sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
      LOG.error("Printing out the top 100 errors");
      writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
        LOG.error("Global error :", ws.getGlobalError());
        if (ws.getErrors().size() > 0) {
          ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
        }
      });
      // Rolling back instant
      writeClient.rollback(instantTime);
      throw new HoodieException("Commit " + instantTime + " failed and rolled-back !");
    }
  }
}

View File

@@ -1,297 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Operator helps to generate globally unique instant. Before generate a new instant {@link InstantGenerateOperator}
 * will always check whether the last instant has completed. if it is completed and has records flows in, a new instant
 * will be generated immediately, otherwise, wait and check the state of last instant until time out and throw an exception.
 *
 * <p>Subtasks coordinate through marker files named {@code subtaskId_checkpointId_recordCount}
 * under the {@code .instant_marker} auxiliary folder; only subtask 0 ("main") creates
 * instants and keeps operator state.
 */
public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord> implements OneInputStreamOperator<HoodieRecord, HoodieRecord> {
  private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
  public static final String NAME = "InstantGenerateOperator";
  // Job configuration, restored from the global job parameters in open().
  private FlinkStreamerConfig cfg;
  // Write client; only created on the main (subtask 0) instance, see open().
  private HoodieFlinkWriteClient writeClient;
  // Serializable wrapper around the Hadoop configuration.
  private SerializableConfiguration serializableHadoopConf;
  private transient FileSystem fs;
  // Latest instant created by this operator; empty string means "none pending".
  private String latestInstant = "";
  // Single-element holder used to update latestInstantState in snapshotState().
  private List<String> latestInstantList = new ArrayList<>(1);
  private transient ListState<String> latestInstantState;
  // How many times doCheck() re-checks a pending instant before giving up.
  private Integer retryTimes;
  // Seconds to sleep between two checks of a pending instant.
  private Integer retryInterval;
  private static final String DELIMITER = "_";
  private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker";
  // True only on subtask 0, which owns instant generation and operator state.
  private transient boolean isMain = false;
  // Records seen since the last checkpoint; published via the marker file name.
  private AtomicLong recordCounter = new AtomicLong(0);
  private StreamingRuntimeContext runtimeContext;
  private int indexOfThisSubtask;

  @Override
  public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
    // Pass-through operator: forward non-null records and count them so
    // prepareSnapshotPreBarrier() can report whether data arrived this checkpoint.
    if (streamRecord.getValue() != null) {
      output.collect(streamRecord);
      recordCounter.incrementAndGet();
    }
  }

  @Override
  public void open() throws Exception {
    super.open();
    // get configs from runtimeContext
    cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    // retry times
    retryTimes = Integer.valueOf(cfg.instantRetryTimes);
    // retry interval
    retryInterval = Integer.valueOf(cfg.instantRetryInterval);
    // hadoopConf
    serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
    // Hadoop FileSystem
    fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
    // Only the main task creates the write client, initializes the table and
    // prepares the marker directory; other subtasks only write marker files.
    // NOTE: isMain is assigned in initializeState(), which Flink calls before open().
    if (isMain) {
      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
      // writeClient
      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg));
      // init table, create it if not exists.
      StreamerUtil.initTableIfNotExists(FlinkOptions.fromStreamerConfig(cfg));
      // create instant marker directory
      createInstantMarkerDir();
    }
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    super.prepareSnapshotPreBarrier(checkpointId);
    // Every subtask publishes its record count for this checkpoint through the
    // marker file name: subtaskId_checkpointId_recordCount.
    String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
    Path path = generateCurrentMakerFilePath(instantMarkerFileName);
    // create marker file
    fs.create(path, true);
    LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
    if (isMain) {
      // check whether the last instant is completed, will try specific times until an exception is thrown
      if (!StringUtils.isNullOrEmpty(latestInstant)) {
        doCheck();
        // last instant completed, set it empty
        latestInstant = "";
      }
      boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
      // no data no new instant
      if (receivedDataInCurrentCP) {
        latestInstant = startNewInstant(checkpointId);
      }
    }
  }

  @Override
  public void initializeState(StateInitializationContext context) throws Exception {
    runtimeContext = getRuntimeContext();
    indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
    // Subtask 0 is designated the coordinator ("main") for instant generation.
    isMain = indexOfThisSubtask == 0;
    if (isMain) {
      // instantState
      ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class);
      latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
      if (context.isRestored()) {
        // Recover the last pending instant so prepareSnapshotPreBarrier() can
        // verify it completed before a new one is started.
        Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
        latestInstantIterator.forEachRemaining(x -> latestInstant = x);
        LOG.info("Restoring the latest instant [{}] from the state", latestInstant);
      }
    }
  }

  @Override
  public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception {
    long checkpointId = functionSnapshotContext.getCheckpointId();
    long recordSize = recordCounter.get();
    if (isMain) {
      // Persist the (single) latest instant into operator state for recovery.
      LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, recordSize, checkpointId);
      if (latestInstantList.isEmpty()) {
        latestInstantList.add(latestInstant);
      } else {
        latestInstantList.set(0, latestInstant);
      }
      latestInstantState.update(latestInstantList);
    } else {
      LOG.info("Task instance {} received {} records in checkpoint [{}]", indexOfThisSubtask, recordSize, checkpointId);
    }
    // Start counting afresh for the next checkpoint interval.
    recordCounter.set(0);
  }

  /**
   * Create a new instant: start a commit on the timeline and immediately
   * transition it from requested to inflight.
   *
   * @param checkpointId checkpoint that triggered the new instant (for logging)
   * @return the new instant time
   */
  private String startNewInstant(long checkpointId) {
    String newTime = writeClient.startCommit();
    final String actionType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(this.cfg.tableType));
    this.writeClient.transitionRequestedToInflight(actionType, newTime);
    LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId);
    return newTime;
  }

  /**
   * Check the status of last instant: poll the timeline until the latest instant
   * is no longer requested/inflight, retrying up to retryTimes with retryInterval
   * seconds between attempts, then fail with InterruptedException.
   */
  private void doCheck() throws InterruptedException {
    // query the requested and inflight commit/deltacommit instants
    String commitType = cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION;
    LOG.info("Query latest instant [{}]", latestInstant);
    List<String> rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType);
    int tryTimes = 0;
    while (tryTimes < retryTimes) {
      tryTimes++;
      StringBuilder sb = new StringBuilder();
      if (rollbackPendingCommits.contains(latestInstant)) {
        rollbackPendingCommits.forEach(x -> sb.append(x).append(","));
        LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb, tryTimes);
        TimeUnit.SECONDS.sleep(retryInterval);
        rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType);
      } else {
        LOG.warn("Latest transaction [{}] is completed! Completed transaction, try times [{}]", latestInstant, tryTimes);
        return;
      }
    }
    throw new InterruptedException(String.format("Last instant costs more than %s second, stop task now", retryTimes * retryInterval));
  }

  @Override
  public void close() throws Exception {
    if (writeClient != null) {
      writeClient.close();
    }
    if (fs != null) {
      fs.close();
    }
  }

  /**
   * Wait until every parallel subtask has written its marker file for this
   * checkpoint, then report whether any subtask received data.
   *
   * @return true if at least one subtask saw records in this checkpoint
   */
  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
    int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
    FileStatus[] fileStatuses;
    Path instantMarkerPath = generateCurrentMakerDirPath();
    // waiting all subtask create marker file ready
    // NOTE(review): this loop has no timeout — if a subtask never writes its
    // marker file the main task waits forever; consider bounding the wait.
    while (true) {
      Thread.sleep(500L);
      fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
        @Override
        public boolean accept(Path pathname) {
          // Match this checkpoint's marker files: name contains "_<checkpointId>_".
          return pathname.getName().contains(String.format("%s%d%s", DELIMITER, checkpointId, DELIMITER));
        }
      });
      // is ready
      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
        break;
      }
    }
    boolean receivedData = false;
    // check whether has data in this checkpoint and delete maker file.
    for (FileStatus fileStatus : fileStatuses) {
      Path path = fileStatus.getPath();
      String name = path.getName();
      // has data: the third name segment is the subtask's record count
      if (Long.parseLong(name.split(DELIMITER)[2]) > 0) {
        receivedData = true;
        break;
      }
    }
    // delete all marker file
    cleanMarkerDir(instantMarkerPath);
    return receivedData;
  }

  /**
   * Ensure the instant-marker folder exists and is empty before the job runs.
   */
  private void createInstantMarkerDir() throws IOException {
    // Always create instantMarkerFolder which is needed for InstantGenerateOperator
    final Path instantMarkerFolder = new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME);
    if (!fs.exists(instantMarkerFolder)) {
      fs.mkdirs(instantMarkerFolder);
    } else {
      // Clean marker dir.
      cleanMarkerDir(instantMarkerFolder);
    }
  }

  /**
   * Delete every entry under the given marker folder.
   */
  private void cleanMarkerDir(Path instantMarkerFolder) throws IOException {
    FileStatus[] fileStatuses = fs.listStatus(instantMarkerFolder);
    for (FileStatus fileStatus : fileStatuses) {
      fs.delete(fileStatus.getPath(), true);
    }
  }

  // Path of the marker folder: <basePath>/.hoodie/.aux/.instant_marker
  private Path generateCurrentMakerDirPath() {
    Path auxPath = new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
    return new Path(auxPath, INSTANT_MARKER_FOLDER_NAME);
  }

  // Path of a single marker file inside the marker folder.
  private Path generateCurrentMakerFilePath(String instantMarkerFileName) {
    return new Path(generateCurrentMakerDirPath(), instantMarkerFileName);
  }
}

View File

@@ -1,194 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 * A {@link KeyedProcessFunction} where the write operations really happen.
 *
 * <p>Incoming records are buffered per bucket key; the actual write to the table is
 * performed inside {@link #snapshotState(FunctionSnapshotContext)}, so one batch is
 * flushed per checkpoint against the latest requested/inflight instant.
 */
public class KeyedWriteProcessFunction
    extends KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>>
    implements CheckpointedFunction, CheckpointListener {
  private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class);

  /**
   * Records buffer, will be processed in snapshotState function.
   * Key: bucket key (partition path + file id), value: the records for that bucket.
   */
  private Map<String, List<HoodieRecord>> bufferedRecords;

  /**
   * Flink collector that helps to send data downstream.
   * Captured from the first processElement() call, since snapshotState() has no collector.
   */
  private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;

  /**
   * Id of current subtask.
   */
  private int indexOfThisSubtask;

  /**
   * Instant time this batch belongs to.
   */
  private String latestInstant;

  /**
   * Flag indicate whether this subtask has records in.
   */
  private boolean hasRecordsIn;

  /**
   * Job conf.
   */
  private FlinkStreamerConfig cfg;

  /**
   * Write Client.
   */
  private transient HoodieFlinkWriteClient writeClient;

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // LinkedHashMap keeps buckets in insertion order when flushing.
    this.bufferedRecords = new LinkedHashMap<>();
    indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
    cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    HoodieFlinkEngineContext context =
        new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext()));
    writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(cfg));
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) {
    // get latest requested instant
    String commitType = cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION;
    List<String> latestInstants = writeClient.getInflightsAndRequestedInstants(commitType);
    // NOTE(review): assumes the first returned instant is the active one —
    // confirm the ordering guarantee of getInflightsAndRequestedInstants.
    latestInstant = latestInstants.isEmpty() ? null : latestInstants.get(0);
    if (bufferedRecords.size() > 0) {
      hasRecordsIn = true;
      if (output != null && latestInstant != null) {
        String instantTimestamp = latestInstant;
        // NOTE(review): "record size" here is the number of buckets, not records.
        LOG.info("Write records, subtask id = [{}] checkpoint_id = [{}}] instant = [{}], record size = [{}]", indexOfThisSubtask, context.getCheckpointId(), instantTimestamp, bufferedRecords.size());
        final List<WriteStatus> writeStatus = new ArrayList<>();
        // Flush each bucket's records to the write client, collecting statuses.
        this.bufferedRecords.values().forEach(records -> {
          if (records.size() > 0) {
            if (cfg.filterDupes) {
              // Deduplicate within the batch before writing.
              records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
            }
            switch (cfg.operation) {
              case INSERT:
                writeStatus.addAll(writeClient.insert(records, instantTimestamp));
                break;
              case UPSERT:
                writeStatus.addAll(writeClient.upsert(records, instantTimestamp));
                break;
              default:
                throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation);
            }
          }
        });
        // Send (instant, statuses, subtaskId) downstream for the commit sink.
        output.collect(new Tuple3<>(instantTimestamp, writeStatus, indexOfThisSubtask));
        bufferedRecords.clear();
      }
    } else {
      LOG.info("No data in subtask [{}]", indexOfThisSubtask);
      hasRecordsIn = false;
    }
  }

  @Override
  public void initializeState(FunctionInitializationContext functionInitializationContext) {
    // no operation
  }

  @Override
  public void processElement(HoodieRecord hoodieRecord, Context context, Collector<Tuple3<String, List<WriteStatus>, Integer>> collector) {
    // Capture the collector once so snapshotState() can emit downstream.
    if (output == null) {
      output = collector;
    }
    // buffer the records
    putDataIntoBuffer(hoodieRecord);
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // Release write handles once the checkpoint (and hence the flush) succeeded.
    this.writeClient.cleanHandles();
  }

  /** Whether this subtask buffered any records for the current checkpoint. */
  public boolean hasRecordsIn() {
    return hasRecordsIn;
  }

  /** Latest requested/inflight instant observed during the last snapshot. */
  public String getLatestInstant() {
    return latestInstant;
  }

  /**
   * Buffer a record under its bucket key (partition path + file id).
   * Assumes the record's current location was assigned upstream — a null
   * location would NPE here.
   */
  private void putDataIntoBuffer(HoodieRecord<?> record) {
    final String fileId = record.getCurrentLocation().getFileId();
    final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
    if (!this.bufferedRecords.containsKey(key)) {
      this.bufferedRecords.put(key, new ArrayList<>());
    }
    this.bufferedRecords.get(key).add(record);
  }

  @Override
  public void close() {
    if (writeClient != null) {
      writeClient.close();
    }
  }
}

View File

@@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Operator that wraps {@link KeyedWriteProcessFunction} and mocks empty write results
 * for subtasks that received no data, so the downstream commit sink always gets
 * exactly one result per subtask per instant.
 */
public class KeyedWriteProcessOperator extends KeyedProcessOperator<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> {

  public static final String NAME = "WriteProcessOperator";

  private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessOperator.class);

  /** The wrapped function, kept as its concrete type to query its per-checkpoint state. */
  private KeyedWriteProcessFunction writeProcessFunction;

  public KeyedWriteProcessOperator(KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> function) {
    super(function);
    this.writeProcessFunction = (KeyedWriteProcessFunction) function;
  }

  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {
    // This super.snapshotState(context) triggers `writeProcessFunction.snapshotState()` method. which means the logic
    // below will be executed after `writeProcessFunction.snapshotState()` method.
    // If there is no data flows in `writeProcessFunction`, it will never send anything downstream. so, in order to make
    // sure each subtask will send a write status downstream, we implement this operator`s snapshotState() to mock empty
    // write status and send it downstream when there is no data flows in some subtasks.
    super.snapshotState(context);
    // make up an empty result and send downstream
    if (!writeProcessFunction.hasRecordsIn() && writeProcessFunction.getLatestInstant() != null) {
      String instantTime = writeProcessFunction.getLatestInstant();
      LOG.info("Mock empty writeStatus, subtaskId = [{}], instant = [{}]", getRuntimeContext().getIndexOfThisSubtask(), instantTime);
      // Diamond operator instead of a raw Tuple3 avoids an unchecked-conversion warning.
      output.collect(new StreamRecord<>(new Tuple3<>(instantTime, new ArrayList<WriteStatus>(), getRuntimeContext().getIndexOfThisSubtask())));
    }
  }
}

View File

@@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.transform;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieFlinkStreamerException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.util.AvroConvertor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import java.io.IOException;
/**
 * Function that transfers a JSON string into a {@link HoodieRecord}.
 *
 * <p>The source Avro schema is taken from the supplied schema string when present,
 * otherwise from a {@link FilebasedSchemaProvider}; keys, payload class and the
 * ordering (precombine) field are resolved from the given properties.
 */
public class JsonStringToHoodieRecordMapFunction implements MapFunction<String, HoodieRecord> {

  private TypedProperties props;
  private KeyGenerator keyGenerator;
  private AvroConvertor avroConvertor;
  private Option<String> schemaStr;
  private String payloadClassName;
  private String orderingField;

  public JsonStringToHoodieRecordMapFunction(TypedProperties props) {
    this(props, Option.empty());
  }

  public JsonStringToHoodieRecordMapFunction(TypedProperties props, Option<String> schemaStr) {
    this.props = props;
    this.schemaStr = schemaStr;
    init();
  }

  @Override
  public HoodieRecord map(String value) throws Exception {
    // Parse the JSON line into an Avro record, extract the ordering value,
    // then wrap everything up as a HoodieRecord.
    final GenericRecord avroRecord = this.avroConvertor.fromJson(value);
    final Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(avroRecord, this.orderingField, false);
    final HoodieRecordPayload payload = StreamerUtil.createPayload(this.payloadClassName, avroRecord, orderingVal);
    return new HoodieRecord<>(this.keyGenerator.getKey(avroRecord), payload);
  }

  /**
   * Resolves the converter, payload class, ordering field and key generator
   * from the configured properties.
   */
  private void init() {
    // Prefer the explicitly supplied schema string; fall back to the file-based provider.
    final Schema sourceSchema = schemaStr.isPresent()
        ? new Schema.Parser().parse(schemaStr.get())
        : new FilebasedSchemaProvider(props).getSourceSchema();
    this.avroConvertor = new AvroConvertor(sourceSchema);
    this.payloadClassName = props.getString(HoodieWriteConfig.WRITE_PAYLOAD_CLASS,
        OverwriteWithLatestAvroPayload.class.getName());
    this.orderingField = props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "ts");
    try {
      this.keyGenerator = StreamerUtil.createKeyGenerator(props);
    } catch (IOException e) {
      throw new HoodieFlinkStreamerException(String.format("KeyGenerator %s initialization failed",
          props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, SimpleAvroKeyGenerator.class.getName())), e);
    }
  }
}

View File

@@ -18,33 +18,28 @@
package org.apache.hudi.streamer;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CommitSink;
import org.apache.hudi.sink.InstantGenerateOperator;
import org.apache.hudi.sink.KeyedWriteProcessFunction;
import org.apache.hudi.sink.KeyedWriteProcessOperator;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.JsonStringToHoodieRecordMapFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
/**
* A utility which incrementally consumes data from Kafka and applies it to the target table.
@@ -70,31 +65,31 @@ public class HoodieFlinkStreamer {
env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
}
Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg);
// Read from kafka source
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
.getLogicalType();
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
// add data source config
props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
StreamerUtil.initTableIfNotExists(conf);
// Read from kafka source
DataStream<HoodieRecord> inputRecords =
env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
.filter(Objects::nonNull)
.map(new JsonStringToHoodieRecordMapFunction(props))
.name("kafka_to_hudi_record")
.uid("kafka_to_hudi_record_uid");
inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
.name("instant_generator")
.uid("instant_generator_id")
// Key by partition path, to avoid multiple subtasks writing to a partition at the same time
env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
), kafkaProps))
.name("kafka_source")
.uid("uid_kafka_source")
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
// Key-by partition path, to avoid multiple subtasks write to a partition at the same time
.keyBy(HoodieRecord::getPartitionPath)
// use the bucket assigner to generate bucket IDs
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
@@ -102,18 +97,13 @@ public class HoodieFlinkStreamer {
.uid("uid_bucket_assigner")
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
// write operator, where the write operation really happens
.transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
}), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
.name("write_process")
.uid("write_process_uid")
.transform("hoodie_stream_write", null, operatorFactory)
.uid("uid_hoodie_stream_write")
.setParallelism(numWriteTask)
// Commit can only be executed once, so make it one parallelism
.addSink(new CommitSink())
.name("commit_sink")
.uid("commit_sink_uid")
.setParallelism(1);
.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits")
.uid("uid_clean_commits");
env.execute(cfg.targetTableName);
}

View File

@@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.streamer;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import java.util.Properties;
/**
 * A utility which incrementally consumes data from Kafka and applies it to the target table.
 * Currently, it only supports COW tables and the insert and upsert operations.
 */
public class HoodieFlinkStreamerV2 {

  /**
   * Entry point: parses the streamer configuration, sets up checkpointing, then
   * wires the pipeline Kafka JSON source -> RowData to HoodieRecord mapping ->
   * bucket assignment (keyed by partition path) -> stream write (keyed by file id)
   * -> cleaning sink, and executes it.
   */
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    final FlinkStreamerConfig config = new FlinkStreamerConfig();
    final JCommander commander = new JCommander(config, null, args);
    if (config.help || args.length == 0) {
      commander.usage();
      System.exit(1);
    }

    execEnv.enableCheckpointing(config.checkpointInterval);
    execEnv.getConfig().setGlobalJobParameters(config);
    // The checkpoint triggers the write operation, including instant generating
    // and committing, so only one checkpoint may be in flight at any time.
    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    if (config.flinkCheckPointPath != null) {
      execEnv.setStateBackend(new FsStateBackend(config.flinkCheckPointPath));
    }

    final Properties consumerProps = StreamerUtil.appendKafkaProps(config);

    // Logical row type of the incoming records, derived from the Avro source schema.
    final RowType sourceRowType =
        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(config))
            .getLogicalType();

    final Configuration conf = FlinkOptions.fromStreamerConfig(config);
    final int writeTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
    final StreamWriteOperatorFactory<HoodieRecord> writeOperatorFactory =
        new StreamWriteOperatorFactory<>(conf);

    final JsonRowDataDeserializationSchema deserializationSchema =
        new JsonRowDataDeserializationSchema(
            sourceRowType,
            InternalTypeInfo.of(sourceRowType),
            false,
            true,
            TimestampFormat.ISO_8601);

    execEnv.addSource(new FlinkKafkaConsumer<>(config.kafkaTopic, deserializationSchema, consumerProps))
        .name("kafka_source")
        .uid("uid_kafka_source")
        .map(new RowDataToHoodieFunction<>(sourceRowType, conf), TypeInformation.of(HoodieRecord.class))
        // Key by partition path, so no two subtasks write the same partition concurrently.
        .keyBy(HoodieRecord::getPartitionPath)
        // The bucket assigner generates the bucket (file) IDs.
        .transform(
            "bucket_assigner",
            TypeInformation.of(HoodieRecord.class),
            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
        .uid("uid_bucket_assigner")
        // Shuffle by fileId (bucket id) before the actual write.
        .keyBy(record -> record.getCurrentLocation().getFileId())
        .transform("hoodie_stream_write", null, writeOperatorFactory)
        .uid("uid_hoodie_stream_write")
        .setParallelism(writeTasks)
        // Single-parallelism cleaning sink.
        .addSink(new CleanFunction<>(conf))
        .setParallelism(1)
        .name("clean_commits")
        .uid("uid_clean_commits");

    execEnv.execute(config.targetTableName);
  }
}

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
@@ -29,7 +28,6 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -39,10 +37,8 @@ import org.apache.hudi.utils.source.ContinuousFileSource;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
@@ -140,75 +136,6 @@ public class StreamWriteITCase extends TestLogger {
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
/**
 * Writes a bounded file source through the legacy operator chain
 * (InstantGenerateOperator -> BucketAssignFunction -> KeyedWriteProcessOperator
 * -> CommitSink) and verifies the data written to the table matches {@code EXPECTED}.
 */
@Test
public void testWriteToHoodieLegacy() throws Exception {
// Build the Flink configuration from the default streamer configuration and
// make sure the target Hudi table exists before writing.
FlinkStreamerConfig streamerConf = TestConfigurations.getDefaultStreamerConf(tempFile.getAbsolutePath());
Configuration conf = FlinkOptions.fromStreamerConfig(streamerConf);
StreamerUtil.initTableIfNotExists(conf);
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
execEnv.setParallelism(4);
// set up checkpoint interval; the checkpoint is what triggers the actual write/commit
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getConfig().setGlobalJobParameters(streamerConf);
// Logical row type of the source records, derived from the Avro source schema
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
// JSON -> RowData deserializer for the test input lines
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
);
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
execEnv
// use continuous file source (replays the file twice) to trigger checkpoints
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
.name("instant_generator")
.uid("instant_generator_id")
// Key by record key, to avoid multiple subtasks writing to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
// use the bucket assigner to generate bucket IDs
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner")
// shuffle by fileId (bucket id) so each file group is written by one subtask
.keyBy(record -> record.getCurrentLocation().getFileId())
// write operator, where the write operation really happens
.transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
}), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
.name("write_process")
.uid("write_process_uid")
.setParallelism(4)
// Commit can only be executed once, so make it one parallelism
.addSink(new CommitSink())
.name("commit_sink")
.uid("commit_sink_uid")
.setParallelism(1);
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
// wait for the streaming job to finish before checking the written data
client.getJobExecutionResult().get();
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
@Test
public void testMergeOnReadWriteWithCompaction() throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());

View File

@@ -1,89 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.transform;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
import org.apache.avro.Schema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
/**
 * Unit test for {@code JsonStringToHoodieRecordMapFunction}: feeds JSON record
 * strings through the map function and checks the emitted HoodieRecords.
 */
public class TestJsonStringToHoodieRecordMapFunction extends HoodieFlinkClientTestHarness {

  @BeforeEach
  public void init() {
    initPath();
    initTestDataGenerator();
    initFileSystem();
    initFlinkMiniCluster();
  }

  @AfterEach
  public void clean() throws Exception {
    cleanupTestDataGenerator();
    cleanupFileSystem();
    cleanupFlinkMiniCluster();
  }

  @Test
  public void testMapFunction() throws Exception {
    final String newCommitTime = "001";
    final int numRecords = 10;

    // Generate the input records and their JSON string form.
    final List<HoodieRecord> inputRecords = dataGen.generateInserts(newCommitTime, numRecords);
    final List<String> jsonRecords = RawTripTestPayload.recordsToStrings(inputRecords);

    // Configure payload class, precombine field and key generator fields.
    final TypedProperties mapperProps = new TypedProperties();
    mapperProps.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, OverwriteWithLatestAvroPayload.class.getName());
    mapperProps.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "timestamp");
    mapperProps.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
    mapperProps.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_date");

    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setParallelism(1);

    // Run the JSON strings through the map function into the collecting sink.
    SimpleTestSinkFunction.valuesList.clear();
    execEnv.fromCollection(jsonRecords)
        .map(new JsonStringToHoodieRecordMapFunction(mapperProps, Option.of(AVRO_SCHEMA.toString())))
        .addSink(new SimpleTestSinkFunction());
    execEnv.execute();

    // All input records made it to the sink.
    Assertions.assertEquals(numRecords, SimpleTestSinkFunction.valuesList.size());
    // All record keys in the sink match the generated input keys.
    final Set<String> expectedKeys = inputRecords.stream()
        .map(r -> r.getKey().getRecordKey())
        .collect(Collectors.toSet());
    Assertions.assertEquals(numRecords, SimpleTestSinkFunction.valuesList.stream()
        .filter(r -> expectedKeys.contains(r.getRecordKey())).count());
  }
}