diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 6ce0564e2..a3ba00895 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -19,6 +19,7 @@ package org.apache.hudi.client; import com.codahale.metrics.Timer; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -32,7 +33,6 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.heartbeat.HeartbeatUtils; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; @@ -765,21 +765,20 @@ public abstract class AbstractHoodieWriteClient getInstantsToRollback(HoodieTable table) { + Stream inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(table) + .getReverseOrderedInstants(); if (config.getFailedWritesCleanPolicy().isEager()) { - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); - return inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - } else if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.NEVER) { - return Collections.EMPTY_LIST; + return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList()); } else if (config.getFailedWritesCleanPolicy().isLazy()) { - return table.getMetaClient().getActiveTimeline() - .getCommitsTimeline().filterInflights().getReverseOrderedInstants().filter(instant -> { - try { - return heartbeatClient.isHeartbeatExpired(instant.getTimestamp()); - } catch (IOException io) { - throw new HoodieException("Failed to check heartbeat for instant " + instant, io); - } - }).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + return inflightInstantsStream.filter(instant -> { + try { + return heartbeatClient.isHeartbeatExpired(instant.getTimestamp()); + } catch (IOException io) { + throw new HoodieException("Failed to check heartbeat for instant " + instant, io); + } + }).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + } else if (config.getFailedWritesCleanPolicy().isNever()) { + return Collections.EMPTY_LIST; } else { throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy()); }