1
0

[HUDI-3583] Fix MarkerBasedRollbackStrategy NoSuchElementException (#4984)

Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
This commit is contained in:
liujinhui
2022-03-13 15:00:50 +08:00
committed by GitHub
parent eee96e9af3
commit e60acc1258
2 changed files with 28 additions and 9 deletions

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -27,16 +26,20 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -111,16 +114,18 @@ public class MarkerBasedRollbackStrategy<T extends HoodieRecordPayload, I, K, O>
// NOTE: Since we're rolling back incomplete Delta Commit, it only could have appended its
// block to the latest log-file
// TODO(HUDI-1517) use provided marker-file's path instead
HoodieLogFile latestLogFile = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime).get();
// NOTE: Marker's don't carry information about the cumulative size of the blocks that have been appended,
// therefore we simply stub this value.
Map<String, Long> logFilesWithBlocsToRollback =
Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), -1L);
Option<HoodieLogFile> latestLogFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
if (latestLogFileOption.isPresent()) {
HoodieLogFile latestLogFile = latestLogFileOption.get();
// NOTE: Marker's don't carry information about the cumulative size of the blocks that have been appended,
// therefore we simply stub this value.
logFilesWithBlocsToRollback = Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), -1L);
}
return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(),
logFilesWithBlocsToRollback);
}
}

View File

@@ -80,6 +80,20 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
cleanupResources();
}
@Test
public void testMarkerBasedRollbackAppend() throws Exception {
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
.withMarkerFile("partA", f0, IOType.APPEND);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);
List<HoodieRollbackRequest> rollbackRequests = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
"002").getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
assertEquals(1, rollbackRequests.size());
}
@Test
public void testCopyOnWriteRollbackWithTestTable() throws Exception {
// given: wrote some base files and corresponding markers