1
0

Revert "[HUDI-2677] Add DFS based message queue for flink writer (#3915)" (#3923)

This reverts commit dbf8c44bdb.
This commit is contained in:
Danny Chan
2021-11-04 20:48:57 +08:00
committed by GitHub
parent dbf8c44bdb
commit 33436aa359
9 changed files with 80 additions and 702 deletions

View File

@@ -137,7 +137,6 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
@Override
public void close() {
super.close();
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
@@ -402,6 +401,11 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
}
}
private boolean hasData() {
return this.buckets.size() > 0
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}
@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
@@ -435,7 +439,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean endInput) {
this.currentInstant = instantToWrite(false);
this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!");

View File

@@ -30,9 +30,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.message.MessageBus;
import org.apache.hudi.sink.message.MessageDriver;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
@@ -42,6 +41,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,11 +136,6 @@ public class StreamWriteOperatorCoordinator
*/
private transient TableState tableState;
/**
* The message driver.
*/
private MessageDriver messageDriver;
/**
* Constructs a StreamingSinkOperatorCoordinator.
*
@@ -179,7 +174,6 @@ public class StreamWriteOperatorCoordinator
if (tableState.syncMetadata) {
initMetadataSync();
}
this.messageDriver = MessageBus.getDriver(this.metaClient.getFs(), metaClient.getBasePath());
}
@Override
@@ -197,9 +191,6 @@ public class StreamWriteOperatorCoordinator
writeClient.close();
}
this.eventBuffer = null;
if (this.messageDriver != null) {
this.messageDriver.close();
}
}
@Override
@@ -236,7 +227,7 @@ public class StreamWriteOperatorCoordinator
writeClient.scheduleCompaction(Option.empty());
}
// start new instant.
startInstant(checkpointId);
startInstant();
// sync Hive if is enabled
syncHiveIfEnabled();
}
@@ -246,7 +237,12 @@ public class StreamWriteOperatorCoordinator
@Override
public void notifyCheckpointAborted(long checkpointId) {
this.messageDriver.abortCkp(checkpointId);
// once the checkpoint was aborted, unblock the writer tasks to
// reuse the last instant.
if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
executor.execute(() -> sendCommitAckEvents(checkpointId),
"unblock data write with aborted checkpoint %s", checkpointId);
}
}
@Override
@@ -337,19 +333,12 @@ public class StreamWriteOperatorCoordinator
}
private void startInstant() {
// the flink checkpoint id starts from 1,
// see AbstractStreamWriteFunction#ackInstant
startInstant(MessageBus.INITIAL_CKP_ID);
}
private void startInstant(long checkpoint) {
final String instant = HoodieActiveTimeline.createNewInstantTime();
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, instant);
this.writeClient.upgradeDowngrade(instant);
this.messageDriver.commitCkp(checkpoint, this.instant, instant);
this.instant = instant;
LOG.info("Create instant [{}] for table [{}] with type [{}]", instant,
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
this.writeClient.upgradeDowngrade(this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
}
@@ -408,6 +397,33 @@ public class StreamWriteOperatorCoordinator
addEventToBuffer(event);
}
/**
* The coordinator reuses the instant if there is no data for this round of checkpoint,
* sends the commit ack events to unblock the flushing.
*/
private void sendCommitAckEvents(long checkpointId) {
CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
.map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
.toArray(CompletableFuture<?>[]::new);
try {
CompletableFuture.allOf(futures).get();
} catch (Throwable throwable) {
if (!sendToFinishedTasks(throwable)) {
throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable);
}
}
}
/**
* Decides whether the given exception is caused by sending events to FINISHED tasks.
*
* <p>Ugly impl: the exception may change in the future.
*/
private static boolean sendToFinishedTasks(Throwable throwable) {
return throwable.getCause() instanceof TaskNotRunningException
|| throwable.getCause().getMessage().contains("running");
}
/**
* Commits the instant.
*/
@@ -435,7 +451,8 @@ public class StreamWriteOperatorCoordinator
if (writeResults.size() == 0) {
// No data has written, reset the buffer and returns early
reset();
messageDriver.commitCkp(checkpointId, this.instant, this.instant);
// Send commit ack event to the write function to unblock the flushing
sendCommitAckEvents(checkpointId);
return false;
}
doCommit(instant, writeResults);

