1
0

[HUDI-1660] Excluding compaction and clustering instants from inflight rollback (#2631)

This commit is contained in:
n3nash
2021-03-05 11:18:09 -08:00
committed by GitHub
parent bc883db5de
commit f2159c4573

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.client; package org.apache.hudi.client;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -32,7 +33,6 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils; import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
@@ -765,21 +765,20 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
} }
private List<String> getInstantsToRollback(HoodieTable<T, I, K, O> table) { private List<String> getInstantsToRollback(HoodieTable<T, I, K, O> table) {
Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(table)
.getReverseOrderedInstants();
if (config.getFailedWritesCleanPolicy().isEager()) { if (config.getFailedWritesCleanPolicy().isEager()) {
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
return inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
} else if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.NEVER) {
return Collections.EMPTY_LIST;
} else if (config.getFailedWritesCleanPolicy().isLazy()) { } else if (config.getFailedWritesCleanPolicy().isLazy()) {
return table.getMetaClient().getActiveTimeline() return inflightInstantsStream.filter(instant -> {
.getCommitsTimeline().filterInflights().getReverseOrderedInstants().filter(instant -> { try {
try { return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
return heartbeatClient.isHeartbeatExpired(instant.getTimestamp()); } catch (IOException io) {
} catch (IOException io) { throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
throw new HoodieException("Failed to check heartbeat for instant " + instant, io); }
} }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
}).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); } else if (config.getFailedWritesCleanPolicy().isNever()) {
return Collections.EMPTY_LIST;
} else { } else {
throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy()); throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
} }