[HUDI-1016] Code optimization in MergeOnReadRollbackActionExecutor(#1718)
This commit is contained in:
@@ -67,12 +67,12 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
|
|||||||
@Override
|
@Override
|
||||||
protected List<HoodieRollbackStat> executeRollback() throws IOException {
|
protected List<HoodieRollbackStat> executeRollback() throws IOException {
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
LOG.error("Rolling back instant " + instantToRollback.getTimestamp());
|
LOG.info("Rolling back instant " + instantToRollback.getTimestamp());
|
||||||
|
|
||||||
HoodieInstant resolvedInstant = instantToRollback;
|
HoodieInstant resolvedInstant = instantToRollback;
|
||||||
// Atomically un-publish all non-inflight commits
|
// Atomically un-publish all non-inflight commits
|
||||||
if (instantToRollback.isCompleted()) {
|
if (instantToRollback.isCompleted()) {
|
||||||
LOG.error("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants);
|
LOG.info("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants);
|
||||||
resolvedInstant = table.getActiveTimeline().revertToInflight(instantToRollback);
|
resolvedInstant = table.getActiveTimeline().revertToInflight(instantToRollback);
|
||||||
// reload meta-client to reflect latest timeline status
|
// reload meta-client to reflect latest timeline status
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
@@ -86,11 +86,10 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
|
|||||||
// NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
|
// NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
|
||||||
// required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
|
// required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
|
||||||
// Atomically un-publish all non-inflight commits
|
// Atomically un-publish all non-inflight commits
|
||||||
// Atomically un-publish all non-inflight commits
|
|
||||||
// For Requested State (like failure during index lookup), there is nothing to do rollback other than
|
// For Requested State (like failure during index lookup), there is nothing to do rollback other than
|
||||||
// deleting the timeline file
|
// deleting the timeline file
|
||||||
if (!resolvedInstant.isRequested()) {
|
if (!resolvedInstant.isRequested()) {
|
||||||
LOG.info("Unpublished " + resolvedInstant);
|
LOG.info("Un-published " + resolvedInstant);
|
||||||
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, resolvedInstant);
|
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, resolvedInstant);
|
||||||
// TODO: We need to persist this as rollback workload and use it in case of partial failures
|
// TODO: We need to persist this as rollback workload and use it in case of partial failures
|
||||||
allRollbackStats = new RollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests);
|
allRollbackStats = new RollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests);
|
||||||
|
|||||||
Reference in New Issue
Block a user