From cdb9b48170ef98634babd8954392efb1c1b90fcf Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 18 Jun 2021 13:55:23 +0800 Subject: [PATCH] [HUDI-2040] Make flink writer as exactly-once by default (#3106) --- .../hudi/configuration/FlinkOptions.java | 8 ----- .../apache/hudi/sink/StreamWriteFunction.java | 17 ++--------- .../sink/StreamWriteOperatorCoordinator.java | 10 +++++++ .../sink/compact/CompactionPlanOperator.java | 30 +++++-------------- .../hudi/sink/TestWriteCopyOnWrite.java | 1 - .../utils/StreamWriteFunctionWrapper.java | 17 ++--------- 6 files changed, 23 insertions(+), 60 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index b14ef6e2b..feef0f3cc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -315,14 +315,6 @@ public class FlinkOptions { .defaultValue(100) // default 100 MB .withDescription("Max memory in MB for merge, default 100MB"); - public static final ConfigOption WRITE_EXACTLY_ONCE_ENABLED = ConfigOptions - .key("write.exactly_once.enabled") - .booleanType() - .defaultValue(false) // default at least once - .withDescription("Whether write in exactly_once semantics, if true,\n" - + "the write task would block flushing after it finishes a checkpoint\n" - + "until it receives the checkpoint success event, default false"); - // this is only for internal use public static final ConfigOption WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions .key("write.commit.ack.timeout") diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 7b1653853..965e5737e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -37,7 +37,6 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -100,7 +99,7 @@ import java.util.stream.Collectors; */ public class StreamWriteFunction extends KeyedProcessFunction - implements CheckpointedFunction, CheckpointListener { + implements CheckpointedFunction { private static final long serialVersionUID = 1L; @@ -148,11 +147,6 @@ public class StreamWriteFunction */ private transient TotalSizeTracer tracer; - /** - * Whether write in exactly-once semantics. - */ - private boolean exactlyOnce; - /** * Flag saying whether the write task is waiting for the checkpoint success notification * after it finished a checkpoint. @@ -186,7 +180,6 @@ public class StreamWriteFunction WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); this.tracer = new TotalSizeTracer(this.config); - this.exactlyOnce = config.getBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED); initBuffer(); initWriteFunction(); } @@ -217,11 +210,6 @@ public class StreamWriteFunction } } - @Override - public void notifyCheckpointComplete(long checkpointId) { - this.writeClient.cleanHandles(); - } - /** * End input action for batch source. */ @@ -502,7 +490,7 @@ public class StreamWriteFunction // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. - if (exactlyOnce && confirming) { + if (confirming) { long waitingTime = 0L; long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT); long interval = 500L; @@ -583,6 +571,7 @@ public class StreamWriteFunction this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); this.tracer.reset(); + this.writeClient.cleanHandles(); this.confirming = true; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index faa5264d9..7f6f8163c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -104,6 +104,11 @@ public class StreamWriteOperatorCoordinator */ private final int parallelism; + /** + * Whether to schedule asynchronous compaction task on finished checkpoints. + */ + private final boolean asyncCompaction; + /** * A single-thread executor to handle all the asynchronous jobs of the coordinator. */ @@ -136,6 +141,7 @@ public class StreamWriteOperatorCoordinator this.conf = conf; this.context = context; this.parallelism = context.currentParallelism(); + this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf); } @Override @@ -197,6 +203,10 @@ public class StreamWriteOperatorCoordinator // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) final boolean committed = commitInstant(); if (committed) { + // if async compaction is on, schedule the compaction + if (asyncCompaction) { + writeClient.scheduleCompaction(Option.empty()); + } // start new instant. startInstant(); // sync Hive if is enabled diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index af9bae766..ad3dd577a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -21,13 +21,11 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.CompactionOperation; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.table.HoodieFlinkTable; -import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -92,17 +90,18 @@ public class CompactionPlanOperator extends AbstractStreamOperator table = writeClient.getHoodieTable(); - boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); - if (!scheduled) { + // the last instant takes the highest priority. + Option lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant(); + if (!lastRequested.isPresent()) { // do nothing. LOG.info("No compaction plan for checkpoint " + checkpointId); return; } + String compactionInstantTime = lastRequested.get().getTimestamp(); if (this.compactionInstantTime != null && Objects.equals(this.compactionInstantTime, compactionInstantTime)) { // do nothing @@ -110,7 +109,6 @@ public class CompactionPlanOperator extends AbstractStreamOperator table = writeClient.getHoodieTable(); // generate compaction plan // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( @@ -123,27 +121,13 @@ public class CompactionPlanOperator extends AbstractStreamOperator operations = compactionPlan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("CompactionPlanFunction compacting " + operations + " files"); + LOG.info("CompactionPlanOperator compacting " + operations + " files"); for (CompactionOperation operation : operations) { output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation))); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index d150d5346..e1cb99e1c 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -651,7 +651,6 @@ public class TestWriteCopyOnWrite { @Test public void testWriteExactlyOnce() throws Exception { // reset the config option - conf.setBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED, true); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3); conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index e0a0f4b6e..84ba9da91 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -23,10 +23,10 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.sink.bootstrap.BootstrapFunction; -import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.StreamWriteFunction; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; @@ -48,13 +48,12 @@ import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventG import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; -import java.util.HashSet; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; /** * A wrapper class to manipulate the {@link StreamWriteFunction} instance for testing. @@ -213,18 +212,8 @@ public class StreamWriteFunctionWrapper { public void checkpointComplete(long checkpointId) { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); - if (asyncCompaction) { - // sleep for a while to give a change for scheduling compaction, - // see HoodieActiveTimeline#createNewInstantTime for details. - try { - TimeUnit.SECONDS.sleep(2); - } catch (InterruptedException e) { - throw new HoodieException("Waiting for checkpoint success exception", e); - } - } coordinator.notifyCheckpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); - this.writeFunction.notifyCheckpointComplete(checkpointId); if (asyncCompaction) { try { compactFunctionWrapper.compact(checkpointId);