[HUDI-1598] Write as minor batches during one checkpoint interval for the new writer (#2553)
This commit is contained in:
@@ -159,6 +159,12 @@ public class FlinkOptions {
|
||||
.defaultValue(4)
|
||||
.withDescription("Parallelism of tasks that do actual write, default is 4");
|
||||
|
||||
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
|
||||
.key("write.batch.size.MB")
|
||||
.doubleType()
|
||||
.defaultValue(128D) // 128MB
|
||||
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem");
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Utilities
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@@ -34,6 +34,7 @@ import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.state.CheckpointListener;
|
||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
import org.apache.flink.runtime.state.FunctionSnapshotContext;
|
||||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
|
||||
@@ -50,7 +51,9 @@ import java.util.Map;
|
||||
/**
|
||||
* A {@link KeyedProcessFunction} where the write operations really happens.
|
||||
*/
|
||||
public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements CheckpointedFunction {
|
||||
public class KeyedWriteProcessFunction
|
||||
extends KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>>
|
||||
implements CheckpointedFunction, CheckpointListener {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
|
||||
/**
|
||||
@@ -160,6 +163,11 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
|
||||
putDataIntoBuffer(hoodieRecord);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyCheckpointComplete(long checkpointId) throws Exception {
|
||||
this.writeClient.cleanHandles();
|
||||
}
|
||||
|
||||
public boolean hasRecordsIn() {
|
||||
return hasRecordsIn;
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.util.ObjectSizeCalculator;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
|
||||
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
|
||||
@@ -33,6 +34,7 @@ import org.apache.hudi.util.StreamerUtil;
|
||||
import org.apache.flink.annotation.VisibleForTesting;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
|
||||
import org.apache.flink.runtime.state.CheckpointListener;
|
||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
import org.apache.flink.runtime.state.FunctionSnapshotContext;
|
||||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
|
||||
@@ -48,6 +50,7 @@ import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.BiFunction;
|
||||
@@ -58,33 +61,39 @@ import java.util.function.BiFunction;
|
||||
* <p><h2>Work Flow</h2>
|
||||
*
|
||||
* <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
|
||||
* It flushes(write) the records batch when a Flink checkpoint starts. After a batch has been written successfully,
|
||||
* It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE}
|
||||
* or a Flink checkpoint starts. After a batch has been written successfully,
|
||||
* the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
|
||||
*
|
||||
* <p><h2>Exactly-once Semantics</h2>
|
||||
* <p><h2>The Semantics</h2>
|
||||
*
|
||||
* <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
|
||||
* starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
|
||||
* start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
|
||||
* The function process thread then block data buffering and the checkpoint thread starts flushing the existing data buffer.
|
||||
* When the existing data buffer write successfully, the process thread unblock and start buffering again for the next round checkpoint.
|
||||
* Because any checkpoint failures would trigger the write rollback, it implements the exactly-once semantics.
|
||||
*
|
||||
* <p>In order to improve the throughput, The function process thread does not block data buffering
|
||||
* after the checkpoint thread starts flushing the existing data buffer. So there is possibility that the next checkpoint
|
||||
* batch was written to current checkpoint. When a checkpoint failure triggers the write rollback, there may be some duplicate records
|
||||
* (e.g. the eager write batch), the semantics is still correct using the UPSERT operation.
|
||||
*
|
||||
* <p><h2>Fault Tolerance</h2>
|
||||
*
|
||||
* <p>The operator coordinator checks the validity for the last instant when it starts a new one. The operator rolls back
|
||||
* the written data and throws when any error occurs. This means any checkpoint or task failure would trigger a failover.
|
||||
* The operator coordinator would try several times when committing the writestatus.
|
||||
* <p>The operator coordinator checks and commits the last instant then starts a new one when a checkpoint finished successfully.
|
||||
* The operator rolls back the written data and throws to trigger a failover when any error occurs.
|
||||
* This means one Hoodie instant may span one or more checkpoints(some checkpoints notifications may be skipped).
|
||||
* If a checkpoint timed out, the next checkpoint would help to rewrite the left buffer data (clean the buffer in the last
|
||||
* step of the #flushBuffer method).
|
||||
*
|
||||
* <p>Note: The function task requires the input stream be partitioned by the partition fields to avoid different write tasks
|
||||
* write to the same file group that conflict. The general case for partition path is a datetime field,
|
||||
* so the sink task is very possible to have IO bottleneck, the more flexible solution is to shuffle the
|
||||
* data by the file group IDs.
|
||||
* <p>The operator coordinator would try several times when committing the write status.
|
||||
*
|
||||
* <p>Note: The function task requires the input stream be shuffled by the file IDs.
|
||||
*
|
||||
* @param <I> Type of the input record
|
||||
* @see StreamWriteOperatorCoordinator
|
||||
*/
|
||||
public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction {
|
||||
public class StreamWriteFunction<K, I, O>
|
||||
extends KeyedProcessFunction<K, I, O>
|
||||
implements CheckpointedFunction, CheckpointListener {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@@ -137,10 +146,15 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
*/
|
||||
private transient OperatorEventGateway eventGateway;
|
||||
|
||||
/**
|
||||
* The detector that tells if to flush the data as mini-batch.
|
||||
*/
|
||||
private transient BufferSizeDetector detector;
|
||||
|
||||
/**
|
||||
* Constructs a StreamingSinkFunction.
|
||||
*
|
||||
* @param config The config options
|
||||
* @param config The config options
|
||||
*/
|
||||
public StreamWriteFunction(Configuration config) {
|
||||
this.config = config;
|
||||
@@ -149,6 +163,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
@Override
|
||||
public void open(Configuration parameters) throws IOException {
|
||||
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||
this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE));
|
||||
initBuffer();
|
||||
initWriteClient();
|
||||
initWriteFunction();
|
||||
@@ -166,11 +181,8 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
// Based on the fact that the coordinator starts the checkpoint first,
|
||||
// it would check the validity.
|
||||
this.onCheckpointing = true;
|
||||
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
|
||||
Preconditions.checkNotNull(this.currentInstant,
|
||||
"No inflight instant when flushing data");
|
||||
// wait for the buffer data flush out and request a new instant
|
||||
flushBuffer();
|
||||
flushBuffer(true);
|
||||
// signal the task thread to start buffering
|
||||
addToBufferCondition.signal();
|
||||
} finally {
|
||||
@@ -186,6 +198,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
if (onCheckpointing) {
|
||||
addToBufferCondition.await();
|
||||
}
|
||||
flushBufferOnCondition(value);
|
||||
putDataIntoBuffer(value);
|
||||
} finally {
|
||||
bufferLock.unlock();
|
||||
@@ -199,6 +212,11 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyCheckpointComplete(long checkpointId) {
|
||||
this.writeClient.cleanHandles();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Getter/Setter
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -252,6 +270,42 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tool to detect if to flush out the existing buffer.
|
||||
* Sampling the record to compute the size with 0.01 percentage.
|
||||
*/
|
||||
private static class BufferSizeDetector {
|
||||
private final Random random = new Random(47);
|
||||
private static final int DENOMINATOR = 100;
|
||||
|
||||
private final double batchSizeBytes;
|
||||
|
||||
private long lastRecordSize = -1L;
|
||||
private long totalSize = 0L;
|
||||
|
||||
BufferSizeDetector(double batchSizeMb) {
|
||||
this.batchSizeBytes = batchSizeMb * 1024 * 1024;
|
||||
}
|
||||
|
||||
boolean detect(Object record) {
|
||||
if (lastRecordSize == -1 || sampling()) {
|
||||
lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
|
||||
}
|
||||
totalSize += lastRecordSize;
|
||||
return totalSize > this.batchSizeBytes;
|
||||
}
|
||||
|
||||
boolean sampling() {
|
||||
// 0.01 sampling percentage
|
||||
return random.nextInt(DENOMINATOR) == 1;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
this.lastRecordSize = -1L;
|
||||
this.totalSize = 0L;
|
||||
}
|
||||
}
|
||||
|
||||
private void putDataIntoBuffer(I value) {
|
||||
HoodieRecord<?> record = (HoodieRecord<?>) value;
|
||||
final String fileId = record.getCurrentLocation().getFileId();
|
||||
@@ -262,8 +316,25 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
this.buffer.get(key).add(record);
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush the data buffer if the buffer size is greater than
|
||||
* the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
|
||||
*
|
||||
* @param value HoodieRecord
|
||||
*/
|
||||
private void flushBufferOnCondition(I value) {
|
||||
boolean needFlush = this.detector.detect(value);
|
||||
if (needFlush) {
|
||||
flushBuffer(false);
|
||||
this.detector.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked, rawtypes")
|
||||
private void flushBuffer() {
|
||||
private void flushBuffer(boolean isFinalBatch) {
|
||||
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
|
||||
Preconditions.checkNotNull(this.currentInstant,
|
||||
"No inflight instant when flushing data");
|
||||
final List<WriteStatus> writeStatus;
|
||||
if (buffer.size() > 0) {
|
||||
writeStatus = new ArrayList<>();
|
||||
@@ -278,12 +349,13 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
writeStatus.addAll(writeFunction.apply(records, currentInstant));
|
||||
}
|
||||
});
|
||||
this.buffer.clear();
|
||||
} else {
|
||||
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
|
||||
writeStatus = Collections.emptyList();
|
||||
}
|
||||
this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus));
|
||||
this.eventGateway.sendEventToCoordinator(
|
||||
new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus, isFinalBatch));
|
||||
this.buffer.clear();
|
||||
this.currentInstant = "";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,12 +80,15 @@ public class StreamWriteOperatorCoordinator
|
||||
*/
|
||||
private transient HoodieFlinkWriteClient writeClient;
|
||||
|
||||
/**
|
||||
* Current data buffering checkpoint.
|
||||
*/
|
||||
private long inFlightCheckpoint = -1;
|
||||
|
||||
/**
|
||||
* Current REQUESTED instant, for validation.
|
||||
*/
|
||||
private String inFlightInstant = "";
|
||||
private String instant = "";
|
||||
|
||||
/**
|
||||
* Event buffer for one round of checkpointing. When all the elements are non-null and have the same
|
||||
@@ -119,6 +122,8 @@ public class StreamWriteOperatorCoordinator
|
||||
initWriteClient();
|
||||
// init table, create it if not exists.
|
||||
initTableIfNotExists(this.conf);
|
||||
// start a new instant
|
||||
startInstant();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -132,20 +137,14 @@ public class StreamWriteOperatorCoordinator
|
||||
@Override
|
||||
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
|
||||
try {
|
||||
final String errMsg = "A new checkpoint starts while the last checkpoint buffer"
|
||||
+ " data has not finish writing, roll back the last write and throw";
|
||||
checkAndForceCommit(errMsg);
|
||||
this.inFlightInstant = this.writeClient.startCommit();
|
||||
this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
|
||||
this.inFlightCheckpoint = checkpointId;
|
||||
LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId);
|
||||
result.complete(writeCheckpointBytes());
|
||||
} catch (Throwable throwable) {
|
||||
// when a checkpoint fails, throws directly.
|
||||
result.completeExceptionally(
|
||||
new CompletionException(
|
||||
String.format("Failed to checkpoint Instant %s for source %s",
|
||||
this.inFlightInstant, this.getClass().getSimpleName()), throwable));
|
||||
this.instant, this.getClass().getSimpleName()), throwable));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,6 +152,15 @@ public class StreamWriteOperatorCoordinator
|
||||
public void checkpointComplete(long checkpointId) {
|
||||
// start to commit the instant.
|
||||
checkAndCommitWithRetry();
|
||||
// start new instant.
|
||||
startInstant();
|
||||
}
|
||||
|
||||
private void startInstant() {
|
||||
this.instant = this.writeClient.startCommit();
|
||||
this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
|
||||
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
|
||||
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
|
||||
}
|
||||
|
||||
public void notifyCheckpointAborted(long checkpointId) {
|
||||
@@ -175,10 +183,14 @@ public class StreamWriteOperatorCoordinator
|
||||
Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
|
||||
"The coordinator can only handle BatchWriteSuccessEvent");
|
||||
BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
|
||||
Preconditions.checkState(event.getInstantTime().equals(this.inFlightInstant),
|
||||
Preconditions.checkState(event.getInstantTime().equals(this.instant),
|
||||
String.format("Receive an unexpected event for instant %s from task %d",
|
||||
event.getInstantTime(), event.getTaskID()));
|
||||
this.eventBuffer[event.getTaskID()] = event;
|
||||
if (this.eventBuffer[event.getTaskID()] != null) {
|
||||
this.eventBuffer[event.getTaskID()].mergeWith(event);
|
||||
} else {
|
||||
this.eventBuffer[event.getTaskID()] = event;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -218,7 +230,7 @@ public class StreamWriteOperatorCoordinator
|
||||
DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
|
||||
|
||||
out.writeLong(this.inFlightCheckpoint);
|
||||
byte[] serializedInstant = this.inFlightInstant.getBytes();
|
||||
byte[] serializedInstant = this.instant.getBytes();
|
||||
out.writeInt(serializedInstant.length);
|
||||
out.write(serializedInstant);
|
||||
out.flush();
|
||||
@@ -239,12 +251,12 @@ public class StreamWriteOperatorCoordinator
|
||||
int serializedInstantSize = in.readInt();
|
||||
byte[] serializedInstant = readBytes(in, serializedInstantSize);
|
||||
this.inFlightCheckpoint = checkpointID;
|
||||
this.inFlightInstant = new String(serializedInstant);
|
||||
this.instant = new String(serializedInstant);
|
||||
}
|
||||
}
|
||||
|
||||
private void reset() {
|
||||
this.inFlightInstant = "";
|
||||
this.instant = "";
|
||||
this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
|
||||
}
|
||||
|
||||
@@ -253,8 +265,8 @@ public class StreamWriteOperatorCoordinator
|
||||
// forced but still has inflight instant
|
||||
String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
|
||||
if (inflightInstant != null) {
|
||||
assert inflightInstant.equals(this.inFlightInstant);
|
||||
writeClient.rollback(this.inFlightInstant);
|
||||
assert inflightInstant.equals(this.instant);
|
||||
writeClient.rollback(this.instant);
|
||||
throw new HoodieException(errMsg);
|
||||
}
|
||||
if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
|
||||
@@ -277,6 +289,10 @@ public class StreamWriteOperatorCoordinator
|
||||
if (!checkReady()) {
|
||||
// Do not throw if the try times expires but the event buffer are still not ready,
|
||||
// because we have a force check when next checkpoint starts.
|
||||
if (tryTimes == retryTimes) {
|
||||
// Throw if the try times expires but the event buffer are still not ready
|
||||
throw new HoodieException("Try " + retryTimes + " to commit instant [" + this.instant + "] failed");
|
||||
}
|
||||
sleepFor(retryIntervalMillis);
|
||||
continue;
|
||||
}
|
||||
@@ -284,9 +300,9 @@ public class StreamWriteOperatorCoordinator
|
||||
return;
|
||||
} catch (Throwable throwable) {
|
||||
String cause = throwable.getCause() == null ? "" : throwable.getCause().toString();
|
||||
LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.inFlightInstant, tryTimes, cause);
|
||||
LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.instant, tryTimes, cause);
|
||||
if (tryTimes == retryTimes) {
|
||||
throw new HoodieException(throwable);
|
||||
throw new HoodieException("Not all write tasks finish the batch write to commit", throwable);
|
||||
}
|
||||
sleepFor(retryIntervalMillis);
|
||||
}
|
||||
@@ -307,8 +323,8 @@ public class StreamWriteOperatorCoordinator
|
||||
|
||||
/** Checks the buffer is ready to commit. */
|
||||
private boolean checkReady() {
|
||||
return Arrays.stream(eventBuffer).allMatch(event ->
|
||||
event != null && event.getInstantTime().equals(this.inFlightInstant));
|
||||
return Arrays.stream(eventBuffer)
|
||||
.allMatch(event -> event != null && event.isReady(this.instant));
|
||||
}
|
||||
|
||||
/** Performs the actual commit action. */
|
||||
@@ -320,7 +336,7 @@ public class StreamWriteOperatorCoordinator
|
||||
|
||||
if (writeResults.size() == 0) {
|
||||
// No data has written, clear the metadata file
|
||||
this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
|
||||
this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
|
||||
reset();
|
||||
return;
|
||||
}
|
||||
@@ -337,12 +353,12 @@ public class StreamWriteOperatorCoordinator
|
||||
+ totalErrorRecords + "/" + totalRecords);
|
||||
}
|
||||
|
||||
boolean success = writeClient.commit(this.inFlightInstant, writeResults, Option.of(checkpointCommitMetadata));
|
||||
boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata));
|
||||
if (success) {
|
||||
reset();
|
||||
LOG.info("Commit instant [{}] success!", this.inFlightInstant);
|
||||
LOG.info("Commit instant [{}] success!", this.instant);
|
||||
} else {
|
||||
throw new HoodieException(String.format("Commit instant [%s] failed!", this.inFlightInstant));
|
||||
throw new HoodieException(String.format("Commit instant [%s] failed!", this.instant));
|
||||
}
|
||||
} else {
|
||||
LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
|
||||
@@ -355,8 +371,8 @@ public class StreamWriteOperatorCoordinator
|
||||
}
|
||||
});
|
||||
// Rolls back instant
|
||||
writeClient.rollback(this.inFlightInstant);
|
||||
throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.inFlightInstant));
|
||||
writeClient.rollback(this.instant);
|
||||
throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.instant));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -366,8 +382,14 @@ public class StreamWriteOperatorCoordinator
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getInFlightInstant() {
|
||||
return inFlightInstant;
|
||||
public String getInstant() {
|
||||
return instant;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@SuppressWarnings("rawtypes")
|
||||
public HoodieFlinkWriteClient getWriteClient() {
|
||||
return writeClient;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -21,7 +21,9 @@ package org.apache.hudi.operator.event;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@@ -30,17 +32,38 @@ import java.util.List;
|
||||
public class BatchWriteSuccessEvent implements OperatorEvent {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final List<WriteStatus> writeStatuses;
|
||||
private List<WriteStatus> writeStatuses;
|
||||
private final int taskID;
|
||||
private final String instantTime;
|
||||
private boolean isLastBatch;
|
||||
|
||||
public BatchWriteSuccessEvent(
|
||||
int taskID,
|
||||
String instantTime,
|
||||
List<WriteStatus> writeStatuses) {
|
||||
this(taskID, instantTime, writeStatuses, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an event.
|
||||
*
|
||||
* @param taskID The task ID
|
||||
* @param instantTime The instant time under which to write the data
|
||||
* @param writeStatuses The write statues list
|
||||
* @param isLastBatch Whether the event reports the last batch
|
||||
* within an checkpoint interval,
|
||||
* if true, the whole data set of the checkpoint
|
||||
* has been flushed successfully
|
||||
*/
|
||||
public BatchWriteSuccessEvent(
|
||||
int taskID,
|
||||
String instantTime,
|
||||
List<WriteStatus> writeStatuses,
|
||||
boolean isLastBatch) {
|
||||
this.taskID = taskID;
|
||||
this.instantTime = instantTime;
|
||||
this.writeStatuses = writeStatuses;
|
||||
this.writeStatuses = new ArrayList<>(writeStatuses);
|
||||
this.isLastBatch = isLastBatch;
|
||||
}
|
||||
|
||||
public List<WriteStatus> getWriteStatuses() {
|
||||
@@ -54,4 +77,28 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
|
||||
public String getInstantTime() {
|
||||
return instantTime;
|
||||
}
|
||||
|
||||
public boolean isLastBatch() {
|
||||
return isLastBatch;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges this event with given {@link BatchWriteSuccessEvent} {@code other}.
|
||||
*
|
||||
* @param other The event to be merged
|
||||
*/
|
||||
public void mergeWith(BatchWriteSuccessEvent other) {
|
||||
ValidationUtils.checkArgument(this.instantTime.equals(other.instantTime));
|
||||
ValidationUtils.checkArgument(this.taskID == other.taskID);
|
||||
this.isLastBatch |= other.isLastBatch; // true if one of the event isLastBatch true.
|
||||
List<WriteStatus> statusList = new ArrayList<>();
|
||||
statusList.addAll(this.writeStatuses);
|
||||
statusList.addAll(other.writeStatuses);
|
||||
this.writeStatuses = statusList;
|
||||
}
|
||||
|
||||
/** Returns whether the event is ready to commit. */
|
||||
public boolean isReady(String currentInstant) {
|
||||
return isLastBatch && this.instantTime.equals(currentInstant);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
|
||||
@Override
|
||||
public void snapshotState(FunctionSnapshotContext context) {
|
||||
this.bucketAssigner.reset();
|
||||
// no operation
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -144,6 +144,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
@Override
|
||||
public void notifyCheckpointComplete(long l) {
|
||||
// Refresh the table state when there are new commits.
|
||||
this.bucketAssigner.reset();
|
||||
this.bucketAssigner.refreshTable();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.util;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
|
||||
@@ -53,6 +54,7 @@ import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
@@ -293,4 +295,9 @@ public class StreamerUtil {
|
||||
public static String generateBucketKey(String partitionPath, String fileId) {
|
||||
return String.format("%s_%s", partitionPath, fileId);
|
||||
}
|
||||
|
||||
/** Returns whether the location represents an insert. */
|
||||
public static boolean isInsert(HoodieRecordLocation loc) {
|
||||
return Objects.equals(loc.getInstantTime(), "I");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi.operator;
|
||||
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
|
||||
@@ -150,7 +151,8 @@ public class StreamWriteFunctionTest {
|
||||
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
|
||||
|
||||
funcWrapper.checkpointComplete(2);
|
||||
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
|
||||
// started a new instant already
|
||||
checkInflightInstant(funcWrapper.getWriteClient());
|
||||
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
|
||||
}
|
||||
|
||||
@@ -187,12 +189,13 @@ public class StreamWriteFunctionTest {
|
||||
funcWrapper.invoke(rowData);
|
||||
}
|
||||
|
||||
// this triggers the data write and event send
|
||||
funcWrapper.checkpointFunction(2);
|
||||
// Do not sent the write event and fails the checkpoint
|
||||
assertThrows(HoodieException.class,
|
||||
() -> funcWrapper.checkpointFails(2),
|
||||
"The last checkpoint was aborted, roll back the last write and throw");
|
||||
// this triggers NPE cause there is no inflight instant
|
||||
assertThrows(NullPointerException.class,
|
||||
() -> funcWrapper.checkpointFunction(2),
|
||||
"No inflight instant when flushing data");
|
||||
// do not sent the write event and fails the checkpoint,
|
||||
// behaves like the last checkpoint is successful.
|
||||
funcWrapper.checkpointFails(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -212,13 +215,13 @@ public class StreamWriteFunctionTest {
|
||||
|
||||
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
|
||||
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
|
||||
checkWrittenData(tempFile, EXPECTED1);
|
||||
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
|
||||
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
|
||||
|
||||
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
|
||||
funcWrapper.checkpointComplete(1);
|
||||
checkWrittenData(tempFile, EXPECTED1);
|
||||
// the coordinator checkpoint commits the inflight instant.
|
||||
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
|
||||
checkWrittenData(tempFile, EXPECTED1);
|
||||
@@ -241,15 +244,16 @@ public class StreamWriteFunctionTest {
|
||||
// this triggers the data write and event send
|
||||
funcWrapper.checkpointFunction(1);
|
||||
|
||||
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
|
||||
OperatorEvent nextEvent = funcWrapper.getNextEvent();
|
||||
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
|
||||
checkWrittenData(tempFile, EXPECTED3, 1);
|
||||
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
|
||||
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
|
||||
|
||||
funcWrapper.checkpointComplete(1);
|
||||
|
||||
checkWrittenData(tempFile, EXPECTED3, 1);
|
||||
|
||||
// insert duplicates again
|
||||
for (RowData rowData : TestData.DATA_SET_THREE) {
|
||||
funcWrapper.invoke(rowData);
|
||||
@@ -257,6 +261,10 @@ public class StreamWriteFunctionTest {
|
||||
|
||||
funcWrapper.checkpointFunction(2);
|
||||
|
||||
nextEvent = funcWrapper.getNextEvent();
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
|
||||
funcWrapper.checkpointComplete(2);
|
||||
|
||||
checkWrittenData(tempFile, EXPECTED3, 1);
|
||||
}
|
||||
|
||||
@@ -306,10 +314,84 @@ public class StreamWriteFunctionTest {
|
||||
checkWrittenData(tempFile, EXPECTED2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertWithMiniBatches() throws Exception {
|
||||
// reset the config option
|
||||
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
|
||||
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
||||
|
||||
// open the function and ingest data
|
||||
funcWrapper.openFunction();
|
||||
// Each record is 424 bytes. so 3 records expect to trigger a mini-batch write
|
||||
for (RowData rowData : TestData.DATA_SET_THREE) {
|
||||
funcWrapper.invoke(rowData);
|
||||
}
|
||||
|
||||
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
|
||||
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
|
||||
assertThat("2 records expect to flush out as a mini-batch",
|
||||
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
|
||||
is(3));
|
||||
|
||||
// this triggers the data write and event send
|
||||
funcWrapper.checkpointFunction(1);
|
||||
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
|
||||
|
||||
final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
|
||||
final OperatorEvent event2 = funcWrapper.getNextEvent();
|
||||
assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class));
|
||||
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
|
||||
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
|
||||
|
||||
String instant = funcWrapper.getWriteClient()
|
||||
.getInflightAndRequestedInstant("COPY_ON_WRITE");
|
||||
|
||||
funcWrapper.checkpointComplete(1);
|
||||
|
||||
Map<String, String> expected = new HashMap<>();
|
||||
expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, "
|
||||
+ "id1,par1,id1,Danny,23,1,par1, "
|
||||
+ "id1,par1,id1,Danny,23,1,par1, "
|
||||
+ "id1,par1,id1,Danny,23,1,par1, "
|
||||
+ "id1,par1,id1,Danny,23,1,par1]");
|
||||
checkWrittenData(tempFile, expected, 1);
|
||||
|
||||
// started a new instant already
|
||||
checkInflightInstant(funcWrapper.getWriteClient());
|
||||
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
|
||||
|
||||
// insert duplicates again
|
||||
for (RowData rowData : TestData.DATA_SET_THREE) {
|
||||
funcWrapper.invoke(rowData);
|
||||
}
|
||||
|
||||
funcWrapper.checkpointFunction(2);
|
||||
|
||||
final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
|
||||
final OperatorEvent event4 = funcWrapper.getNextEvent();
|
||||
final OperatorEvent event5 = funcWrapper.getNextEvent();
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, event5);
|
||||
funcWrapper.checkpointComplete(2);
|
||||
|
||||
// Same the original base file content.
|
||||
checkWrittenData(tempFile, expected, 1);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Utilities
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private void checkInflightInstant(HoodieFlinkWriteClient writeClient) {
|
||||
final String instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE");
|
||||
assertNotNull(instant);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private void checkInstantState(
|
||||
HoodieFlinkWriteClient writeClient,
|
||||
|
||||
@@ -18,8 +18,11 @@
|
||||
|
||||
package org.apache.hudi.operator;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
|
||||
import org.apache.hudi.operator.utils.TestConfigurations;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
@@ -37,6 +40,9 @@ import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@@ -61,6 +67,34 @@ public class StreamWriteOperatorCoordinatorTest {
|
||||
coordinator.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testInstantState() {
|
||||
String instant = coordinator.getInstant();
|
||||
assertNotEquals("", instant);
|
||||
|
||||
WriteStatus writeStatus = new WriteStatus(true, 0.1D);
|
||||
writeStatus.setPartitionPath("par1");
|
||||
writeStatus.setStat(new HoodieWriteStat());
|
||||
OperatorEvent event0 =
|
||||
new BatchWriteSuccessEvent(0, instant, Collections.singletonList(writeStatus), true);
|
||||
|
||||
WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
|
||||
writeStatus1.setPartitionPath("par2");
|
||||
writeStatus1.setStat(new HoodieWriteStat());
|
||||
OperatorEvent event1 =
|
||||
new BatchWriteSuccessEvent(1, instant, Collections.singletonList(writeStatus1), true);
|
||||
coordinator.handleEventFromOperator(0, event0);
|
||||
coordinator.handleEventFromOperator(1, event1);
|
||||
|
||||
coordinator.checkpointComplete(1);
|
||||
String inflight = coordinator.getWriteClient()
|
||||
.getInflightAndRequestedInstant("COPY_ON_WRITE");
|
||||
String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE");
|
||||
assertThat("Instant should be complete", lastCompleted, is(instant));
|
||||
assertNotEquals("", inflight, "Should start a new instant");
|
||||
assertNotEquals(instant, inflight, "Should start a new instant");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableInitialized() throws IOException {
|
||||
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
|
||||
@@ -88,14 +122,14 @@ public class StreamWriteOperatorCoordinatorTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckpointInvalid() {
|
||||
public void testCheckpointCompleteWithRetry() {
|
||||
final CompletableFuture<byte[]> future = new CompletableFuture<>();
|
||||
coordinator.checkpointCoordinator(1, future);
|
||||
String inflightInstant = coordinator.getInFlightInstant();
|
||||
String inflightInstant = coordinator.getInstant();
|
||||
OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList());
|
||||
coordinator.handleEventFromOperator(0, event);
|
||||
final CompletableFuture<byte[]> future2 = new CompletableFuture<>();
|
||||
coordinator.checkpointCoordinator(2, future2);
|
||||
assertTrue(future2.isCompletedExceptionally());
|
||||
assertThrows(HoodieException.class,
|
||||
() -> coordinator.checkpointComplete(1),
|
||||
"Try 3 to commit instant");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventG
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
@@ -77,11 +79,11 @@ public class StreamWriteFunctionWrapper<I> {
|
||||
this.conf = conf;
|
||||
// one function
|
||||
this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
|
||||
this.coordinator.start();
|
||||
this.functionInitializationContext = new MockFunctionInitializationContext();
|
||||
}
|
||||
|
||||
public void openFunction() throws Exception {
|
||||
this.coordinator.start();
|
||||
toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
|
||||
toHoodieFunction.setRuntimeContext(runtimeContext);
|
||||
toHoodieFunction.open(conf);
|
||||
@@ -123,6 +125,10 @@ public class StreamWriteFunctionWrapper<I> {
|
||||
return this.gateway.getNextEvent();
|
||||
}
|
||||
|
||||
public Map<String, List<HoodieRecord>> getDataBuffer() {
|
||||
return this.writeFunction.getBuffer();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public HoodieFlinkWriteClient getWriteClient() {
|
||||
return this.writeFunction.getWriteClient();
|
||||
@@ -141,6 +147,7 @@ public class StreamWriteFunctionWrapper<I> {
|
||||
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
|
||||
coordinator.checkpointComplete(checkpointId);
|
||||
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
|
||||
this.writeFunction.notifyCheckpointComplete(checkpointId);
|
||||
}
|
||||
|
||||
public void checkpointFails(long checkpointId) {
|
||||
|
||||
Reference in New Issue
Block a user