From f2159c4573810f922fadff640a953175a852dc43 Mon Sep 17 00:00:00 2001 From: n3nash Date: Fri, 5 Mar 2021 11:18:09 -0800 Subject: [PATCH] [HUDI-1660] Excluding compaction and clustering instants from inflight rollback (#2631) --- .../client/AbstractHoodieWriteClient.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 6ce0564e2..a3ba00895 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -19,6 +19,7 @@ package org.apache.hudi.client; import com.codahale.metrics.Timer; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -32,7 +33,6 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.heartbeat.HeartbeatUtils; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; @@ -765,21 +765,20 @@ public abstract class AbstractHoodieWriteClient getInstantsToRollback(HoodieTable table) { + Stream inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(table) + .getReverseOrderedInstants(); if (config.getFailedWritesCleanPolicy().isEager()) { - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); - return inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - } else if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.NEVER) { - return Collections.EMPTY_LIST; + return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList()); } else if (config.getFailedWritesCleanPolicy().isLazy()) { - return table.getMetaClient().getActiveTimeline() - .getCommitsTimeline().filterInflights().getReverseOrderedInstants().filter(instant -> { - try { - return heartbeatClient.isHeartbeatExpired(instant.getTimestamp()); - } catch (IOException io) { - throw new HoodieException("Failed to check heartbeat for instant " + instant, io); - } - }).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + return inflightInstantsStream.filter(instant -> { + try { + return heartbeatClient.isHeartbeatExpired(instant.getTimestamp()); + } catch (IOException io) { + throw new HoodieException("Failed to check heartbeat for instant " + instant, io); + } + }).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + } else if (config.getFailedWritesCleanPolicy().isNever()) { + return Collections.EMPTY_LIST; } else { throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy()); }