1
0

[HUDI-2572] Strengthen flink compaction rollback strategy (#3819)

* make the events of commit task distinct by file id
* fix the existence check for inflight state file
* make the compaction task fail-safe
This commit is contained in:
Danny Chan
2021-10-19 10:47:38 +08:00
committed by GitHub
parent 335e80ea1b
commit 3a78be9203
4 changed files with 50 additions and 43 deletions

View File

@@ -99,7 +99,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
}
@VisibleForTesting

View File

@@ -33,6 +33,12 @@ public class CompactionCommitEvent implements Serializable {
* The compaction commit instant time.
*/
private String instant;
/**
* The file ID.
*/
private String fileId;
/**
* The write statuses.
*/
@@ -45,8 +51,9 @@ public class CompactionCommitEvent implements Serializable {
public CompactionCommitEvent() {
}
public CompactionCommitEvent(String instant, List<WriteStatus> writeStatuses, int taskID) {
public CompactionCommitEvent(String instant, String fileId, List<WriteStatus> writeStatuses, int taskID) {
this.instant = instant;
this.fileId = fileId;
this.writeStatuses = writeStatuses;
this.taskID = taskID;
}
@@ -55,6 +62,10 @@ public class CompactionCommitEvent implements Serializable {
this.instant = instant;
}
public void setFileId(String fileId) {
this.fileId = fileId;
}
public void setWriteStatuses(List<WriteStatus> writeStatuses) {
this.writeStatuses = writeStatuses;
}
@@ -67,6 +78,10 @@ public class CompactionCommitEvent implements Serializable {
return instant;
}
public String getFileId() {
return fileId;
}
public List<WriteStatus> getWriteStatuses() {
return writeStatuses;
}

View File

@@ -20,8 +20,6 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
@@ -33,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -61,9 +58,12 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
/**
* Buffer to collect the event from each compact task {@code CompactFunction}.
* The key is the instant time.
*
* <p>Stores the mapping of instant_time -> file_id -> event. Use a map to collect the
* events because the rolling back of intermediate compaction tasks generates corrupt
* events.
*/
private transient Map<String, List<CompactionCommitEvent>> commitBuffer;
private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer;
public CompactionCommitSink(Configuration conf) {
super(conf);
@@ -82,9 +82,9 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
@Override
public void invoke(CompactionCommitEvent event, Context context) throws Exception {
final String instant = event.getInstant();
commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>())
.add(event);
commitIfNecessary(instant, commitBuffer.get(instant));
commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
.put(event.getFileId(), event);
commitIfNecessary(instant, commitBuffer.get(instant).values());
}
/**
@@ -94,39 +94,38 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
* @param instant Compaction commit instant time
* @param events Commit events ever received for the instant
*/
private void commitIfNecessary(String instant, List<CompactionCommitEvent> events) throws IOException {
private void commitIfNecessary(String instant, Collection<CompactionCommitEvent> events) throws IOException {
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
this.writeClient.getHoodieTable().getMetaClient(), instant);
boolean isReady = compactionPlan.getOperations().size() == events.size();
if (!isReady) {
return;
}
try {
doCommit(instant, events);
} catch (Throwable throwable) {
// make it fail-safe
LOG.error("Error while committing compaction instant: " + instant, throwable);
} finally {
// reset the status
reset(instant);
}
}
@SuppressWarnings("unchecked")
private void doCommit(String instant, Collection<CompactionCommitEvent> events) throws IOException {
List<WriteStatus> statuses = events.stream()
.map(CompactionCommitEvent::getWriteStatuses)
.flatMap(Collection::stream)
.collect(Collectors.toList());
if (this.writeClient.getConfig().shouldAutoCommit()) {
// Prepare the commit metadata.
List<HoodieWriteStat> updateStatusMap = statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, writeClient.getConfig().getSchema());
this.writeClient.completeCompaction(
metadata, statuses, this.writeClient.getHoodieTable(), instant);
}
// commit the compaction
this.writeClient.commitCompaction(instant, statuses, Option.empty());
// Whether to cleanup the old log file when compaction
// Whether to clean up the old log file when compaction
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
this.writeClient.clean();
}
// reset the status
reset(instant);
}
private void reset(String instant) {

View File

@@ -38,7 +38,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import static java.util.stream.Collectors.toList;
@@ -61,9 +60,9 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
private transient HoodieFlinkWriteClient writeClient;
/**
* Compaction instant time.
* Meta Client.
*/
private String compactionInstantTime;
private transient HoodieFlinkTable table;
public CompactionPlanOperator(Configuration conf) {
this.conf = conf;
@@ -73,6 +72,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
public void open() throws Exception {
super.open();
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.table = writeClient.getHoodieTable();
}
@Override
@@ -83,12 +83,12 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
@Override
public void notifyCheckpointComplete(long checkpointId) {
try {
HoodieFlinkTable hoodieTable = writeClient.getHoodieTable();
CompactionUtil.rollbackCompaction(hoodieTable, writeClient, conf);
scheduleCompaction(hoodieTable, checkpointId);
table.getMetaClient().reloadActiveTimeline();
CompactionUtil.rollbackCompaction(table, writeClient, conf);
scheduleCompaction(table, checkpointId);
} catch (Throwable throwable) {
// make it fail safe
LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
// make it fail-safe
LOG.error("Error while scheduling compaction plan for checkpoint: " + checkpointId, throwable);
}
}
@@ -103,12 +103,6 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
}
String compactionInstantTime = lastRequested.get().getTimestamp();
if (this.compactionInstantTime != null
&& Objects.equals(this.compactionInstantTime, compactionInstantTime)) {
// do nothing
LOG.info("Duplicate scheduling for compaction instant: " + compactionInstantTime + ", ignore");
return;
}
// generate compaction plan
// should support configurable commit metadata
@@ -118,9 +112,8 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// do nothing.
LOG.info("No compaction plan for checkpoint " + checkpointId + " and instant " + compactionInstantTime);
LOG.info("Empty compaction plan for instant " + compactionInstantTime);
} else {
this.compactionInstantTime = compactionInstantTime;
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
@@ -128,7 +121,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("CompactionPlanOperator compacting " + operations + " files");
LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
for (CompactionOperation operation : operations) {
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
}