[HUDI-2040] Make flink writer as exactly-once by default (#3106)
This commit is contained in:
@@ -315,14 +315,6 @@ public class FlinkOptions {
|
|||||||
.defaultValue(100) // default 100 MB
|
.defaultValue(100) // default 100 MB
|
||||||
.withDescription("Max memory in MB for merge, default 100MB");
|
.withDescription("Max memory in MB for merge, default 100MB");
|
||||||
|
|
||||||
public static final ConfigOption<Boolean> WRITE_EXACTLY_ONCE_ENABLED = ConfigOptions
|
|
||||||
.key("write.exactly_once.enabled")
|
|
||||||
.booleanType()
|
|
||||||
.defaultValue(false) // default at least once
|
|
||||||
.withDescription("Whether write in exactly_once semantics, if true,\n"
|
|
||||||
+ "the write task would block flushing after it finishes a checkpoint\n"
|
|
||||||
+ "until it receives the checkpoint success event, default false");
|
|
||||||
|
|
||||||
// this is only for internal use
|
// this is only for internal use
|
||||||
public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
|
public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
|
||||||
.key("write.commit.ack.timeout")
|
.key("write.commit.ack.timeout")
|
||||||
|
|||||||
@@ -37,7 +37,6 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
|
|||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
import org.apache.flink.annotation.VisibleForTesting;
|
import org.apache.flink.annotation.VisibleForTesting;
|
||||||
import org.apache.flink.api.common.state.CheckpointListener;
|
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
|
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
|
||||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||||
@@ -100,7 +99,7 @@ import java.util.stream.Collectors;
|
|||||||
*/
|
*/
|
||||||
public class StreamWriteFunction<K, I, O>
|
public class StreamWriteFunction<K, I, O>
|
||||||
extends KeyedProcessFunction<K, I, O>
|
extends KeyedProcessFunction<K, I, O>
|
||||||
implements CheckpointedFunction, CheckpointListener {
|
implements CheckpointedFunction {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
@@ -148,11 +147,6 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
*/
|
*/
|
||||||
private transient TotalSizeTracer tracer;
|
private transient TotalSizeTracer tracer;
|
||||||
|
|
||||||
/**
|
|
||||||
* Whether write in exactly-once semantics.
|
|
||||||
*/
|
|
||||||
private boolean exactlyOnce;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flag saying whether the write task is waiting for the checkpoint success notification
|
* Flag saying whether the write task is waiting for the checkpoint success notification
|
||||||
* after it finished a checkpoint.
|
* after it finished a checkpoint.
|
||||||
@@ -186,7 +180,6 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
|
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
|
||||||
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
|
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
|
||||||
this.tracer = new TotalSizeTracer(this.config);
|
this.tracer = new TotalSizeTracer(this.config);
|
||||||
this.exactlyOnce = config.getBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED);
|
|
||||||
initBuffer();
|
initBuffer();
|
||||||
initWriteFunction();
|
initWriteFunction();
|
||||||
}
|
}
|
||||||
@@ -217,11 +210,6 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void notifyCheckpointComplete(long checkpointId) {
|
|
||||||
this.writeClient.cleanHandles();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* End input action for batch source.
|
* End input action for batch source.
|
||||||
*/
|
*/
|
||||||
@@ -502,7 +490,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
|
|
||||||
// if exactly-once semantics turns on,
|
// if exactly-once semantics turns on,
|
||||||
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
|
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
|
||||||
if (exactlyOnce && confirming) {
|
if (confirming) {
|
||||||
long waitingTime = 0L;
|
long waitingTime = 0L;
|
||||||
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
|
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
|
||||||
long interval = 500L;
|
long interval = 500L;
|
||||||
@@ -583,6 +571,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
this.eventGateway.sendEventToCoordinator(event);
|
this.eventGateway.sendEventToCoordinator(event);
|
||||||
this.buckets.clear();
|
this.buckets.clear();
|
||||||
this.tracer.reset();
|
this.tracer.reset();
|
||||||
|
this.writeClient.cleanHandles();
|
||||||
this.confirming = true;
|
this.confirming = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -104,6 +104,11 @@ public class StreamWriteOperatorCoordinator
|
|||||||
*/
|
*/
|
||||||
private final int parallelism;
|
private final int parallelism;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to schedule asynchronous compaction task on finished checkpoints.
|
||||||
|
*/
|
||||||
|
private final boolean asyncCompaction;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A single-thread executor to handle all the asynchronous jobs of the coordinator.
|
* A single-thread executor to handle all the asynchronous jobs of the coordinator.
|
||||||
*/
|
*/
|
||||||
@@ -136,6 +141,7 @@ public class StreamWriteOperatorCoordinator
|
|||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.parallelism = context.currentParallelism();
|
this.parallelism = context.currentParallelism();
|
||||||
|
this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -197,6 +203,10 @@ public class StreamWriteOperatorCoordinator
|
|||||||
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
|
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
|
||||||
final boolean committed = commitInstant();
|
final boolean committed = commitInstant();
|
||||||
if (committed) {
|
if (committed) {
|
||||||
|
// if async compaction is on, schedule the compaction
|
||||||
|
if (asyncCompaction) {
|
||||||
|
writeClient.scheduleCompaction(Option.empty());
|
||||||
|
}
|
||||||
// start new instant.
|
// start new instant.
|
||||||
startInstant();
|
startInstant();
|
||||||
// sync Hive if is enabled
|
// sync Hive if is enabled
|
||||||
|
|||||||
@@ -21,13 +21,11 @@ package org.apache.hudi.sink.compact;
|
|||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
import org.apache.hudi.common.model.CompactionOperation;
|
import org.apache.hudi.common.model.CompactionOperation;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.CompactionUtils;
|
import org.apache.hudi.common.util.CompactionUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.table.HoodieFlinkTable;
|
import org.apache.hudi.table.HoodieFlinkTable;
|
||||||
import org.apache.hudi.util.CompactionUtil;
|
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
import org.apache.flink.annotation.VisibleForTesting;
|
import org.apache.flink.annotation.VisibleForTesting;
|
||||||
@@ -92,17 +90,18 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleCompaction(long checkpointId) throws IOException {
|
private void scheduleCompaction(long checkpointId) throws IOException {
|
||||||
// if async compaction is on, schedule the compaction
|
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
|
||||||
HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
|
|
||||||
final String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
|
|
||||||
|
|
||||||
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
|
// the last instant takes the highest priority.
|
||||||
if (!scheduled) {
|
Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
|
||||||
|
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
|
||||||
|
if (!lastRequested.isPresent()) {
|
||||||
// do nothing.
|
// do nothing.
|
||||||
LOG.info("No compaction plan for checkpoint " + checkpointId);
|
LOG.info("No compaction plan for checkpoint " + checkpointId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String compactionInstantTime = lastRequested.get().getTimestamp();
|
||||||
if (this.compactionInstantTime != null
|
if (this.compactionInstantTime != null
|
||||||
&& Objects.equals(this.compactionInstantTime, compactionInstantTime)) {
|
&& Objects.equals(this.compactionInstantTime, compactionInstantTime)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
@@ -110,7 +109,6 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
|
|
||||||
// generate compaction plan
|
// generate compaction plan
|
||||||
// should support configurable commit metadata
|
// should support configurable commit metadata
|
||||||
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
|
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
|
||||||
@@ -123,27 +121,13 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
|||||||
} else {
|
} else {
|
||||||
this.compactionInstantTime = compactionInstantTime;
|
this.compactionInstantTime = compactionInstantTime;
|
||||||
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
|
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
|
||||||
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
|
|
||||||
if (!pendingCompactionTimeline.containsInstant(instant)) {
|
|
||||||
// this means that the compaction plan was written to auxiliary path(.tmp)
|
|
||||||
// but not the meta path(.hoodie), this usually happens when the job crush
|
|
||||||
// exceptionally.
|
|
||||||
|
|
||||||
// clean the compaction plan in auxiliary path and cancels the compaction.
|
|
||||||
|
|
||||||
LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
|
|
||||||
+ "Clean the compaction plan in auxiliary path and cancels the compaction");
|
|
||||||
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark instant as compaction inflight
|
// Mark instant as compaction inflight
|
||||||
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
|
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
|
|
||||||
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
|
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
|
||||||
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
|
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
|
||||||
LOG.info("CompactionPlanFunction compacting " + operations + " files");
|
LOG.info("CompactionPlanOperator compacting " + operations + " files");
|
||||||
for (CompactionOperation operation : operations) {
|
for (CompactionOperation operation : operations) {
|
||||||
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
|
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -651,7 +651,6 @@ public class TestWriteCopyOnWrite {
|
|||||||
@Test
|
@Test
|
||||||
public void testWriteExactlyOnce() throws Exception {
|
public void testWriteExactlyOnce() throws Exception {
|
||||||
// reset the config option
|
// reset the config option
|
||||||
conf.setBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED, true);
|
|
||||||
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3);
|
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3);
|
||||||
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
|
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
|
||||||
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
||||||
|
|||||||
@@ -23,10 +23,10 @@ import org.apache.hudi.common.model.HoodieKey;
|
|||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
|
|
||||||
import org.apache.hudi.sink.bootstrap.IndexRecord;
|
|
||||||
import org.apache.hudi.sink.StreamWriteFunction;
|
import org.apache.hudi.sink.StreamWriteFunction;
|
||||||
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
|
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
|
||||||
|
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
|
||||||
|
import org.apache.hudi.sink.bootstrap.IndexRecord;
|
||||||
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
|
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
|
||||||
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
||||||
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
|
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
|
||||||
@@ -48,13 +48,12 @@ import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventG
|
|||||||
import org.apache.flink.table.data.RowData;
|
import org.apache.flink.table.data.RowData;
|
||||||
import org.apache.flink.util.Collector;
|
import org.apache.flink.util.Collector;
|
||||||
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A wrapper class to manipulate the {@link StreamWriteFunction} instance for testing.
|
* A wrapper class to manipulate the {@link StreamWriteFunction} instance for testing.
|
||||||
@@ -213,18 +212,8 @@ public class StreamWriteFunctionWrapper<I> {
|
|||||||
|
|
||||||
public void checkpointComplete(long checkpointId) {
|
public void checkpointComplete(long checkpointId) {
|
||||||
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
|
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
|
||||||
if (asyncCompaction) {
|
|
||||||
// sleep for a while to give a change for scheduling compaction,
|
|
||||||
// see HoodieActiveTimeline#createNewInstantTime for details.
|
|
||||||
try {
|
|
||||||
TimeUnit.SECONDS.sleep(2);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new HoodieException("Waiting for checkpoint success exception", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
coordinator.notifyCheckpointComplete(checkpointId);
|
coordinator.notifyCheckpointComplete(checkpointId);
|
||||||
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
|
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
|
||||||
this.writeFunction.notifyCheckpointComplete(checkpointId);
|
|
||||||
if (asyncCompaction) {
|
if (asyncCompaction) {
|
||||||
try {
|
try {
|
||||||
compactFunctionWrapper.compact(checkpointId);
|
compactFunctionWrapper.compact(checkpointId);
|
||||||
|
|||||||
Reference in New Issue
Block a user