From dbf8c44bdb3019f2ce93d6b1224d9d478c0340fa Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 4 Nov 2021 18:09:00 +0800 Subject: [PATCH] [HUDI-2677] Add DFS based message queue for flink writer (#3915) --- .../apache/hudi/sink/StreamWriteFunction.java | 8 +- .../sink/StreamWriteOperatorCoordinator.java | 65 +++---- .../sink/bulk/BulkInsertWriteFunction.java | 42 +++-- .../common/AbstractStreamWriteFunction.java | 98 ++++++++-- .../apache/hudi/sink/message/MessageBus.java | 173 ++++++++++++++++++ .../hudi/sink/message/MessageClient.java | 126 +++++++++++++ .../hudi/sink/message/MessageDriver.java | 132 +++++++++++++ .../hudi/sink/message/TestMessageBus.java | 137 ++++++++++++++ .../utils/StreamWriteFunctionWrapper.java | 1 + 9 files changed, 702 insertions(+), 80 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 0e7e35e7e..11564d186 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -137,6 +137,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { @Override public void close() { + super.close(); if (this.writeClient != null) { this.writeClient.cleanHandlesGracefully(); this.writeClient.close(); @@ -401,11 +402,6 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { } } - private boolean hasData() { - return this.buckets.size() > 0 - && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0); - } - 
@SuppressWarnings("unchecked, rawtypes") private boolean flushBucket(DataBucket bucket) { String instant = instantToWrite(true); @@ -439,7 +435,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { @SuppressWarnings("unchecked, rawtypes") private void flushRemaining(boolean endInput) { - this.currentInstant = instantToWrite(hasData()); + this.currentInstant = instantToWrite(false); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data throw new HoodieException("No inflight instant when flushing data!"); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index feb348fe3..a30d76613 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -30,8 +30,9 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.message.MessageBus; +import org.apache.hudi.sink.message.MessageDriver; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -41,7 +42,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,6 +136,11 
@@ public class StreamWriteOperatorCoordinator */ private transient TableState tableState; + /** + * The message driver. + */ + private MessageDriver messageDriver; + /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -174,6 +179,7 @@ public class StreamWriteOperatorCoordinator if (tableState.syncMetadata) { initMetadataSync(); } + this.messageDriver = MessageBus.getDriver(this.metaClient.getFs(), metaClient.getBasePath()); } @Override @@ -191,6 +197,9 @@ public class StreamWriteOperatorCoordinator writeClient.close(); } this.eventBuffer = null; + if (this.messageDriver != null) { + this.messageDriver.close(); + } } @Override @@ -227,7 +236,7 @@ public class StreamWriteOperatorCoordinator writeClient.scheduleCompaction(Option.empty()); } // start new instant. - startInstant(); + startInstant(checkpointId); // sync Hive if is enabled syncHiveIfEnabled(); } @@ -237,12 +246,7 @@ public class StreamWriteOperatorCoordinator @Override public void notifyCheckpointAborted(long checkpointId) { - // once the checkpoint was aborted, unblock the writer tasks to - // reuse the last instant. 
- if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { - executor.execute(() -> sendCommitAckEvents(checkpointId), - "unblock data write with aborted checkpoint %s", checkpointId); - } + this.messageDriver.abortCkp(checkpointId); } @Override @@ -333,12 +337,19 @@ public class StreamWriteOperatorCoordinator } private void startInstant() { + // the flink checkpoint id starts from 1, + // see AbstractStreamWriteFunction#ackInstant + startInstant(MessageBus.INITIAL_CKP_ID); + } + + private void startInstant(long checkpoint) { final String instant = HoodieActiveTimeline.createNewInstantTime(); this.writeClient.startCommitWithTime(instant, tableState.commitAction); + this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, instant); + this.writeClient.upgradeDowngrade(instant); + this.messageDriver.commitCkp(checkpoint, this.instant, instant); this.instant = instant; - this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); - this.writeClient.upgradeDowngrade(this.instant); - LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, + LOG.info("Create instant [{}] for table [{}] with type [{}]", instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } @@ -397,33 +408,6 @@ public class StreamWriteOperatorCoordinator addEventToBuffer(event); } - /** - * The coordinator reuses the instant if there is no data for this round of checkpoint, - * sends the commit ack events to unblock the flushing. 
- */ - private void sendCommitAckEvents(long checkpointId) { - CompletableFuture[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull) - .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId))) - .toArray(CompletableFuture[]::new); - try { - CompletableFuture.allOf(futures).get(); - } catch (Throwable throwable) { - if (!sendToFinishedTasks(throwable)) { - throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable); - } - } - } - - /** - * Decides whether the given exception is caused by sending events to FINISHED tasks. - * - *

Ugly impl: the exception may change in the future. - */ - private static boolean sendToFinishedTasks(Throwable throwable) { - return throwable.getCause() instanceof TaskNotRunningException - || throwable.getCause().getMessage().contains("running"); - } - /** * Commits the instant. */ @@ -451,8 +435,7 @@ public class StreamWriteOperatorCoordinator if (writeResults.size() == 0) { // No data has written, reset the buffer and returns early reset(); - // Send commit ack event to the write function to unblock the flushing - sendCommitAckEvents(checkpointId); + messageDriver.commitCkp(checkpointId, this.instant, this.instant); return false; } doCommit(instant, writeResults); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index f3cfbae66..f5fda5aa8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -21,11 +21,13 @@ package org.apache.hudi.sink.bulk; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.common.AbstractWriteFunction; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.message.MessageBus; +import org.apache.hudi.sink.message.MessageClient; import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.util.StreamerUtil; @@ -38,6 +40,8 @@ import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collections; import 
java.util.List; @@ -79,26 +83,21 @@ public class BulkInsertWriteFunction */ private int taskID; - /** - * Meta Client. - */ - private transient HoodieTableMetaClient metaClient; - /** * Write Client. */ private transient HoodieFlinkWriteClient writeClient; - /** - * The initial inflight instant when start up. - */ - private volatile String initInstant; - /** * Gateway to send operator events to the operator coordinator. */ private transient OperatorEventGateway eventGateway; + /** + * The message client. + */ + private MessageClient messageClient; + /** * Constructs a StreamingSinkFunction. * @@ -112,9 +111,8 @@ public class BulkInsertWriteFunction @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); - this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false); + this.messageClient = MessageBus.getClient(config.getString(FlinkOptions.PATH)); sendBootstrapEvent(); initWriterHelper(); } @@ -130,6 +128,9 @@ public class BulkInsertWriteFunction this.writeClient.cleanHandlesGracefully(); this.writeClient.close(); } + if (this.messageClient != null) { + this.messageClient.close(); + } } /** @@ -183,8 +184,17 @@ public class BulkInsertWriteFunction LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); } + /** + * Returns the next instant to write from the message bus. 
+ */ + @Nullable + private String ackInstant() { + Option ckpMessageOption = this.messageClient.getCkpMessage(MessageBus.INITIAL_CKP_ID); + return ckpMessageOption.map(message -> message.inflightInstant).orElse(null); + } + private String instantToWrite() { - String instant = StreamerUtil.getLastPendingInstant(this.metaClient); + String instant = ackInstant(); // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. TimeWait timeWait = TimeWait.builder() @@ -192,14 +202,14 @@ public class BulkInsertWriteFunction .action("instant initialize") .throwsT(true) .build(); - while (instant == null || instant.equals(this.initInstant)) { + while (instant == null) { // wait condition: // 1. there is no inflight instant // 2. the inflight instant does not change // sleep for a while timeWait.waitFor(); // refresh the inflight instant - instant = StreamerUtil.getLastPendingInstant(this.metaClient); + instant = ackInstant(); } return instant; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 5ad2935e2..c3fcec051 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -21,11 +21,14 @@ package org.apache.hudi.sink.common; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.message.MessageBus; +import 
org.apache.hudi.sink.message.MessageClient; import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.util.StreamerUtil; @@ -39,12 +42,14 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.util.CollectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.List; -import java.util.Objects; /** * Base infrastructures for streaming writer function. @@ -119,6 +124,11 @@ public abstract class AbstractStreamWriteFunction */ private long checkpointId = -1; + /** + * The message client. + */ + private MessageClient messageClient; + /** * Constructs a StreamWriteFunctionBase. * @@ -140,7 +150,6 @@ public abstract class AbstractStreamWriteFunction TypeInformation.of(WriteMetadataEvent.class) )); - this.currentInstant = lastPendingInstant(); if (context.isRestored()) { restoreWriteMetadata(); } else { @@ -148,6 +157,7 @@ public abstract class AbstractStreamWriteFunction } // blocks flushing until the coordinator starts a new instant this.confirming = true; + this.messageClient = MessageBus.getClient(this.metaClient.getFs(), this.metaClient.getBasePath()); } @Override @@ -177,14 +187,19 @@ public abstract class AbstractStreamWriteFunction // ------------------------------------------------------------------------- private void restoreWriteMetadata() throws Exception { - String lastInflight = lastPendingInstant(); + List events = CollectionUtil.iterableToList(this.writeMetadataState.get()); boolean eventSent = false; - for (WriteMetadataEvent event : this.writeMetadataState.get()) { - if (Objects.equals(lastInflight, event.getInstantTime())) { - // The checkpoint succeed but the meta does not commit, - // re-commit the 
inflight instant - this.eventGateway.sendEventToCoordinator(event); - LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID); + if (events.size() > 0) { + boolean committed = this.metaClient.getActiveTimeline() + .filterCompletedInstants() + .containsInstant(events.get(0).getInstantTime()); + if (!committed) { + for (WriteMetadataEvent event : events) { + // The checkpoint succeed but the meta does not commit, + // re-commit the inflight instant + this.eventGateway.sendEventToCoordinator(event); + LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID); + } eventSent = true; } } @@ -222,21 +237,65 @@ public abstract class AbstractStreamWriteFunction } } + @Override + public void close() { + if (this.messageClient != null) { + this.messageClient.close(); + } + } + /** * Returns the last pending instant time. */ - protected String lastPendingInstant() { - return StreamerUtil.getLastPendingInstant(this.metaClient); + private String lastPendingInstant() { + return StreamerUtil.getLastPendingInstant(metaClient); + } + + /** + * Returns the previous committed checkpoint id. + * + * @param eagerFlush Whether the data flush happens before the checkpoint barrier arrives + */ + private long prevCkp(boolean eagerFlush) { + // Use the last checkpoint id to request for the message, + // the time sequence of committed checkpoints and ongoing + // checkpoints are as following: + + // 0 ------------ 1 ------------ 2 ------------ 3 ------------> committed ckp id + // | / / / / + // |--- ckp-1 ----|--- ckp-2 ----|--- ckp-3 ----|--- ckp-4 ----| ongoing ckp id + + // Use 0 as the initial committed checkpoint id, the 0th checkpoint message records the writing instant for ckp-1; + // when ckp-1 success event is received, commits a checkpoint message with the writing instant for ckp-2; + // that means, the checkpoint message records the writing instant of next checkpoint. + return Math.max(0, eagerFlush ? 
this.checkpointId : this.checkpointId - 1); + } + + /** + * Returns the next instant to write from the message bus. + * + *

It returns 3 kinds of value: + * i) normal instant time: the previous checkpoint succeeded; + * ii) 'aborted' instant time: the previous checkpoint has been aborted; + * iii) null: the checkpoint is still ongoing without any notifications. + */ + @Nullable + protected String ackInstant(long checkpointId) { + Option ckpMessageOption = this.messageClient.getCkpMessage(checkpointId); + return ckpMessageOption.map(message -> message.inflightInstant).orElse(null); + } /** * Prepares the instant time to write with for next checkpoint. * - * @param hasData Whether the task has buffering data + * @param eagerFlush Whether the data flush happens before the checkpoint barrier arrives + * + * @return The instant time */ - protected String instantToWrite(boolean hasData) { - String instant = lastPendingInstant(); + protected String instantToWrite(boolean eagerFlush) { + final long ckpId = prevCkp(eagerFlush); + String instant = ackInstant(ckpId); + // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. TimeWait timeWait = TimeWait.builder() @@ -247,18 +306,23 @@ public abstract class AbstractStreamWriteFunction // wait condition: // 1. there is no inflight instant // 2. 
the inflight instant does not change and the checkpoint has buffering data - if (instant == null || (instant.equals(this.currentInstant) && hasData)) { + if (instant == null) { // sleep for a while boolean timeout = timeWait.waitFor(); - if (timeout && instant != null) { + if (timeout && MessageBus.notInitialCkp(ckpId)) { // if the timeout threshold hits but the last instant still not commit, // and the task does not receive commit ask event(no data or aborted checkpoint), // assumes the checkpoint was canceled silently and unblock the data flushing confirming = false; + instant = lastPendingInstant(); } else { // refresh the inflight instant - instant = lastPendingInstant(); + instant = ackInstant(ckpId); } + } else if (MessageBus.canAbort(instant, ckpId)) { + // the checkpoint was canceled, reuse the last instant + confirming = false; + instant = lastPendingInstant(); } else { // the pending instant changed, that means the last instant was committed // successfully. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java new file mode 100644 index 000000000..ff8f3ebfa --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.message; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * A message bus for transferring the checkpoint messages. + * + *

Each time the driver starts a new instant, it writes a commit message into the bus, the write tasks + then consume the message and unblock the data flush. + * + *

Why we use the DFS based message queue instead of sending + * the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ? + * The write task handles the operator event using the main mailbox executor which has the lowest priority for mails, + * it is also used to process the inputs. When the write task blocks and waits for the operator event to ack the valid instant to write, + * it actually blocks all the following events in the mailbox, the operator event can never be consumed, which causes a deadlock. + * + *

The message bus is also more lightweight than the active timeline. + */ +public abstract class MessageBus implements AutoCloseable { + + public static final long INITIAL_CKP_ID = 0L; + + public static final String ABORTED_CKP_INSTANT = "aborted"; + + protected static final int MESSAGE_QUEUE_LENGTH = 20; + + protected static final int CLIENT_MESSAGE_CACHE_SIZE = 10; + + private static final String MESSAGE_BUS = "message_bus"; + + private static final String COMMIT = "commit"; + + private static final String COMMIT_EXTENSION = "." + COMMIT; + private static final String ABORTED_EXTENSION = ".aborted"; + + protected final FileSystem fs; + protected final String basePath; + protected final String messageBusPath; + + protected MessageBus(FileSystem fs, String basePath) { + this.fs = fs; + this.basePath = basePath; + this.messageBusPath = messageBusPath(basePath); + } + + public static MessageDriver getDriver(FileSystem fs, String basePath) { + return MessageDriver.getInstance(fs, basePath); + } + + public static MessageClient getClient(FileSystem fs, String basePath) { + return MessageClient.getSingleton(fs, basePath); + } + + public static MessageClient getClient(String basePath) { + FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); + return MessageClient.getSingleton(fs, basePath); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + public static boolean canAbort(String instant, long checkpointId) { + return ABORTED_CKP_INSTANT.equals(instant) && MessageBus.notInitialCkp(checkpointId); + } + + public static boolean notInitialCkp(long checkpointId) { + return checkpointId != INITIAL_CKP_ID; + } + + protected Path fullFilePath(String fileName) { + return new Path(messageBusPath, fileName); + } + + protected static String messageBusPath(String basePath) { + return basePath + Path.SEPARATOR + 
HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + MESSAGE_BUS; + } + + protected static String getCommitFileName(long checkpointId) { + return checkpointId + COMMIT_EXTENSION; + } + + protected static String getAbortedFileName(long checkpointId) { + return checkpointId + ABORTED_EXTENSION; + } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * A checkpoint message. + */ + public static class CkpMessage { + private static final String SEPARATOR = ","; + + public final boolean committed; // whether the checkpoint is committed + + public final long checkpointId; + public final String commitInstant; + public final String inflightInstant; + + private CkpMessage(long checkpointId, String commitInstant, String inflightInstant) { + this.committed = true; + this.checkpointId = checkpointId; + this.commitInstant = commitInstant; + this.inflightInstant = inflightInstant; + } + + private CkpMessage(long checkpointId) { + this.committed = false; + this.checkpointId = checkpointId; + this.commitInstant = ABORTED_CKP_INSTANT; + this.inflightInstant = ABORTED_CKP_INSTANT; + } + + /** + * Encodes the instants as 'commitInstant,inflightInstant'. 
+ */ + public static byte[] toBytes(String commitInstant, String inflightInstant) { + return (commitInstant + SEPARATOR + inflightInstant).getBytes(StandardCharsets.UTF_8); + } + + public static CkpMessage fromBytes(long checkpointId, byte[] bytes) { + String content = new String(bytes, StandardCharsets.UTF_8); + String[] splits = content.split(SEPARATOR); + return new CkpMessage(checkpointId, splits[0], splits[1]); + } + + public static CkpMessage fromPath(FileSystem fs, Path path) throws IOException { + final String[] splits = path.getName().split("\\."); + ValidationUtils.checkState(splits.length == 2, "Invalid checkpoint message file name: " + path.getName()); + final long checkpointId = Long.parseLong(splits[0]); + final String suffix = splits[1]; + if (suffix.equals(COMMIT)) { + try (FSDataInputStream is = fs.open(path)) { + byte[] bytes = FileIOUtils.readAsByteArray(is); + return CkpMessage.fromBytes(checkpointId, bytes); + } + } else { + return new CkpMessage(checkpointId); + } + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java new file mode 100644 index 000000000..ea893d536 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.message; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +/** + * A client that consumes messages from the {@link MessageBus}. + */ +public class MessageClient extends MessageBus { + private static final Logger LOG = LoggerFactory.getLogger(MessageClient.class); + + private static final Map CLIENTS = new HashMap<>(); + + private final TreeMap ckpCache; // checkpoint id -> CkpMessage mapping + + private MessageClient(FileSystem fs, String basePath) throws IOException { + super(fs, basePath); + this.ckpCache = new TreeMap<>(); + } + + /** + * Returns the message bus instance. + * + *

This expects to be called by the client. + * + * @param fs The filesystem + * @param basePath The table base path + * @return The instance of message bus + */ + private static MessageClient getInstance(FileSystem fs, String basePath) { + try { + return new MessageClient(fs, basePath); + } catch (IOException e) { + throw new HoodieException("Initialize checkpoint message bus error", e); + } + } + + /** + * Returns the singleton message bus instance. + * + *

This expects to be called by the client. + * + * @param fs The filesystem + * @param basePath The table base path + * @return The instance of message bus + */ + public static synchronized MessageClient getSingleton(FileSystem fs, String basePath) { + return CLIENTS.computeIfAbsent(basePath, + k -> getInstance(fs, basePath)); + } + + public synchronized Option getCkpMessage(long checkpointId) { + if (this.ckpCache.size() >= CLIENT_MESSAGE_CACHE_SIZE) { + this.ckpCache.pollFirstEntry(); + } + if (this.ckpCache.containsKey(checkpointId)) { + return Option.of(this.ckpCache.get(checkpointId)); + } + final Path commitFilePath = fullFilePath(getCommitFileName(checkpointId)); + try { + if (fs.exists(commitFilePath)) { + CkpMessage ckpMessage = CkpMessage.fromPath(fs, commitFilePath); + this.ckpCache.put(checkpointId, ckpMessage); + return Option.of(ckpMessage); + } + } catch (Throwable e) { + // ignored + LOG.warn("Read committed checkpoint message error: " + checkpointId, e); + return Option.empty(); + } + final Path abortedFilePath = fullFilePath(getAbortedFileName(checkpointId)); + try { + if (fs.exists(abortedFilePath)) { + CkpMessage ckpMessage = CkpMessage.fromPath(fs, abortedFilePath); + this.ckpCache.put(checkpointId, ckpMessage); + return Option.of(ckpMessage); + } + } catch (Throwable e) { + // ignored + LOG.warn("Read aborted checkpoint message error: " + checkpointId, e); + return Option.empty(); + } + return Option.empty(); + } + + @VisibleForTesting + public TreeMap getCkpCache() { + return ckpCache; + } + + @Override + public void close() { + synchronized (this) { + this.ckpCache.clear(); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java new file mode 100644 index 000000000..bf98209ee --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.sink.message;

import org.apache.hudi.exception.HoodieException;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

/**
 * A driver that produces messages to the {@link MessageBus}.
 *
 * <p>Each committed/aborted checkpoint is persisted as one small file under the
 * message bus path. The driver also keeps an in-memory index of the checkpoint
 * ids it has written so that stale message files can be cleaned lazily.
 */
public class MessageDriver extends MessageBus {
  // Checkpoint id -> isCommitted mapping; a TreeMap so the oldest
  // checkpoint ids are polled first when cleaning.
  private final TreeMap<Long, Boolean> ckpIdCache;

  public MessageDriver(FileSystem fs, String basePath) throws IOException {
    super(fs, basePath);
    this.ckpIdCache = new TreeMap<>();
    initialize();
  }

  /**
   * Returns the message bus instance.
   *
   * <p>This expects to be called by the driver.
   *
   * @param fs       The filesystem
   * @param basePath The table base path
   * @return The instance of message bus
   */
  public static MessageDriver getInstance(FileSystem fs, String basePath) {
    try {
      return new MessageDriver(fs, basePath);
    } catch (IOException e) {
      throw new HoodieException("Initialize checkpoint message bus error", e);
    }
  }

  /**
   * Initialize the message bus, would clean all the messages.
   *
   * <p>This expects to be called by the driver.
   */
  private void initialize() throws IOException {
    Path path = new Path(messageBusPath(basePath));
    if (fs.exists(path)) {
      fs.delete(path, true);
    }
    fs.mkdirs(path);
  }

  /**
   * Add a checkpoint commit message.
   *
   * @param checkpointId    The checkpoint id
   * @param commitInstant   The committed instant
   * @param inflightInstant The new inflight instant
   */
  public void commitCkp(long checkpointId, String commitInstant, String inflightInstant) {
    Path path = fullFilePath(getCommitFileName(checkpointId));

    // try-with-resources closes the stream; no explicit close() needed
    try (FSDataOutputStream outputStream = fs.create(path, true)) {
      byte[] bytes = CkpMessage.toBytes(commitInstant, inflightInstant);
      outputStream.write(bytes);
      this.ckpIdCache.put(checkpointId, true);
      clean();
    } catch (Throwable e) {
      throw new HoodieException("Adding committed message error for checkpoint: " + checkpointId, e);
    }
  }

  /**
   * Add an aborted checkpoint message.
   *
   * @param checkpointId The checkpoint id
   */
  public void abortCkp(long checkpointId) {
    Path path = fullFilePath(getAbortedFileName(checkpointId));
    try {
      // an aborted checkpoint is recorded as an empty marker file
      fs.createNewFile(path);
      this.ckpIdCache.put(checkpointId, false);
      clean();
    } catch (Throwable e) {
      throw new HoodieException("Adding aborted message error for checkpoint: " + checkpointId, e);
    }
  }

  /**
   * Lazily cleans the oldest message files. Cleaning only triggers once the backlog
   * beyond {@code MESSAGE_QUEUE_LENGTH} reaches 10 entries, then the whole backlog
   * is removed in one batch to amortize the filesystem calls.
   */
  private void clean() throws IOException {
    int numToClean = this.ckpIdCache.size() - MESSAGE_QUEUE_LENGTH;
    if (numToClean >= 10) {
      for (int i = 0; i < numToClean; i++) {
        Map.Entry<Long, Boolean> entry = this.ckpIdCache.pollFirstEntry();
        final String fileName = entry.getValue()
            ? getCommitFileName(entry.getKey())
            : getAbortedFileName(entry.getKey());
        final Path filePath = fullFilePath(fileName);
        fs.delete(filePath, false);
      }
    }
  }

  @VisibleForTesting
  public TreeMap<Long, Boolean> getCkpIdCache() {
    return ckpIdCache;
  }

  @Override
  public void close() throws Exception {
    this.ckpIdCache.clear();
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.sink.message;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Test cases for {@link MessageBus}.
 */
public class TestMessageBus {

  private String basePath;
  private FileSystem fs;

  private MessageDriver driver;

  @TempDir
  File tempFile;

  @BeforeEach
  public void beforeEach() throws Exception {
    basePath = tempFile.getAbsolutePath();
    this.fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf());

    Configuration conf = TestConfigurations.getDefaultConf(basePath);
    StreamerUtil.initTableIfNotExists(conf);

    this.driver = MessageDriver.getInstance(fs, basePath);
  }

  @Test
  void testWriteAndReadMessage() {
    MessageClient client = MessageClient.getSingleton(fs, basePath);

    // write and read 5 committed checkpoints
    IntStream.range(0, 5).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));

    IntStream.range(0, 5).forEach(i -> {
      Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
      assertTrue(messageOpt.isPresent());

      MessageBus.CkpMessage ckpMessage = messageOpt.get();
      assertTrue(ckpMessage.committed);
      assertThat(ckpMessage.commitInstant, is(i + ""));
      assertThat(ckpMessage.inflightInstant, is(i + 1 + ""));
    });

    // write and read 5 aborted checkpoints
    IntStream.range(5, 10).forEach(i -> driver.abortCkp(i));

    IntStream.range(5, 10).forEach(i -> {
      Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
      assertTrue(messageOpt.isPresent());

      MessageBus.CkpMessage ckpMessage = messageOpt.get();
      assertFalse(ckpMessage.committed);
      // an aborted message carries the sentinel instant on both fields
      assertThat(ckpMessage.commitInstant, is(MessageBus.ABORTED_CKP_INSTANT));
      assertThat(ckpMessage.inflightInstant, is(MessageBus.ABORTED_CKP_INSTANT));
    });
  }

  @Test
  void testWriteCleaning() {
    // write 20 committed checkpoints; backlog (size - queue length) is still 0
    IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));
    assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(20));

    // write 9 aborted checkpoints (ids 20..28); backlog is 9, below the cleaning threshold
    IntStream.range(20, 29).forEach(i -> driver.abortCkp(i));
    assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(29));

    // the 30th message pushes the backlog to 10 and triggers a batch clean of 10 entries
    driver.commitCkp(29, "29", "30");
    assertThat("The cache should be cleaned", driver.getCkpIdCache().size(), is(20));
    assertThat(longSet2String(driver.getCkpIdCache().keySet()),
        is("10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29"));
  }

  @Test
  void testReadCleaning() {
    MessageClient client = MessageClient.getSingleton(fs, basePath);

    // write 20 committed checkpoints
    IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));

    IntStream.range(0, 10).forEach(client::getCkpMessage);
    assertThat("The checkpoint cache should not be cleaned", client.getCkpCache().size(), is(10));

    // reading past the cache capacity evicts the oldest cached message
    client.getCkpMessage(10);
    assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10));

    IntStream.range(11, 15).forEach(client::getCkpMessage);
    assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10));
    assertThat(longSet2String(client.getCkpCache().keySet()), is("5,6,7,8,9,10,11,12,13,14"));
  }

  /** Renders the given set of ids as a comma-separated string, preserving iteration order. */
  private static String longSet2String(Set<Long> longSet) {
    return longSet.stream()
        .map(String::valueOf)
        .collect(Collectors.joining(","));
  }
}