diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 852639293..4c5d309c4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -305,6 +305,22 @@ public class FlinkOptions {
       .defaultValue(100) // default 100 MB
       .withDescription("Max memory in MB for merge, default 100MB");
 
+  public static final ConfigOption<Boolean> WRITE_EXACTLY_ONCE_ENABLED = ConfigOptions
+      .key("write.exactly_once.enabled")
+      .booleanType()
+      .defaultValue(false) // default at least once
+      .withDescription("Whether to write with exactly-once semantics, if true,\n"
+          + "the write task blocks flushing after it finishes a checkpoint\n"
+          + "until it receives the checkpoint success event, default false");
+
+  // this is only for internal use
+  public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
+      .key("write.commit.ack.timeout")
+      .longType()
+      .defaultValue(-1L) // overridden to the checkpoint timeout by the sink
+      .withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+          + "waits for the instant commit success, only for internal use");
+
   // ------------------------------------------------------------------------
   // Compaction Options
   // ------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a1b3346c0..7679f2c24 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
 import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -54,7 +55,9 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -145,6 +148,27 @@ public class StreamWriteFunction
    */
   private transient TotalSizeTracer tracer;
 
+  /**
+   * Whether to write with exactly-once semantics.
+   */
+  private boolean exactlyOnce;
+
+  /**
+   * Flag saying whether the write task is waiting for the checkpoint success notification
+   * after it finished a checkpoint.
+   *
+   * <p>The flag is needed because the write task does not block during the waiting interval, so some
+   * data buckets may still flush out with the old instant time. There are two cases where such a flush
+   * produces corrupted files if the old instant is then committed successfully:
+   * 1) the write handle was writing data but got interrupted, leaving a corrupted parquet file;
+   * 2) the write handle finished the write but was not closed, leaving an empty parquet file.
+   *
+   * <p>To solve this, while the flag is true we block the data flushing and thus the #processElement
+   * method. The flag is reset to false when the task receives the checkpoint success event or the latest
+   * inflight instant time changes (meaning the last instant committed successfully).
+   */
+  private volatile boolean confirming = false;
+
   /**
    * Constructs a StreamingSinkFunction.
    *
@@ -162,6 +186,7 @@ public class StreamWriteFunction
         WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
         HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
     this.tracer = new TotalSizeTracer(this.config);
+    this.exactlyOnce = config.getBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED);
     initBuffer();
     initWriteFunction();
   }
@@ -225,6 +250,11 @@ public class StreamWriteFunction
     return writeClient;
   }
 
+  @VisibleForTesting
+  public boolean isConfirming() {
+    return this.confirming;
+  }
+
   public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
     this.eventGateway = operatorEventGateway;
   }
@@ -458,7 +488,7 @@ public class StreamWriteFunction
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushBucket(DataBucket bucket) {
-    final String instant = this.writeClient.getLastPendingInstant(this.actionType);
+    String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
@@ -466,6 +496,31 @@ public class StreamWriteFunction
       return;
     }
 
+    // if exactly-once semantics turns on,
+    // waits for the checkpoint notification until the checkpoint timeout threshold hits.
+    if (exactlyOnce && confirming) {
+      long waitingTime = 0L;
+      long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
+      long interval = 500L;
+      while (Objects.equals(instant, this.currentInstant)) {
+        // sleep for a while
+        try {
+          if (waitingTime > ckpTimeout) {
+            throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for instant " + instant + " to commit");
+          }
+          TimeUnit.MILLISECONDS.sleep(interval);
+          waitingTime += interval;
+        } catch (InterruptedException e) {
+          throw new HoodieException("Error while waiting for instant " + instant + " to commit", e);
+        }
+        // refresh the inflight instant
+        instant = this.writeClient.getLastPendingInstant(this.actionType);
+      }
+      // the inflight instant changed, which means the last instant was committed
+      // successfully.
+      confirming = false;
+    }
+
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
     if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
@@ -522,5 +577,6 @@ public class StreamWriteFunction
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
     this.tracer.reset();
+    this.confirming = true;
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 4c51275a5..54a4a62eb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -58,6 +58,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * The function to build the write profile incrementally for records within a checkpoint,
@@ -197,7 +198,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     if (isChangingRecords && indexState.contains(recordKey)) {
       // Set up the instant time as "U" to mark the bucket as an update bucket.
       HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey);
-      if (!StreamerUtil.equal(oldLoc.getPartitionPath(), partitionPath)) {
+      if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
         if (globalIndex) {
           // if partition path changes, emit a delete record for old partition path,
           // then update the index state using location with new partition path.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index c3427f364..b4109b3d5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -68,6 +68,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // Read from kafka source
       RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
       int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
+      long ckpTimeout = dataStream.getExecutionEnvironment()
+          .getCheckpointConfig().getCheckpointTimeout();
+      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
       StreamWriteOperatorFactory<RowData> operatorFactory = new StreamWriteOperatorFactory<>(conf);
 
       DataStream<Object> pipeline = dataStream
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 6f69101d3..c33d635ae 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -58,8 +58,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.StringReader;
@@ -336,11 +334,4 @@ public class StreamerUtil {
     long oldTime = Long.parseLong(oldInstant);
     return String.valueOf(oldTime + milliseconds);
   }
-
-  /**
-   * Copied from Objects#equal.
-   */
-  public static boolean equal(@Nullable Object a, @Nullable Object b) {
-    return a == b || (a != null && a.equals(b));
-  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index d2d04ee27..247e17e6c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.utils.TestConfigurations;
@@ -52,6 +53,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -635,6 +637,55 @@ public class TestWriteCopyOnWrite {
     checkWrittenData(tempFile, EXPECTED2);
   }
 
+  @Test
+  public void testWriteExactlyOnce() throws Exception {
+    // reset the config option
+    conf.setBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED, true);
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3);
+    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // open the function and ingest data
+
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // no checkpoint, so the coordinator does not accept any events
+    assertTrue(
+        funcWrapper.getEventBuffer().length == 1
+            && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer is expected to be empty");
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    assertTrue(funcWrapper.isConfirming(), "The write function should be waiting for the instant to commit");
+
+    for (int i = 0; i < 2; i++) {
+      final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
+      assertThat("The operator is expected to send an event", event, instanceOf(BatchWriteSuccessEvent.class));
+      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
+    }
+
+    funcWrapper.checkpointComplete(1);
+
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertFalse(funcWrapper.isConfirming(), "The write function should finish waiting for the instant to commit");
+
+    // checkpoint for the next round, when there is eager flush but the write
+    // task is waiting for the instant commit ack, should throw for timeout.
+    funcWrapper.checkpointFunction(2);
+
+    assertThrows(HoodieException.class, () -> {
+      for (RowData rowData : TestData.DATA_SET_INSERT) {
+        funcWrapper.invoke(rowData);
+      }
+    }, "Timeout(500ms) while waiting for instant");
+  }
+
   // -------------------------------------------------------------------------
   // Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a4b6c16a3..9f8852f28 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -197,4 +197,8 @@ public class StreamWriteFunctionWrapper {
   public boolean isKeyInState(HoodieKey hoodieKey) {
     return this.bucketAssignerFunction.isKeyInState(hoodieKey);
   }
+
+  public boolean isConfirming() {
+    return this.writeFunction.isConfirming();
+  }
 }
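
For reference only (not part of the patch): a minimal sketch of how a user job might opt in to the new option. The table path, checkpoint interval, and timeout below are placeholder values, the sink wiring itself is omitted, and the class name is hypothetical; only the option keys come from the changes above.

    // Minimal sketch, assuming a streaming job that writes through the Hudi Flink sink.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.hudi.configuration.FlinkOptions;

    public class ExactlyOnceWriteExample {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled: the write task flushes on checkpoints and,
        // with exactly-once enabled, blocks further flushes until the previous instant
        // commits; the checkpoint timeout bounds that wait (see HoodieTableSink above).
        env.enableCheckpointing(60_000L);                         // placeholder interval
        env.getCheckpointConfig().setCheckpointTimeout(600_000L); // placeholder timeout

        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "/tmp/hudi/demo_table"); // placeholder table path
        conf.setBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED, true);
        // write.commit.ack.timeout is internal: HoodieTableSink overrides it with the
        // checkpoint timeout, so user code does not need to set it explicitly.
      }
    }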