1
0

[FLINK-1923] Exactly-once write for flink writer (#3002)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
yuzhaojing
2021-05-28 14:58:21 +08:00
committed by GitHub
parent 7fed7352bd
commit bc18c39835
7 changed files with 133 additions and 11 deletions

View File

@@ -305,6 +305,22 @@ public class FlinkOptions {
.defaultValue(100) // default 100 MB
.withDescription("Max memory in MB for merge, default 100MB");
public static final ConfigOption<Boolean> WRITE_EXACTLY_ONCE_ENABLED = ConfigOptions
.key("write.exactly_once.enabled")
.booleanType()
.defaultValue(false) // default at least once
.withDescription("Whether write in exactly_once semantics, if true,\n"
+ "the write task would block flushing after it finishes a checkpoint\n"
+ "until it receives the checkpoint success event, default false");
// this is only for internal use
public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
.key("write.commit.ack.timeout")
.longType()
.defaultValue(-1L) // -1 is a placeholder; the sink overrides it with the checkpoint timeout
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+ "waits for the instant commit success, only for internal use");
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -54,7 +55,9 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -145,6 +148,27 @@ public class StreamWriteFunction<K, I, O>
*/
private transient TotalSizeTracer tracer;
/**
 * Whether write in exactly-once semantics.
 */
private boolean exactlyOnce;
/**
 * Flag saying whether the write task is waiting for the checkpoint success notification
 * after it finished a checkpoint.
 *
 * <p>The flag is needed because the write task does not block during the waiting time interval,
 * some data buckets still flush out with old instant time. There are two cases that the flush may produce
 * corrupted files if the old instant is committed successfully:
 * 1) the write handle was writing data but interrupted, left a corrupted parquet file;
 * 2) the write handle finished the write but was not closed, left an empty parquet file.
 *
 * <p>To solve, when this flag is set to true, we block the data flushing thus the #processElement method,
 * the flag is reset to false if the task receives the checkpoint success event or the latest inflight instant
 * time changed (the last instant committed successfully).
 */
private volatile boolean confirming = false;
/**
* Constructs a StreamWriteFunction.
*
@@ -162,6 +186,7 @@ public class StreamWriteFunction<K, I, O>
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.tracer = new TotalSizeTracer(this.config);
this.exactlyOnce = config.getBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED);
initBuffer();
initWriteFunction();
}
@@ -225,6 +250,11 @@ public class StreamWriteFunction<K, I, O>
return writeClient;
}
/**
 * Returns whether the task is waiting for the last checkpointed instant to commit,
 * only for testing purposes.
 */
@VisibleForTesting
public boolean isConfirming() {
return this.confirming;
}
/**
 * Sets up the gateway used to send operator events to the write coordinator.
 */
public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
this.eventGateway = operatorEventGateway;
}
@@ -458,7 +488,7 @@ public class StreamWriteFunction<K, I, O>
@SuppressWarnings("unchecked, rawtypes")
private void flushBucket(DataBucket bucket) {
final String instant = this.writeClient.getLastPendingInstant(this.actionType);
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that have no input data
@@ -466,6 +496,31 @@ public class StreamWriteFunction<K, I, O>
return;
}
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
if (exactlyOnce && confirming) {
long waitingTime = 0L;
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
long interval = 500L;
while (Objects.equals(instant, this.currentInstant)) {
// sleep for a while
try {
if (waitingTime > ckpTimeout) {
throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for instant " + instant + " to commit");
}
TimeUnit.MILLISECONDS.sleep(interval);
waitingTime += interval;
} catch (InterruptedException e) {
throw new HoodieException("Error while waiting for instant " + instant + " to commit", e);
}
// refresh the inflight instant
instant = this.writeClient.getLastPendingInstant(this.actionType);
}
// the inflight instant changed, which means the last instant was committed
// successfully.
confirming = false;
}
List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
@@ -522,5 +577,6 @@ public class StreamWriteFunction<K, I, O>
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
this.confirming = true;
}
}

View File

@@ -58,6 +58,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
/**
* The function to build the write profile incrementally for records within a checkpoint,
@@ -197,7 +198,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
if (isChangingRecords && indexState.contains(recordKey)) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey);
if (!StreamerUtil.equal(oldLoc.getPartitionPath(), partitionPath)) {
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.

View File

@@ -68,6 +68,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// Read from kafka source
RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
DataStream<Object> pipeline = dataStream

View File

@@ -58,8 +58,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
@@ -336,11 +334,4 @@ public class StreamerUtil {
long oldTime = Long.parseLong(oldInstant);
return String.valueOf(oldTime + milliseconds);
}
/**
* Copied from Objects#equal.
*/
public static boolean equal(@Nullable Object a, @Nullable Object b) {
return a == b || (a != null && a.equals(b));
}
}