[HUDI-4275] Refactor rollback inflight instant for clustering/compaction to reuse some code (#5894)
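Summary: this change consolidates how inflight clustering and compaction instants are rolled back. BaseHoodieWriteClient#rollbackInflightClustering and HoodieActiveTimeline#revertReplaceCommitInflightToRequested are removed; HoodieTable now exposes rollbackInflightCompaction and rollbackInflightClustering, both delegating to a shared private rollbackInflightInstant, and the compaction-specific revertCompactionInflightToRequested is generalized into revertInstantFromInflightToRequested. getPendingRollbackInfo becomes public so engine clients (Spark, Flink) can pass it to the table as the pending-rollback lookup.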
@@ -999,7 +999,6 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
   }
 
-
   /**
    * Schedules INDEX action.
    *
@@ -1094,7 +1093,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return getPendingRollbackInfo(metaClient, commitToRollback, true);
   }
 
-  protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
+  public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
     return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
   }
 
@@ -1375,14 +1374,6 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return scheduleClustering(extraMetadata);
   }
 
-  public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
-    Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
-    String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
-    table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
-    table.rollback(context, commitTime, inflightInstant, false, false);
-    table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
-  }
-
   /**
    * Finalize Write operation.
    *
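With the client-side method gone, engine callers rebuild the inflight replacecommit instant and invoke the rollback on the table, passing the client's pending-rollback lookup. A minimal sketch of that call pattern follows; the class ClusteringRollbackExample and its wiring are illustrative only, while the individual calls mirror the updated SparkRDDWriteClient and HoodieFlinkClusteringJob call sites further down.

import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.table.HoodieTable;

// Illustrative helper, not part of this commit.
public class ClusteringRollbackExample {
  public static void rollbackPendingClustering(BaseHoodieWriteClient<?, ?, ?, ?> writeClient,
                                               HoodieTable<?, ?, ?, ?> table,
                                               String clusteringInstantTime) {
    // Rebuild the inflight replacecommit instant for the clustering timestamp.
    HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstantTime);
    if (table.getActiveTimeline().filterPendingReplaceTimeline().containsInstant(inflightInstant)) {
      // The rollback itself now lives on HoodieTable; the client only supplies the pending-rollback lookup.
      table.rollbackInflightClustering(inflightInstant,
          commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
      table.getMetaClient().reloadActiveTimeline();
    }
  }
}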
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -44,6 +42,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.action.compact.OperationResult;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -172,7 +173,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
       Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName());
       if (metaClient.getFs().exists(inflightPath)) {
         // revert if in inflight state
-        metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight);
+        metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
       }
       // Overwrite compaction plan with updated info
       metaClient.getActiveTimeline().saveToCompactionRequested(
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -65,6 +60,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -82,6 +78,12 @@ import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.storage.HoodieLayoutFactory;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -545,12 +547,37 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
    *
    * @param inflightInstant Inflight Compaction Instant
    */
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant,
+                                         Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+    ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
+    rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+  }
+
+  /**
+   * Rollback inflight clustering instant to requested clustering instant
+   *
+   * @param inflightInstant Inflight clustering instant
+   * @param getPendingRollbackInstantFunc Function to get rollback instant
+   */
+  public void rollbackInflightClustering(HoodieInstant inflightInstant,
+                                         Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+    ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
+    rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+  }
+
+  /**
+   * Rollback inflight instant to requested instant
+   *
+   * @param inflightInstant Inflight instant
+   * @param getPendingRollbackInstantFunc Function to get rollback instant
+   */
+  private void rollbackInflightInstant(HoodieInstant inflightInstant,
+                                       Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
     final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
         -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
     scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
     rollback(context, commitTime, inflightInstant, false, false);
-    getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
+    getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
   }
 
   /**
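The getPendingRollbackInstantFunc parameter also decouples HoodieTable from the write client: when the lookup finds nothing, rollbackInflightInstant falls back to a freshly generated instant time. A hedged sketch of the compaction entry point, assuming HoodieTimeline.getCompactionInflightInstant as the inflight-instant factory and with the wrapper class being illustrative:

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieTable;

// Illustrative only, not part of this commit.
public class CompactionRollbackExample {
  public static void rollbackPendingCompaction(HoodieTable<?, ?, ?, ?> table, String compactionInstantTime) {
    // Assumed factory for the inflight compaction instant.
    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
    // A lookup that finds no pending rollback makes rollbackInflightInstant schedule the rollback
    // under a new instant time instead of reusing an existing one.
    table.rollbackInflightCompaction(inflightInstant, commitToRollback -> Option.empty());
  }
}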
@@ -355,7 +355,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
     if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
-      rollbackInflightClustering(inflightInstant, table);
+      table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
       table.getMetaClient().reloadActiveTimeline();
     }
     clusteringTimer = metrics.getClusteringCtx();
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.common.table.timeline;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -32,6 +28,11 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -347,16 +348,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   /**
-   * Revert compaction State from inflight to requested.
+   * Revert instant state from inflight to requested.
    *
    * @param inflightInstant Inflight Instant
    * @return requested instant
    */
-  public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) {
-    ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
+  public HoodieInstant revertInstantFromInflightToRequested(HoodieInstant inflightInstant) {
     ValidationUtils.checkArgument(inflightInstant.isInflight());
     HoodieInstant requestedInstant =
-        new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp());
+        new HoodieInstant(State.REQUESTED, inflightInstant.getAction(), inflightInstant.getTimestamp());
     if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
       // Pass empty data since it is read from the corresponding .aux/.compaction instant file
       transitionState(inflightInstant, requestedInstant, Option.empty());
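Since the action-specific check moved to the HoodieTable callers, the renamed method keeps the inflight instant's own action when building the requested instant, so compaction and replacecommit (clustering) instants share one transition path. A small sketch, with the wrapper class being illustrative:

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;

// Illustrative only, not part of this commit.
public class RevertInflightExample {
  public static HoodieInstant revertToRequested(HoodieActiveTimeline activeTimeline,
                                                String instantTime,
                                                String action) {
    // action can be HoodieTimeline.COMPACTION_ACTION or HoodieTimeline.REPLACE_COMMIT_ACTION;
    // the only remaining precondition is that the instant is inflight.
    HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, action, instantTime);
    return activeTimeline.revertInstantFromInflightToRequested(inflight);
  }
}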
@@ -514,26 +514,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return commitInstant;
   }
 
-  /**
-   * Revert replace requested State from inflight to requested.
-   *
-   * @param inflightInstant Inflight Instant
-   * @return requested instant
-   */
-  public HoodieInstant revertReplaceCommitInflightToRequested(HoodieInstant inflightInstant) {
-    ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
-    ValidationUtils.checkArgument(inflightInstant.isInflight());
-    HoodieInstant requestedInstant =
-        new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp());
-    if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
-      // Pass empty data since it is read from the corresponding .aux/.compaction instant file
-      transitionState(inflightInstant, requestedInstant, Option.empty());
-    } else {
-      deleteInflight(inflightInstant);
-    }
-    return requestedInstant;
-  }
-
   private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
     transitionState(fromInstant, toInstant, data, false);
   }
@@ -336,7 +336,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     timeline = timeline.reload();
     assertFalse(timeline.containsInstant(compaction));
     assertTrue(timeline.containsInstant(inflight));
-    compaction = timeline.revertCompactionInflightToRequested(inflight);
+    compaction = timeline.revertInstantFromInflightToRequested(inflight);
     timeline = timeline.reload();
     assertTrue(timeline.containsInstant(compaction));
    assertFalse(timeline.containsInstant(inflight));
@@ -114,7 +114,8 @@ public class HoodieFlinkClusteringJob {
       HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
       if (timeline.containsInstant(inflightInstant)) {
         LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
-        writeClient.rollbackInflightClustering(inflightInstant, table);
+        table.rollbackInflightClustering(inflightInstant,
+            commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
         table.getMetaClient().reloadActiveTimeline();
       }
 