From 3a78be9203a9c3cea33fa6120c89f7702275fc31 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 19 Oct 2021 10:47:38 +0800 Subject: [PATCH] [HUDI-2572] Strengthen flink compaction rollback strategy (#3819) * make the events of commit task distinct by file id * fix the existence check for inflight state file * make the compaction task fail-safe --- .../hudi/sink/compact/CompactFunction.java | 2 +- .../sink/compact/CompactionCommitEvent.java | 17 ++++++- .../sink/compact/CompactionCommitSink.java | 47 +++++++++---------- .../sink/compact/CompactionPlanOperator.java | 27 ++++------- 4 files changed, 50 insertions(+), 43 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 591624429..57b79df1c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -99,7 +99,7 @@ public class CompactFunction extends ProcessFunction collector) throws IOException { List writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation); - collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); + collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID)); } @VisibleForTesting diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java index 52c0812d8..04449441c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java @@ -33,6 +33,12 @@ public class CompactionCommitEvent implements Serializable { * The compaction commit instant time. */ private String instant; + + /** + * The file ID. 
+ */ + private String fileId; + /** * The write statuses. */ @@ -45,8 +51,9 @@ public class CompactionCommitEvent implements Serializable { public CompactionCommitEvent() { } - public CompactionCommitEvent(String instant, List writeStatuses, int taskID) { + public CompactionCommitEvent(String instant, String fileId, List writeStatuses, int taskID) { this.instant = instant; + this.fileId = fileId; this.writeStatuses = writeStatuses; this.taskID = taskID; } @@ -55,6 +62,10 @@ public class CompactionCommitEvent implements Serializable { this.instant = instant; } + public void setFileId(String fileId) { + this.fileId = fileId; + } + public void setWriteStatuses(List writeStatuses) { this.writeStatuses = writeStatuses; } @@ -67,6 +78,10 @@ public class CompactionCommitEvent implements Serializable { return instant; } + public String getFileId() { + return fileId; + } + public List getWriteStatuses() { return writeStatuses; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index e6c4cedaa..d90af2c32 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -20,8 +20,6 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; @@ -33,7 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -61,9 +58,12 @@ public class CompactionCommitSink extends 
CleanFunction { /** * Buffer to collect the event from each compact task {@code CompactFunction}. - * The key is the instant time. + * + *

Stores the mapping of instant_time -> file_id -> event. Use a map to collect the + * events because the rolling back of intermediate compaction tasks generates corrupt + * events. */ - private transient Map> commitBuffer; + private transient Map> commitBuffer; public CompactionCommitSink(Configuration conf) { super(conf); @@ -82,9 +82,9 @@ public class CompactionCommitSink extends CleanFunction { @Override public void invoke(CompactionCommitEvent event, Context context) throws Exception { final String instant = event.getInstant(); - commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>()) - .add(event); - commitIfNecessary(instant, commitBuffer.get(instant)); + commitBuffer.computeIfAbsent(instant, k -> new HashMap<>()) + .put(event.getFileId(), event); + commitIfNecessary(instant, commitBuffer.get(instant).values()); } /** @@ -94,39 +94,38 @@ public class CompactionCommitSink extends CleanFunction { * @param instant Compaction commit instant time * @param events Commit events ever received for the instant */ - private void commitIfNecessary(String instant, List events) throws IOException { + private void commitIfNecessary(String instant, Collection events) throws IOException { HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( this.writeClient.getHoodieTable().getMetaClient(), instant); boolean isReady = compactionPlan.getOperations().size() == events.size(); if (!isReady) { return; } + try { + doCommit(instant, events); + } catch (Throwable throwable) { + // make it fail-safe + LOG.error("Error while committing compaction instant: " + instant, throwable); + } finally { + // reset the status + reset(instant); + } + } + + @SuppressWarnings("unchecked") + private void doCommit(String instant, Collection events) throws IOException { List statuses = events.stream() .map(CompactionCommitEvent::getWriteStatuses) .flatMap(Collection::stream) .collect(Collectors.toList()); - if (this.writeClient.getConfig().shouldAutoCommit()) { - // Prepare the 
commit metadata. - List updateStatusMap = statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); - HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); - for (HoodieWriteStat stat : updateStatusMap) { - metadata.addWriteStat(stat.getPartitionPath(), stat); - } - metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, writeClient.getConfig().getSchema()); - this.writeClient.completeCompaction( - metadata, statuses, this.writeClient.getHoodieTable(), instant); - } // commit the compaction this.writeClient.commitCompaction(instant, statuses, Option.empty()); - // Whether to cleanup the old log file when compaction + // Whether to clean up the old log file when compaction if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { this.writeClient.clean(); } - - // reset the status - reset(instant); } private void reset(String instant) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index a17ea0404..325d88f3e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -38,7 +38,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.io.IOException; import java.util.List; -import java.util.Objects; import static java.util.stream.Collectors.toList; @@ -61,9 +60,9 @@ public class CompactionPlanOperator extends AbstractStreamOperator operations = compactionPlan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("CompactionPlanOperator compacting " + operations + " files"); + LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size()); for (CompactionOperation operation : operations) { output.collect(new StreamRecord<>(new 
CompactionPlanEvent(compactionInstantTime, operation))); }