View File

@@ -21,13 +21,11 @@ package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.message.MessageBus;
import org.apache.hudi.sink.message.MessageClient;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
@@ -40,8 +38,6 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -83,21 +79,26 @@ public class BulkInsertWriteFunction<I>
*/
private int taskID;
/**
* Meta Client.
*/
private transient HoodieTableMetaClient metaClient;
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
/**
* The initial inflight instant when start up.
*/
private volatile String initInstant;
/**
* Gateway to send operator events to the operator coordinator.
*/
private transient OperatorEventGateway eventGateway;
/**
* The message client.
*/
private MessageClient messageClient;
/**
* Constructs a StreamingSinkFunction.
*
@@ -111,8 +112,9 @@ public class BulkInsertWriteFunction<I>
@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.metaClient = StreamerUtil.createMetaClient(this.config);
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.messageClient = MessageBus.getClient(config.getString(FlinkOptions.PATH));
this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false);
sendBootstrapEvent();
initWriterHelper();
}
@@ -128,9 +130,6 @@ public class BulkInsertWriteFunction<I>
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
if (this.messageClient != null) {
this.messageClient.close();
}
}
/**
@@ -184,17 +183,8 @@ public class BulkInsertWriteFunction<I>
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}
/**
* Returns the next instant to write from the message bus.
*/
@Nullable
private String ackInstant() {
Option<MessageBus.CkpMessage> ckpMessageOption = this.messageClient.getCkpMessage(MessageBus.INITIAL_CKP_ID);
return ckpMessageOption.map(message -> message.inflightInstant).orElse(null);
}
private String instantToWrite() {
String instant = ackInstant();
String instant = StreamerUtil.getLastPendingInstant(this.metaClient);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder()
@@ -202,14 +192,14 @@ public class BulkInsertWriteFunction<I>
.action("instant initialize")
.throwsT(true)
.build();
while (instant == null) {
while (instant == null || instant.equals(this.initInstant)) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
instant = ackInstant();
instant = StreamerUtil.getLastPendingInstant(this.metaClient);
}
return instant;
}

View File

