From 0a5863939b8da149f6dab3ec0fdb86d77baa9739 Mon Sep 17 00:00:00 2001 From: hiscat <46845236+MyLanPangzi@users.noreply.github.com> Date: Fri, 7 May 2021 10:58:49 +0800 Subject: [PATCH] [HUDI-1821] Remove legacy code for Flink writer (#2868) --- .../HoodieFlinkStreamerException.java | 30 -- .../java/org/apache/hudi/sink/CommitSink.java | 153 --------- .../hudi/sink/InstantGenerateOperator.java | 297 ------------------ .../hudi/sink/KeyedWriteProcessFunction.java | 194 ------------ .../hudi/sink/KeyedWriteProcessOperator.java | 66 ---- .../JsonStringToHoodieRecordMapFunction.java | 88 ------ .../hudi/streamer/HoodieFlinkStreamer.java | 82 +++-- .../hudi/streamer/HoodieFlinkStreamerV2.java | 110 ------- .../apache/hudi/sink/StreamWriteITCase.java | 73 ----- ...stJsonStringToHoodieRecordMapFunction.java | 89 ------ 10 files changed, 36 insertions(+), 1146 deletions(-) delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/exception/HoodieFlinkStreamerException.java delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/KeyedWriteProcessFunction.java delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/KeyedWriteProcessOperator.java delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/transform/JsonStringToHoodieRecordMapFunction.java delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java delete mode 100644 hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestJsonStringToHoodieRecordMapFunction.java diff --git a/hudi-flink/src/main/java/org/apache/hudi/exception/HoodieFlinkStreamerException.java b/hudi-flink/src/main/java/org/apache/hudi/exception/HoodieFlinkStreamerException.java deleted file mode 100644 index 0aadce83d..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/exception/HoodieFlinkStreamerException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.exception; - -public class HoodieFlinkStreamerException extends HoodieException { - - public HoodieFlinkStreamerException(String msg, Throwable e) { - super(msg, e); - } - - public HoodieFlinkStreamerException(String msg) { - super(msg); - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java deleted file mode 100644 index 00ef66d2c..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink; - -import org.apache.hudi.client.FlinkTaskContextSupplier; -import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieFlinkStreamerException; -import org.apache.hudi.streamer.FlinkStreamerConfig; -import org.apache.hudi.util.StreamerUtil; - -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Function helps to execute commit operation. this operation should be executed only once. - */ -public class CommitSink extends RichSinkFunction, Integer>> { - - private static final Logger LOG = LoggerFactory.getLogger(CommitSink.class); - /** - * Job conf. - */ - private FlinkStreamerConfig cfg; - - /** - * Write client. - */ - private transient HoodieFlinkWriteClient writeClient; - - /** - * Write result buffer. - */ - private Map>> bufferedWriteStatus = new HashMap<>(); - - /** - * Parallelism of this job. - */ - private Integer writeParallelSize = 0; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - // Get configs from runtimeContext - cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); - - writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism(); - - // writeClient - writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg)); - } - - @Override - public void invoke(Tuple3, Integer> writeStatues, Context context) { - LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], WriteStatus size = [{}]", writeStatues.f0, writeStatues.f2, writeStatues.f1.size()); - try { - if (bufferedWriteStatus.containsKey(writeStatues.f0)) { - bufferedWriteStatus.get(writeStatues.f0).add(writeStatues.f1); - } else { - List> oneBatchData = new ArrayList<>(writeParallelSize); - oneBatchData.add(writeStatues.f1); - bufferedWriteStatus.put(writeStatues.f0, oneBatchData); - } - // check and commit - checkAndCommit(writeStatues.f0); - } catch (Exception e) { - throw new HoodieFlinkStreamerException("Invoke sink error", e); - } - } - - /** - * Check and commit if all subtask completed. - * - * @throws Exception - */ - private void checkAndCommit(String instantTime) throws Exception { - if (bufferedWriteStatus.get(instantTime).size() == writeParallelSize) { - LOG.info("Instant [{}] process complete, start commit!", instantTime); - doCommit(instantTime); - bufferedWriteStatus.clear(); - LOG.info("Instant [{}] commit completed!", instantTime); - } else { - LOG.info("Instant [{}], can not commit yet, subtask completed : [{}/{}]", instantTime, bufferedWriteStatus.get(instantTime).size(), writeParallelSize); - } - } - - private void doCommit(String instantTime) { - // get the records to commit - List writeResults = bufferedWriteStatus.get(instantTime).stream().flatMap(Collection::stream).collect(Collectors.toList()); - - // commit and rollback - long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L); - long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L); - boolean hasErrors = totalErrorRecords > 0; - - if (!hasErrors || cfg.commitOnErrors) { - HashMap checkpointCommitMetadata = new HashMap<>(); - if (hasErrors) { - LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" - + totalErrorRecords + "/" + totalRecords); - } - - boolean success = writeClient.commit(instantTime, writeResults, Option.of(checkpointCommitMetadata)); - if (success) { - LOG.warn("Commit " + instantTime + " successful!"); - } else { - LOG.warn("Commit " + instantTime + " failed!"); - throw new HoodieException("Commit " + instantTime + " failed!"); - } - } else { - LOG.error("Streamer sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); - LOG.error("Printing out the top 100 errors"); - writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> { - LOG.error("Global error :", ws.getGlobalError()); - if (ws.getErrors().size() > 0) { - ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value)); - } - }); - // Rolling back instant - writeClient.rollback(instantTime); - throw new HoodieException("Commit " + instantTime + " failed and rolled-back !"); - } - } -} \ No newline at end of file diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java deleted file mode 100644 index 3c4fb1e84..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink; - -import org.apache.hudi.client.FlinkTaskContextSupplier; -import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.engine.TaskContextSupplier; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.CommitUtils; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.streamer.FlinkStreamerConfig; -import org.apache.hudi.util.StreamerUtil; - -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Operator helps to generate globally unique instant. Before generate a new instant {@link InstantGenerateOperator} - * will always check whether the last instant has completed. if it is completed and has records flows in, a new instant - * will be generated immediately, otherwise, wait and check the state of last instant until time out and throw an exception. - */ -public class InstantGenerateOperator extends AbstractStreamOperator implements OneInputStreamOperator { - - private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class); - public static final String NAME = "InstantGenerateOperator"; - - private FlinkStreamerConfig cfg; - private HoodieFlinkWriteClient writeClient; - private SerializableConfiguration serializableHadoopConf; - private transient FileSystem fs; - private String latestInstant = ""; - private List latestInstantList = new ArrayList<>(1); - private transient ListState latestInstantState; - private Integer retryTimes; - private Integer retryInterval; - private static final String DELIMITER = "_"; - private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker"; - private transient boolean isMain = false; - private AtomicLong recordCounter = new AtomicLong(0); - private StreamingRuntimeContext runtimeContext; - private int indexOfThisSubtask; - - @Override - public void processElement(StreamRecord streamRecord) throws Exception { - if (streamRecord.getValue() != null) { - output.collect(streamRecord); - recordCounter.incrementAndGet(); - } - } - - @Override - public void open() throws Exception { - super.open(); - // get configs from runtimeContext - cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); - - // retry times - retryTimes = Integer.valueOf(cfg.instantRetryTimes); - - // retry interval - retryInterval = Integer.valueOf(cfg.instantRetryInterval); - - // hadoopConf - serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf()); - - // Hadoop FileSystem - fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get()); - - if (isMain) { - TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null); - - // writeClient - writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg)); - - // init table, create it if not exists. - StreamerUtil.initTableIfNotExists(FlinkOptions.fromStreamerConfig(cfg)); - - // create instant marker directory - createInstantMarkerDir(); - } - } - - @Override - public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { - super.prepareSnapshotPreBarrier(checkpointId); - String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get()); - Path path = generateCurrentMakerFilePath(instantMarkerFileName); - // create marker file - fs.create(path, true); - LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName); - if (isMain) { - // check whether the last instant is completed, will try specific times until an exception is thrown - if (!StringUtils.isNullOrEmpty(latestInstant)) { - doCheck(); - // last instant completed, set it empty - latestInstant = ""; - } - boolean receivedDataInCurrentCP = checkReceivedData(checkpointId); - // no data no new instant - if (receivedDataInCurrentCP) { - latestInstant = startNewInstant(checkpointId); - } - } - } - - @Override - public void initializeState(StateInitializationContext context) throws Exception { - runtimeContext = getRuntimeContext(); - indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask(); - isMain = indexOfThisSubtask == 0; - - if (isMain) { - // instantState - ListStateDescriptor latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class); - latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor); - - if (context.isRestored()) { - Iterator latestInstantIterator = latestInstantState.get().iterator(); - latestInstantIterator.forEachRemaining(x -> latestInstant = x); - LOG.info("Restoring the latest instant [{}] from the state", latestInstant); - } - } - } - - @Override - public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception { - long checkpointId = functionSnapshotContext.getCheckpointId(); - long recordSize = recordCounter.get(); - if (isMain) { - LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, recordSize, checkpointId); - if (latestInstantList.isEmpty()) { - latestInstantList.add(latestInstant); - } else { - latestInstantList.set(0, latestInstant); - } - latestInstantState.update(latestInstantList); - } else { - LOG.info("Task instance {} received {} records in checkpoint [{}]", indexOfThisSubtask, recordSize, checkpointId); - } - recordCounter.set(0); - } - - /** - * Create a new instant. - * - * @param checkpointId - */ - private String startNewInstant(long checkpointId) { - String newTime = writeClient.startCommit(); - final String actionType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(this.cfg.tableType)); - this.writeClient.transitionRequestedToInflight(actionType, newTime); - LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId); - return newTime; - } - - /** - * Check the status of last instant. - */ - private void doCheck() throws InterruptedException { - // query the requested and inflight commit/deltacommit instants - String commitType = cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION; - LOG.info("Query latest instant [{}]", latestInstant); - List rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType); - int tryTimes = 0; - while (tryTimes < retryTimes) { - tryTimes++; - StringBuilder sb = new StringBuilder(); - if (rollbackPendingCommits.contains(latestInstant)) { - rollbackPendingCommits.forEach(x -> sb.append(x).append(",")); - LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb, tryTimes); - TimeUnit.SECONDS.sleep(retryInterval); - rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType); - } else { - LOG.warn("Latest transaction [{}] is completed! Completed transaction, try times [{}]", latestInstant, tryTimes); - return; - } - } - throw new InterruptedException(String.format("Last instant costs more than %s second, stop task now", retryTimes * retryInterval)); - } - - @Override - public void close() throws Exception { - if (writeClient != null) { - writeClient.close(); - } - if (fs != null) { - fs.close(); - } - } - - private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException { - int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks(); - FileStatus[] fileStatuses; - Path instantMarkerPath = generateCurrentMakerDirPath(); - // waiting all subtask create marker file ready - while (true) { - Thread.sleep(500L); - fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() { - @Override - public boolean accept(Path pathname) { - return pathname.getName().contains(String.format("%s%d%s", DELIMITER, checkpointId, DELIMITER)); - } - }); - - // is ready - if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) { - break; - } - } - - boolean receivedData = false; - // check whether has data in this checkpoint and delete maker file. - for (FileStatus fileStatus : fileStatuses) { - Path path = fileStatus.getPath(); - String name = path.getName(); - // has data - if (Long.parseLong(name.split(DELIMITER)[2]) > 0) { - receivedData = true; - break; - } - } - - // delete all marker file - cleanMarkerDir(instantMarkerPath); - - return receivedData; - } - - private void createInstantMarkerDir() throws IOException { - // Always create instantMarkerFolder which is needed for InstantGenerateOperator - final Path instantMarkerFolder = new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME); - if (!fs.exists(instantMarkerFolder)) { - fs.mkdirs(instantMarkerFolder); - } else { - // Clean marker dir. - cleanMarkerDir(instantMarkerFolder); - } - } - - private void cleanMarkerDir(Path instantMarkerFolder) throws IOException { - FileStatus[] fileStatuses = fs.listStatus(instantMarkerFolder); - for (FileStatus fileStatus : fileStatuses) { - fs.delete(fileStatus.getPath(), true); - } - } - - private Path generateCurrentMakerDirPath() { - Path auxPath = new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); - return new Path(auxPath, INSTANT_MARKER_FOLDER_NAME); - } - - private Path generateCurrentMakerFilePath(String instantMarkerFileName) { - return new Path(generateCurrentMakerDirPath(), instantMarkerFileName); - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/KeyedWriteProcessFunction.java deleted file mode 100644 index 26ac820e0..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/KeyedWriteProcessFunction.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink; - -import org.apache.hudi.client.FlinkTaskContextSupplier; -import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.exception.HoodieFlinkStreamerException; -import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.streamer.FlinkStreamerConfig; -import org.apache.hudi.table.action.commit.FlinkWriteHelper; -import org.apache.hudi.util.StreamerUtil; - -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -/** - * A {@link KeyedProcessFunction} where the write operations really happens. - */ -public class KeyedWriteProcessFunction - extends KeyedProcessFunction, Integer>> - implements CheckpointedFunction, CheckpointListener { - - private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class); - /** - * Records buffer, will be processed in snapshotState function. - */ - private Map> bufferedRecords; - - /** - * Flink collector help s to send data downstream. - */ - private Collector, Integer>> output; - - /** - * Id of current subtask. - */ - private int indexOfThisSubtask; - - /** - * Instant time this batch belongs to. - */ - private String latestInstant; - - /** - * Flag indicate whether this subtask has records in. - */ - private boolean hasRecordsIn; - - /** - * Job conf. - */ - private FlinkStreamerConfig cfg; - - /** - * Write Client. - */ - private transient HoodieFlinkWriteClient writeClient; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - this.bufferedRecords = new LinkedHashMap<>(); - - indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); - - cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); - - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext())); - - writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(cfg)); - } - - @Override - public void snapshotState(FunctionSnapshotContext context) { - - // get latest requested instant - String commitType = cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION; - List latestInstants = writeClient.getInflightsAndRequestedInstants(commitType); - latestInstant = latestInstants.isEmpty() ? null : latestInstants.get(0); - - if (bufferedRecords.size() > 0) { - hasRecordsIn = true; - if (output != null && latestInstant != null) { - String instantTimestamp = latestInstant; - LOG.info("Write records, subtask id = [{}] checkpoint_id = [{}}] instant = [{}], record size = [{}]", indexOfThisSubtask, context.getCheckpointId(), instantTimestamp, bufferedRecords.size()); - - final List writeStatus = new ArrayList<>(); - this.bufferedRecords.values().forEach(records -> { - if (records.size() > 0) { - if (cfg.filterDupes) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); - } - switch (cfg.operation) { - case INSERT: - writeStatus.addAll(writeClient.insert(records, instantTimestamp)); - break; - case UPSERT: - writeStatus.addAll(writeClient.upsert(records, instantTimestamp)); - break; - default: - throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation); - } - } - }); - output.collect(new Tuple3<>(instantTimestamp, writeStatus, indexOfThisSubtask)); - bufferedRecords.clear(); - } - } else { - LOG.info("No data in subtask [{}]", indexOfThisSubtask); - hasRecordsIn = false; - } - } - - @Override - public void initializeState(FunctionInitializationContext functionInitializationContext) { - // no operation - } - - @Override - public void processElement(HoodieRecord hoodieRecord, Context context, Collector, Integer>> collector) { - if (output == null) { - output = collector; - } - - // buffer the records - putDataIntoBuffer(hoodieRecord); - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - this.writeClient.cleanHandles(); - } - - public boolean hasRecordsIn() { - return hasRecordsIn; - } - - public String getLatestInstant() { - return latestInstant; - } - - private void putDataIntoBuffer(HoodieRecord record) { - final String fileId = record.getCurrentLocation().getFileId(); - final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId); - if (!this.bufferedRecords.containsKey(key)) { - this.bufferedRecords.put(key, new ArrayList<>()); - } - this.bufferedRecords.get(key).add(record); - } - - @Override - public void close() { - if (writeClient != null) { - writeClient.close(); - } - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/KeyedWriteProcessOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/KeyedWriteProcessOperator.java deleted file mode 100644 index ebd15d169..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/KeyedWriteProcessOperator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieRecord; - -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -/** - * Operator helps to mock empty write results and deliver downstream when no data flow in some subtask. - */ -public class KeyedWriteProcessOperator extends KeyedProcessOperator, Integer>> { - - public static final String NAME = "WriteProcessOperator"; - private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessOperator.class); - private KeyedWriteProcessFunction writeProcessFunction; - - public KeyedWriteProcessOperator(KeyedProcessFunction, Integer>> function) { - super(function); - this.writeProcessFunction = (KeyedWriteProcessFunction) function; - } - - @Override - public void snapshotState(StateSnapshotContext context) throws Exception { - // This super.snapshotState(context) triggers `writeProcessFunction.snapshotState()` method. which means the logic - // below will be executed after `writeProcessFunction.snapshotState()` method. - - // If there is no data flows in `writeProcessFunction`, it will never send anything downstream. so, in order to make - // sure each subtask will send a write status downstream, we implement this operator`s snapshotState() to mock empty - // write status and send it downstream when there is no data flows in some subtasks. - super.snapshotState(context); - - // make up an empty result and send downstream - if (!writeProcessFunction.hasRecordsIn() && writeProcessFunction.getLatestInstant() != null) { - String instantTime = writeProcessFunction.getLatestInstant(); - LOG.info("Mock empty writeStatus, subtaskId = [{}], instant = [{}]", getRuntimeContext().getIndexOfThisSubtask(), instantTime); - output.collect(new StreamRecord<>(new Tuple3(instantTime, new ArrayList(), getRuntimeContext().getIndexOfThisSubtask()))); - } - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/JsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/JsonStringToHoodieRecordMapFunction.java deleted file mode 100644 index a14943cf9..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/JsonStringToHoodieRecordMapFunction.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.transform; - -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieFlinkStreamerException; -import org.apache.hudi.keygen.KeyGenerator; -import org.apache.hudi.keygen.SimpleAvroKeyGenerator; -import org.apache.hudi.schema.FilebasedSchemaProvider; -import org.apache.hudi.util.AvroConvertor; -import org.apache.hudi.util.StreamerUtil; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.flink.api.common.functions.MapFunction; - -import java.io.IOException; - -/** - * Function helps to transfer json string to {@link HoodieRecord}. - */ -public class JsonStringToHoodieRecordMapFunction implements MapFunction { - - private TypedProperties props; - private KeyGenerator keyGenerator; - private AvroConvertor avroConvertor; - private Option schemaStr; - private String payloadClassName; - private String orderingField; - - public JsonStringToHoodieRecordMapFunction(TypedProperties props) { - this(props, Option.empty()); - } - - public JsonStringToHoodieRecordMapFunction(TypedProperties props, Option schemaStr) { - this.props = props; - this.schemaStr = schemaStr; - init(); - } - - @Override - public HoodieRecord map(String value) throws Exception { - GenericRecord gr = this.avroConvertor.fromJson(value); - HoodieRecordPayload payload = StreamerUtil.createPayload(this.payloadClassName, gr, - (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, this.orderingField, false)); - - return new HoodieRecord<>(this.keyGenerator.getKey(gr), payload); - } - - private void init() { - if (schemaStr.isPresent()) { - this.avroConvertor = new AvroConvertor(new Schema.Parser().parse(schemaStr.get())); - } else { - this.avroConvertor = new AvroConvertor(new FilebasedSchemaProvider(props).getSourceSchema()); - } - this.payloadClassName = props.getString(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, - OverwriteWithLatestAvroPayload.class.getName()); - this.orderingField = props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "ts"); - try { - this.keyGenerator = StreamerUtil.createKeyGenerator(props); - } catch (IOException e) { - throw new HoodieFlinkStreamerException(String.format("KeyGenerator %s initialization failed", - props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, SimpleAvroKeyGenerator.class.getName())), e); - } - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 0d37574a4..88ab2b658 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -18,33 +18,28 @@ package org.apache.hudi.streamer; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.CommitSink; -import org.apache.hudi.sink.InstantGenerateOperator; -import org.apache.hudi.sink.KeyedWriteProcessFunction; -import org.apache.hudi.sink.KeyedWriteProcessOperator; +import org.apache.hudi.sink.CleanFunction; +import org.apache.hudi.sink.StreamWriteOperatorFactory; import org.apache.hudi.sink.partitioner.BucketAssignFunction; -import org.apache.hudi.sink.transform.JsonStringToHoodieRecordMapFunction; +import org.apache.hudi.sink.transform.RowDataToHoodieFunction; +import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.JCommander; -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; -import java.util.List; -import java.util.Objects; +import java.util.Properties; /** * An Utility which can incrementally consume data from Kafka and apply it to the target table. @@ -70,31 +65,31 @@ public class HoodieFlinkStreamer { env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath)); } + Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg); + + // Read from kafka source + RowType rowType = + (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)) + .getLogicalType(); Configuration conf = FlinkOptions.fromStreamerConfig(cfg); int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS); + StreamWriteOperatorFactory operatorFactory = + new StreamWriteOperatorFactory<>(conf); - TypedProperties props = StreamerUtil.appendKafkaProps(cfg); - - // add data source config - props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName); - props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField); - - StreamerUtil.initTableIfNotExists(conf); - // Read from kafka source - DataStream inputRecords = - env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props)) - .filter(Objects::nonNull) - .map(new JsonStringToHoodieRecordMapFunction(props)) - .name("kafka_to_hudi_record") - .uid("kafka_to_hudi_record_uid"); - - inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator()) - .name("instant_generator") - .uid("instant_generator_id") - - // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time + env.addSource(new FlinkKafkaConsumer<>( + cfg.kafkaTopic, + new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + false, + true, + TimestampFormat.ISO_8601 + ), kafkaProps)) + .name("kafka_source") + .uid("uid_kafka_source") + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) + // Key-by partition path, to avoid multiple subtasks write to a partition at the same time .keyBy(HoodieRecord::getPartitionPath) - // use the bucket assigner to generate bucket IDs .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), @@ -102,18 +97,13 @@ public class HoodieFlinkStreamer { .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) - // write operator, where the write operation really happens - .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint, Integer>>() { - }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction())) - .name("write_process") - .uid("write_process_uid") + .transform("hoodie_stream_write", null, operatorFactory) + .uid("uid_hoodie_stream_write") .setParallelism(numWriteTask) - - // Commit can only be executed once, so make it one parallelism - .addSink(new CommitSink()) - .name("commit_sink") - .uid("commit_sink_uid") - .setParallelism(1); + .addSink(new CleanFunction<>(conf)) + .setParallelism(1) + .name("clean_commits") + .uid("uid_clean_commits"); env.execute(cfg.targetTableName); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java deleted file mode 100644 index 10c0b419e..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.streamer; - -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.CleanFunction; -import org.apache.hudi.sink.StreamWriteOperatorFactory; -import org.apache.hudi.sink.partitioner.BucketAssignFunction; -import org.apache.hudi.sink.transform.RowDataToHoodieFunction; -import org.apache.hudi.util.AvroSchemaConverter; -import org.apache.hudi.util.StreamerUtil; - -import com.beust.jcommander.JCommander; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.formats.json.TimestampFormat; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.apache.flink.table.types.logical.RowType; - -import java.util.Properties; - -/** - * An Utility which can incrementally consume data from Kafka and apply it to the target table. - * currently, it only support COW table and insert, upsert operation. - */ -public class HoodieFlinkStreamerV2 { - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final FlinkStreamerConfig cfg = new FlinkStreamerConfig(); - JCommander cmd = new JCommander(cfg, null, args); - if (cfg.help || args.length == 0) { - cmd.usage(); - System.exit(1); - } - env.enableCheckpointing(cfg.checkpointInterval); - env.getConfig().setGlobalJobParameters(cfg); - // We use checkpoint to trigger write operation, including instant generating and committing, - // There can only be one checkpoint at one time. - env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); - - if (cfg.flinkCheckPointPath != null) { - env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath)); - } - - Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg); - - // Read from kafka source - RowType rowType = - (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)) - .getLogicalType(); - Configuration conf = FlinkOptions.fromStreamerConfig(cfg); - int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS); - StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(conf); - - env.addSource(new FlinkKafkaConsumer<>( - cfg.kafkaTopic, - new JsonRowDataDeserializationSchema( - rowType, - InternalTypeInfo.of(rowType), - false, - true, - TimestampFormat.ISO_8601 - ), kafkaProps)) - .name("kafka_source") - .uid("uid_kafka_source") - .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) - .transform( - "bucket_assigner", - TypeInformation.of(HoodieRecord.class), - new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) - .uid("uid_bucket_assigner") - // shuffle by fileId(bucket id) - .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform("hoodie_stream_write", null, operatorFactory) - .uid("uid_hoodie_stream_write") - .setParallelism(numWriteTask) - .addSink(new CleanFunction<>(conf)) - .setParallelism(1) - .name("clean_commits") - .uid("uid_clean_commits"); - - env.execute(cfg.targetTableName); - } -} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 361fcef12..82360f496 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -18,7 +18,6 @@ package org.apache.hudi.sink; -import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; @@ -29,7 +28,6 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; -import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -39,10 +37,8 @@ import org.apache.hudi.utils.source.ContinuousFileSource; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.io.TextInputFormat; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.Path; @@ -140,75 +136,6 @@ public class StreamWriteITCase extends TestLogger { TestData.checkWrittenFullData(tempFile, EXPECTED); } - @Test - public void testWriteToHoodieLegacy() throws Exception { - FlinkStreamerConfig streamerConf = TestConfigurations.getDefaultStreamerConf(tempFile.getAbsolutePath()); - Configuration conf = FlinkOptions.fromStreamerConfig(streamerConf); - StreamerUtil.initTableIfNotExists(conf); - StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - execEnv.getConfig().disableObjectReuse(); - execEnv.setParallelism(4); - // set up checkpoint interval - execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); - execEnv.getConfig().setGlobalJobParameters(streamerConf); - - // Read from file source - RowType rowType = - (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) - .getLogicalType(); - - JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( - rowType, - InternalTypeInfo.of(rowType), - false, - true, - TimestampFormat.ISO_8601 - ); - String sourcePath = Objects.requireNonNull(Thread.currentThread() - .getContextClassLoader().getResource("test_source.data")).toString(); - - execEnv - // use continuous file source to trigger checkpoint - .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2)) - .name("continuous_file_source") - .setParallelism(1) - .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) - .setParallelism(4) - .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - .transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator()) - .name("instant_generator") - .uid("instant_generator_id") - - // Key-by record key, to avoid multiple subtasks write to a bucket at the same time - .keyBy(HoodieRecord::getRecordKey) - // use the bucket assigner to generate bucket IDs - .transform( - "bucket_assigner", - TypeInformation.of(HoodieRecord.class), - new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) - .uid("uid_bucket_assigner") - // shuffle by fileId(bucket id) - .keyBy(record -> record.getCurrentLocation().getFileId()) - // write operator, where the write operation really happens - .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint, Integer>>() { - }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction())) - .name("write_process") - .uid("write_process_uid") - .setParallelism(4) - - // Commit can only be executed once, so make it one parallelism - .addSink(new CommitSink()) - .name("commit_sink") - .uid("commit_sink_uid") - .setParallelism(1); - - JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); - // wait for the streaming job to finish - client.getJobExecutionResult().get(); - - TestData.checkWrittenFullData(tempFile, EXPECTED); - } - @Test public void testMergeOnReadWriteWithCompaction() throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestJsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestJsonStringToHoodieRecordMapFunction.java deleted file mode 100644 index c2d7bdc5f..000000000 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestJsonStringToHoodieRecordMapFunction.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.transform; - -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; -import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.keygen.constant.KeyGeneratorOptions; -import org.apache.hudi.testutils.HoodieFlinkClientTestHarness; - -import org.apache.avro.Schema; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; - -public class TestJsonStringToHoodieRecordMapFunction extends HoodieFlinkClientTestHarness { - @BeforeEach - public void init() { - initPath(); - initTestDataGenerator(); - initFileSystem(); - initFlinkMiniCluster(); - } - - @AfterEach - public void clean() throws Exception { - cleanupTestDataGenerator(); - cleanupFileSystem(); - cleanupFlinkMiniCluster(); - } - - @Test - public void testMapFunction() throws Exception { - final String newCommitTime = "001"; - final int numRecords = 10; - List records = dataGen.generateInserts(newCommitTime, numRecords); - List recordStr = RawTripTestPayload.recordsToStrings(records); - Schema schema = AVRO_SCHEMA; - - TypedProperties props = new TypedProperties(); - props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, OverwriteWithLatestAvroPayload.class.getName()); - props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "timestamp"); - props.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); - props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_date"); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - SimpleTestSinkFunction.valuesList.clear(); - env.fromCollection(recordStr) - .map(new JsonStringToHoodieRecordMapFunction(props, Option.of(schema.toString()))) - .addSink(new SimpleTestSinkFunction()); - env.execute(); - - // input records all present in the sink - Assertions.assertEquals(10, SimpleTestSinkFunction.valuesList.size()); - - // input keys all present in the sink - Set inputKeySet = records.stream().map(r -> r.getKey().getRecordKey()).collect(Collectors.toSet()); - Assertions.assertEquals(10, SimpleTestSinkFunction.valuesList.stream() - .map(r -> inputKeySet.contains(r.getRecordKey())).filter(b -> b).count()); - } -}