@@ -21,14 +21,11 @@ package org.apache.hudi.sink.common;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.message.MessageBus;
import org.apache.hudi.sink.message.MessageClient;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
@@ -42,14 +39,12 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* Base infrastructures for streaming writer function.
@@ -124,11 +119,6 @@ public abstract class AbstractStreamWriteFunction<I>
*/
private long checkpointId = -1;
/**
* The message client.
*/
private MessageClient messageClient;
/**
* Constructs a StreamWriteFunctionBase.
*
@@ -150,6 +140,7 @@ public abstract class AbstractStreamWriteFunction<I>
TypeInformation.of(WriteMetadataEvent.class)
));
this.currentInstant = lastPendingInstant();
if (context.isRestored()) {
restoreWriteMetadata();
} else {
@@ -157,7 +148,6 @@ public abstract class AbstractStreamWriteFunction<I>
}
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
this.messageClient = MessageBus.getClient(this.metaClient.getFs(), this.metaClient.getBasePath());
}
@Override
@@ -187,19 +177,14 @@ public abstract class AbstractStreamWriteFunction<I>
// -------------------------------------------------------------------------
private void restoreWriteMetadata() throws Exception {
List<WriteMetadataEvent> events = CollectionUtil.iterableToList(this.writeMetadataState.get());
String lastInflight = lastPendingInstant();
boolean eventSent = false;
if (events.size() > 0) {
boolean committed = this.metaClient.getActiveTimeline()
.filterCompletedInstants()
.containsInstant(events.get(0).getInstantTime());
if (!committed) {
for (WriteMetadataEvent event : events) {
// The checkpoint succeed but the meta does not commit,
// re-commit the inflight instant
this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
}
for (WriteMetadataEvent event : this.writeMetadataState.get()) {
if (Objects.equals(lastInflight, event.getInstantTime())) {
// The checkpoint succeed but the meta does not commit,
// re-commit the inflight instant
this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
eventSent = true;
}
}
@@ -237,65 +222,21 @@ public abstract class AbstractStreamWriteFunction<I>
}
}
@Override
public void close() {
if (this.messageClient != null) {
this.messageClient.close();
}
}
/**
* Returns the last pending instant time.
*/
private String lastPendingInstant() {
return StreamerUtil.getLastPendingInstant(metaClient);
}
/**
* Returns the previous committed checkpoint id.
*
* @param eagerFlush Whether the data flush happens before the checkpoint barrier arrives
*/
private long prevCkp(boolean eagerFlush) {
// Use the last checkpoint id to request for the message,
// the time sequence of committed checkpoints and ongoing
// checkpoints are as following:
// 0 ------------ 1 ------------ 2 ------------ 3 ------------> committed ckp id
// | / / / /
// |--- ckp-1 ----|--- ckp-2 ----|--- ckp-3 ----|--- ckp-4 ----| ongoing ckp id
// Use 0 as the initial committed checkpoint id, the 0th checkpoint message records the writing instant for ckp-1;
// when ckp-1 success event is received, commits a checkpoint message with the writing instant for ckp-2;
// that means, the checkpoint message records the writing instant of next checkpoint.
return Math.max(0, eagerFlush ? this.checkpointId : this.checkpointId - 1);
}
/**
* Returns the next instant to write from the message bus.
*
* <p>It returns 3 kinds of value:
* i) normal instant time: the previous checkpoint succeed;
* ii) 'aborted' instant time: the previous checkpoint has been aborted;
* ii) null: the checkpoint is till ongoing without any notifications.
*/
@Nullable
protected String ackInstant(long checkpointId) {
Option<MessageBus.CkpMessage> ckpMessageOption = this.messageClient.getCkpMessage(checkpointId);
return ckpMessageOption.map(message -> message.inflightInstant).orElse(null);
protected String lastPendingInstant() {
return StreamerUtil.getLastPendingInstant(this.metaClient);
}
/**
* Prepares the instant time to write with for next checkpoint.
*
* @param eagerFlush Whether the data flush happens before the checkpoint barrier arrives
*
* @param hasData Whether the task has buffering data
* @return The instant time
*/
protected String instantToWrite(boolean eagerFlush) {
final long ckpId = prevCkp(eagerFlush);
String instant = ackInstant(ckpId);
protected String instantToWrite(boolean hasData) {
String instant = lastPendingInstant();
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder()
@@ -306,23 +247,18 @@ public abstract class AbstractStreamWriteFunction<I>
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null) {
if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
boolean timeout = timeWait.waitFor();
if (timeout && MessageBus.notInitialCkp(ckpId)) {
if (timeout && instant != null) {
// if the timeout threshold hits but the last instant still not commit,
// and the task does not receive commit ask event(no data or aborted checkpoint),
// assumes the checkpoint was canceled silently and unblock the data flushing
confirming = false;
instant = lastPendingInstant();
} else {
// refresh the inflight instant
instant = ackInstant(ckpId);
instant = lastPendingInstant();
}
} else if (MessageBus.canAbort(instant, ckpId)) {
// the checkpoint was canceled, reuse the last instant
confirming = false;
instant = lastPendingInstant();
} else {
// the pending instant changed, that means the last instant was committed
// successfully.

View File

@@ -1,173 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.message;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* A message bus for transferring the checkpoint messages.
*
* <p>Each time the driver starts a new instant, it writes a commit message into the bus, the write tasks
* then consume the message and unblocking the data flush.
*
* <p>Why we use the DFS based message queue instead of sending
* the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ?
* The write task handles the operator event using the main mailbox executor which has the lowest priority for mails,
* it is also used to process the inputs. When the write task blocks and waits for the operator event to ack the valid instant to write,
* it actually blocks all the following events in the mailbox, the operator event can never be consumed then it causes deadlock.
*
* <p>The message bus is also more lightweight than the active timeline.
*/
public abstract class MessageBus implements AutoCloseable {
public static final long INITIAL_CKP_ID = 0L;
public static final String ABORTED_CKP_INSTANT = "aborted";
protected static final int MESSAGE_QUEUE_LENGTH = 20;
protected static final int CLIENT_MESSAGE_CACHE_SIZE = 10;
private static final String MESSAGE_BUS = "message_bus";
private static final String COMMIT = "commit";
private static final String COMMIT_EXTENSION = "." + COMMIT;
private static final String ABORTED_EXTENSION = ".aborted";
protected final FileSystem fs;
protected final String basePath;
protected final String messageBusPath;
protected MessageBus(FileSystem fs, String basePath) {
this.fs = fs;
this.basePath = basePath;
this.messageBusPath = messageBusPath(basePath);
}
public static MessageDriver getDriver(FileSystem fs, String basePath) {
return MessageDriver.getInstance(fs, basePath);
}
public static MessageClient getClient(FileSystem fs, String basePath) {
return MessageClient.getSingleton(fs, basePath);
}
public static MessageClient getClient(String basePath) {
FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
return MessageClient.getSingleton(fs, basePath);
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
public static boolean canAbort(String instant, long checkpointId) {
return ABORTED_CKP_INSTANT.equals(instant) && MessageBus.notInitialCkp(checkpointId);
}
public static boolean notInitialCkp(long checkpointId) {
return checkpointId != INITIAL_CKP_ID;
}
protected Path fullFilePath(String fileName) {
return new Path(messageBusPath, fileName);
}
protected static String messageBusPath(String basePath) {
return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + MESSAGE_BUS;
}
protected static String getCommitFileName(long checkpointId) {
return checkpointId + COMMIT_EXTENSION;
}
protected static String getAbortedFileName(long checkpointId) {
return checkpointId + ABORTED_EXTENSION;
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* A checkpoint message.
*/
public static class CkpMessage {
private static final String SEPARATOR = ",";
public final boolean committed; // whether the checkpoint is committed
public final long checkpointId;
public final String commitInstant;
public final String inflightInstant;
private CkpMessage(long checkpointId, String commitInstant, String inflightInstant) {
this.committed = true;
this.checkpointId = checkpointId;
this.commitInstant = commitInstant;
this.inflightInstant = inflightInstant;
}
private CkpMessage(long checkpointId) {
this.committed = false;
this.checkpointId = checkpointId;
this.commitInstant = ABORTED_CKP_INSTANT;
this.inflightInstant = ABORTED_CKP_INSTANT;
}
/**
* Encodes the instants as 'commitInstant,inflightInstant'.
*/
public static byte[] toBytes(String commitInstant, String inflightInstant) {
return (commitInstant + SEPARATOR + inflightInstant).getBytes(StandardCharsets.UTF_8);
}
public static CkpMessage fromBytes(long checkpointId, byte[] bytes) {
String content = new String(bytes, StandardCharsets.UTF_8);
String[] splits = content.split(SEPARATOR);
return new CkpMessage(checkpointId, splits[0], splits[1]);
}
public static CkpMessage fromPath(FileSystem fs, Path path) throws IOException {
final String[] splits = path.getName().split("\\.");
ValidationUtils.checkState(splits.length == 2, "Invalid checkpoint message file name: " + path.getName());
final long checkpointId = Long.parseLong(splits[0]);
final String suffix = splits[1];
if (suffix.equals(COMMIT)) {
try (FSDataInputStream is = fs.open(path)) {
byte[] bytes = FileIOUtils.readAsByteArray(is);
return CkpMessage.fromBytes(checkpointId, bytes);
}
} else {
return new CkpMessage(checkpointId);
}
}
}
}

View File

@@ -1,126 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.message;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
/**
* A client that consumes messages from the {@link MessageBus}.
*/
public class MessageClient extends MessageBus {
private static final Logger LOG = LoggerFactory.getLogger(MessageClient.class);
private static final Map<String, MessageClient> CLIENTS = new HashMap<>();
private final TreeMap<Long, CkpMessage> ckpCache; // checkpoint id -> CkpMessage mapping
private MessageClient(FileSystem fs, String basePath) throws IOException {
super(fs, basePath);
this.ckpCache = new TreeMap<>();
}
/**
* Returns the message bus instance.
*
* <p>This expects to be called by the client.
*
* @param fs The filesystem
* @param basePath The table base path
* @return The instance of message bus
*/
private static MessageClient getInstance(FileSystem fs, String basePath) {
try {
return new MessageClient(fs, basePath);
} catch (IOException e) {
throw new HoodieException("Initialize checkpoint message bus error", e);
}
}
/**
* Returns the singleton message bus instance.
*
* <p>This expects to be called by the client.
*
* @param fs The filesystem
* @param basePath The table base path
* @return The instance of message bus
*/
public static synchronized MessageClient getSingleton(FileSystem fs, String basePath) {
return CLIENTS.computeIfAbsent(basePath,
k -> getInstance(fs, basePath));
}
public synchronized Option<CkpMessage> getCkpMessage(long checkpointId) {
if (this.ckpCache.size() >= CLIENT_MESSAGE_CACHE_SIZE) {
this.ckpCache.pollFirstEntry();
}
if (this.ckpCache.containsKey(checkpointId)) {
return Option.of(this.ckpCache.get(checkpointId));
}
final Path commitFilePath = fullFilePath(getCommitFileName(checkpointId));
try {
if (fs.exists(commitFilePath)) {
CkpMessage ckpMessage = CkpMessage.fromPath(fs, commitFilePath);
this.ckpCache.put(checkpointId, ckpMessage);
return Option.of(ckpMessage);
}
} catch (Throwable e) {
// ignored
LOG.warn("Read committed checkpoint message error: " + checkpointId, e);
return Option.empty();
}
final Path abortedFilePath = fullFilePath(getAbortedFileName(checkpointId));
try {
if (fs.exists(abortedFilePath)) {
CkpMessage ckpMessage = CkpMessage.fromPath(fs, abortedFilePath);
this.ckpCache.put(checkpointId, ckpMessage);
return Option.of(ckpMessage);
}
} catch (Throwable e) {
// ignored
LOG.warn("Read aborted checkpoint message error: " + checkpointId, e);
return Option.empty();
}
return Option.empty();
}
@VisibleForTesting
public TreeMap<Long, CkpMessage> getCkpCache() {
return ckpCache;
}
@Override
public void close() {
synchronized (this) {
this.ckpCache.clear();
}
}
}

View File

@@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.message;
import org.apache.hudi.exception.HoodieException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
/**
* A driver that produces messages to the {@link MessageBus}.
*/
public class MessageDriver extends MessageBus {
private final TreeMap<Long, Boolean> ckpIdCache; // checkpoint id -> isCommitted mapping
public MessageDriver(FileSystem fs, String basePath) throws IOException {
super(fs, basePath);
this.ckpIdCache = new TreeMap<>();
initialize();
}
/**
* Returns the message bus instance.
*
* <p>This expects to be called by the driver.
*
* @param fs The filesystem
* @param basePath The table base path
* @return The instance of message bus
*/
public static MessageDriver getInstance(FileSystem fs, String basePath) {
try {
return new MessageDriver(fs, basePath);
} catch (IOException e) {
throw new HoodieException("Initialize checkpoint message bus error", e);
}
}
/**
* Initialize the message bus, would clean all the messages.
*
* <p>This expects to be called by the driver.
*/
private void initialize() throws IOException {
Path path = new Path(messageBusPath(basePath));
if (fs.exists(path)) {
fs.delete(path, true);
}
fs.mkdirs(path);
}
/**
* Add a checkpoint commit message.
*
* @param checkpointId The checkpoint id
* @param commitInstant The committed instant
* @param inflightInstant The new inflight instant
*/
public void commitCkp(long checkpointId, String commitInstant, String inflightInstant) {
Path path = fullFilePath(getCommitFileName(checkpointId));
try (FSDataOutputStream outputStream = fs.create(path, true)) {
byte[] bytes = CkpMessage.toBytes(commitInstant, inflightInstant);
outputStream.write(bytes);
outputStream.close();
this.ckpIdCache.put(checkpointId, true);
clean();
} catch (Throwable e) {
throw new HoodieException("Adding committed message error for checkpoint: " + checkpointId, e);
}
}
/**
* Add an aborted checkpoint message.
*
* @param checkpointId The checkpoint id
*/
public void abortCkp(long checkpointId) {
Path path = fullFilePath(getAbortedFileName(checkpointId));
try {
fs.createNewFile(path);
this.ckpIdCache.put(checkpointId, false);
clean();
} catch (Throwable e) {
throw new HoodieException("Adding aborted message error for checkpoint: " + checkpointId, e);
}
}
private void clean() throws IOException {
int numToClean = this.ckpIdCache.size() - MESSAGE_QUEUE_LENGTH;
if (numToClean >= 10) {
for (int i = 0; i < numToClean; i++) {
Map.Entry<Long, Boolean> entry = this.ckpIdCache.pollFirstEntry();
final String fileName = entry.getValue() ? getCommitFileName(entry.getKey()) : getAbortedFileName(entry.getKey());
final Path filePath = fullFilePath(fileName);
fs.delete(filePath, false);
}
}
}
@VisibleForTesting
public TreeMap<Long, Boolean> getCkpIdCache() {
return ckpIdCache;
}
@Override
public void close() throws Exception {
this.ckpIdCache.clear();
}
}

View File

@@ -1,137 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.message;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for {@link MessageBus}.
*/
public class TestMessageBus {
private String basePath;
private FileSystem fs;
private MessageDriver driver;
@TempDir
File tempFile;
@BeforeEach
public void beforeEach() throws Exception {
basePath = tempFile.getAbsolutePath();
this.fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf());
Configuration conf = TestConfigurations.getDefaultConf(basePath);
StreamerUtil.initTableIfNotExists(conf);
this.driver = MessageDriver.getInstance(fs, basePath);
}
@Test
void testWriteAndReadMessage() {
MessageClient client = MessageClient.getSingleton(fs, basePath);
// write and read 5 committed checkpoints
IntStream.range(0, 5).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));
IntStream.range(0, 5).forEach(i -> {
Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
assertTrue(messageOpt.isPresent());
MessageBus.CkpMessage ckpMessage = messageOpt.get();
assertTrue(ckpMessage.committed);
assertThat(ckpMessage.commitInstant, is(i + ""));
assertThat(ckpMessage.inflightInstant, is(i + 1 + ""));
});
// write and read 5 aborted checkpoints
IntStream.range(5, 10).forEach(i -> driver.abortCkp(i));
IntStream.range(5, 10).forEach(i -> {
Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
assertTrue(messageOpt.isPresent());
MessageBus.CkpMessage ckpMessage = messageOpt.get();
assertFalse(ckpMessage.committed);
assertThat(ckpMessage.commitInstant, is(MessageBus.ABORTED_CKP_INSTANT));
assertThat(ckpMessage.inflightInstant, is(MessageBus.ABORTED_CKP_INSTANT));
});
}
@Test
void testWriteCleaning() {
// write and read 20 committed checkpoints
IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));
assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(20));
// write and read 10 aborted checkpoints
IntStream.range(20, 29).forEach(i -> driver.abortCkp(i));
assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(29));
driver.commitCkp(29, "29", "30");
assertThat("The cache should be cleaned", driver.getCkpIdCache().size(), is(20));
assertThat(longSet2String(driver.getCkpIdCache().keySet()),
is("10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29"));
}
@Test
void testReadCleaning() {
MessageClient client = MessageClient.getSingleton(fs, basePath);
// write and read 20 committed checkpoints
IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));
IntStream.range(0, 10).forEach(client::getCkpMessage);
assertThat("The checkpoint cache should not be cleaned", client.getCkpCache().size(), is(10));
client.getCkpMessage(10);
assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10));
IntStream.range(11, 15).forEach(client::getCkpMessage);
assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10));
assertThat(longSet2String(client.getCkpCache().keySet()), is("5,6,7,8,9,10,11,12,13,14"));
}
private static String longSet2String(Set<Long> longSet) {
List<String> elements = new ArrayList<>();
longSet.stream().mapToInt(Long::intValue).forEach(i -> elements.add(i + ""));
return String.join(",", elements);
}
}

View File

@@ -248,7 +248,6 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
public void close() throws Exception {
coordinator.close();
ioManager.close();
writeFunction.close();
}
public StreamWriteOperatorCoordinator getCoordinator() {