1
0

[HUDI-839] Introducing support for rollbacks using marker files (#1756)

* [HUDI-839] Introducing rollback strategy using marker files

 - Adds a new mechanism for rollbacks where it's based on the marker files generated during the write
 - Consequently, marker file/dir deletion now happens post commit, instead of during finalize 
 - Marker files are also generated for AppendHandle, making it consistent throughout the write path 
 - Until upgrade-downgrade mechanism can upgrade non-marker based inflight writes to marker based, this should only be turned on for new datasets.
 - Added marker dir deletion after successful commit/rollback, individual files are not deleted during finalize
 - Fail safe for deleting marker directories, now during timeline archival process
 - Added check to ensure completed instants are not rolled back using marker based strategy. This will be incorrect
 - Reworked tests to rollback inflight instants, instead of completed instants whenever necessary
 - Added an unit test for MarkerBasedRollbackStrategy


Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
lw0090
2020-07-21 13:41:42 +08:00
committed by GitHub
parent b71f25f210
commit 1ec89e9a94
43 changed files with 1947 additions and 512 deletions

View File

@@ -44,7 +44,9 @@ import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -1063,11 +1065,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
}
@Test
public void testRollbackAfterConsistencyCheckFailure() throws Exception {
private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers) throws Exception {
String instantTime = "000";
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false).build();
HoodieWriteClient client = getHoodieWriteClient(cfg);
testConsistencyCheck(metaClient, instantTime);
@@ -1079,6 +1080,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
}
@Test
public void testRollbackAfterConsistencyCheckFailureUsingFileList() throws Exception {
testRollbackAfterConsistencyCheckFailureUsingFileList(false);
}
@Test
public void testRollbackAfterConsistencyCheckFailureUsingMarkers() throws Exception {
testRollbackAfterConsistencyCheckFailureUsingFileList(true);
}
private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime)
throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
@@ -1096,11 +1107,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// This should fail the commit
String partitionPath = Arrays
.stream(fs.globStatus(new Path(String.format("%s/*/*/*/*", metaClient.getMarkerFolderPath(instantTime))),
path -> path.toString().endsWith(HoodieTableMetaClient.MARKER_EXTN)))
path -> path.toString().contains(HoodieTableMetaClient.MARKER_EXTN)))
.limit(1).map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0);
Path markerFilePath = new Path(String.format("%s/%s", partitionPath,
FSUtils.makeMarkerFile(instantTime, "1-0-1", UUID.randomUUID().toString())));
metaClient.getFs().create(markerFilePath);
Path markerFilePath = new MarkerFiles(fs, basePath, metaClient.getMarkerFolderPath(instantTime), instantTime)
.create(partitionPath,
FSUtils.makeDataFileName(instantTime, "1-0-1", UUID.randomUUID().toString()),
IOType.MERGE);
LOG.info("Created a dummy marker path=" + markerFilePath);
Exception e = assertThrows(HoodieCommitException.class, () -> {

View File

@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
private Configuration hadoopConf;
private HoodieTableMetaClient metaClient;
@@ -78,8 +78,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table").build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
boolean result = archiveLog.archiveIfRequired(hadoopConf);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
}
@@ -156,9 +156,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
verifyInflightInstants(metaClient, 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
assertTrue(archiveLog.archiveIfRequired(hadoopConf));
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
assertTrue(archiveLog.archiveIfRequired());
// reload the timeline and remove the remaining commits
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -215,7 +214,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100"), dfs.getConf());
@@ -247,7 +246,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(hadoopConf);
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5");
@@ -280,7 +279,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
@@ -290,7 +289,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(hadoopConf);
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe");
@@ -305,8 +304,6 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf());
@@ -314,11 +311,11 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(hadoopConf);
assertTrue(result);
assertTrue(archiveLog.archiveIfRequired());
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(5, timeline.countInstants(),
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
@@ -336,8 +333,6 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -350,10 +345,11 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "106", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "107", dfs.getConf());
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf());
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(hadoopConf);
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),
@@ -378,34 +374,35 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
}
@Test
public void checkArchiveCommitTimeline() throws IOException {
public void testArchiveCommitTimeline() throws IOException {
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
HoodieTestDataGenerator.createCommitFile(basePath, "2", dfs.getConf());
Path markerPath = new Path(metaClient.getMarkerFolderPath("2"));
dfs.mkdirs(markerPath);
HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2");
HoodieTestDataGenerator.createCommitFile(basePath, "3", dfs.getConf());
HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");
//add 2 more instants to pass filter criteria set in compaction config above
HoodieTestDataGenerator.createCommitFile(basePath, "4", dfs.getConf());
HoodieInstant instant4 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "4");
HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf());
HoodieInstant instant5 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "5");
boolean result = archiveLog.archiveIfRequired(hadoopConf);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf());
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
assertFalse(dfs.exists(markerPath));
}
private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) {
@@ -425,7 +422,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata);
assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());

View File

@@ -43,6 +43,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
@@ -54,10 +55,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -1128,7 +1129,7 @@ public class TestCleaner extends HoodieClientTestBase {
private List<String> createMarkerFiles(String instantTime, int numFiles) throws IOException {
List<String> files = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
files.add(HoodieTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime));
files.add(HoodieClientTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime));
}
return files;
}
@@ -1140,13 +1141,8 @@ public class TestCleaner extends HoodieClientTestBase {
* @throws IOException in case of error
*/
private int getTotalTempFiles() throws IOException {
RemoteIterator<?> itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
int count = 0;
while (itr.hasNext()) {
count++;
itr.next();
}
return count;
return FileSystemTestUtils.listRecursive(fs, new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME))
.size();
}
private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient,

View File

@@ -387,15 +387,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat) throws Exception {
private void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
// Set TableType to COW
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
HoodieWriteConfig cfg = getConfig(true);
HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
/**
@@ -410,6 +407,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
// verify there are no errors
assertNoWriteErrors(statuses);
client.commit(newCommitTime, jsc.parallelize(statuses));
metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
@@ -446,10 +444,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
public void testCOWToMORConvertedTableRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testCOWToMORConvertedTableRollback(baseFileFormat, false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testCOWToMORConvertedTableRollbackUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testCOWToMORConvertedTableRollback(baseFileFormat, true);
}
private void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(false);
HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
// Test delta commit rollback
@@ -538,7 +546,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
writeRecords = jsc.parallelize(copyOfRecords, 1);
writeStatusJavaRDD = thirdClient.upsert(writeRecords, commitTime2);
thirdClient.commit(commitTime2, writeStatusJavaRDD);
statuses = writeStatusJavaRDD.collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
@@ -574,20 +581,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());
thirdClient.compact(compactionInstantTime);
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
thirdClient.rollback(compactedCommitTime);
final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp();
assertTrue(Arrays.stream(listAllDataFilesInPath(hoodieTable, cfg.getBasePath()))
.anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
thirdClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime),
hoodieTable);
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -597,6 +601,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollbackWithDeltaAndCompactionCommitUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testRollbackWithDeltaAndCompactionCommit(baseFileFormat, false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollbackWithDeltaAndCompactionCommitUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testRollbackWithDeltaAndCompactionCommit(baseFileFormat, true);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMultiRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
@@ -960,15 +976,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat) throws Exception {
private void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat,
Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build();
try (HoodieWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
@@ -987,14 +1002,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// rollback a failed commit
boolean rollback = writeClient.rollback(newCommitTime);
assertTrue(rollback);
newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime);
// insert 100 records
newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 100);
recordsRDD = jsc.parallelize(records, 1);
statuses = writeClient.insert(recordsRDD, newCommitTime);
writeClient.commit(newCommitTime, statuses);
writeClient.insert(recordsRDD, newCommitTime).collect();
// Sleep for small interval (at least 1 second) to force a new rollback start time.
Thread.sleep(1000);
@@ -1003,19 +1017,27 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// and calling rollback twice
final String lastCommitTime = newCommitTime;
metaClient = getHoodieMetaClient(hadoopConf, basePath);
HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
.filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
String fileName = last.getFileName();
// Save the .commit file to local directory.
// Rollback will be called twice to test the case where rollback failed first time and retried.
// We got the "BaseCommitTime cannot be null" exception before the fix
File file = Files.createTempFile(tempFolder, null, null).toFile();
metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), fileName),
new Path(file.getAbsolutePath()));
writeClient.rollback(newCommitTime);
Map<String, String> fileNameMap = new HashMap<>();
for (State state : Arrays.asList(State.REQUESTED, State.INFLIGHT)) {
HoodieInstant toCopy = new HoodieInstant(state, HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime);
File file = Files.createTempFile(tempFolder, null, null).toFile();
metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), toCopy.getFileName()),
new Path(file.getAbsolutePath()));
fileNameMap.put(file.getAbsolutePath(), toCopy.getFileName());
}
Path markerDir = new Path(Files.createTempDirectory(tempFolder,null).toAbsolutePath().toString());
if (rollbackUsingMarkers) {
metaClient.getFs().copyToLocalFile(new Path(metaClient.getMarkerFolderPath(lastCommitTime)),
markerDir);
}
writeClient.rollback(newCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
HoodieTable table = HoodieTable.create(config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
@@ -1026,22 +1048,43 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
.filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
}
assertEquals(0, numLogFiles);
metaClient.getFs().copyFromLocalFile(new Path(file.getAbsolutePath()),
new Path(metaClient.getMetaPath(), fileName));
fileNameMap.forEach((key, value) -> {
try {
metaClient.getFs().copyFromLocalFile(new Path(key),
new Path(metaClient.getMetaPath(), value));
} catch (IOException e) {
throw new HoodieIOException("Error copying state from local disk.", e);
}
});
if (rollbackUsingMarkers) {
metaClient.getFs().copyFromLocalFile(markerDir,
new Path(metaClient.getMarkerFolderPath(lastCommitTime)));
}
Thread.sleep(1000);
// Rollback again to pretend the first rollback failed partially. This should not error our
// Rollback again to pretend the first rollback failed partially. This should not error out
writeClient.rollback(newCommitTime);
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
public void testInsertsGeneratedIntoLogFilesRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testInsertsGeneratedIntoLogFilesRollback(baseFileFormat, false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testInsertsGeneratedIntoLogFilesRollback(baseFileFormat, true);
}
private void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat,
Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
// insert 100 records
// Setting IndexType to be InMemory to simulate Global Index nature
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build();
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
@@ -1053,8 +1096,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// trigger an action
statuses.collect();
HoodieTable table =
HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
HoodieTable table = HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
@@ -1072,30 +1114,43 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// Ensure all log files have been compacted into parquet files
assertEquals(statuses.map(status -> status.getStat().getPath().contains("parquet")).count(), numLogFiles);
assertEquals(statuses.count(), numLogFiles);
writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
//writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
// Trigger a rollback of compaction
writeClient.rollback(newCommitTime);
table.getActiveTimeline().reload();
writeClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime), table);
table = HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
tableRTFileSystemView = table.getSliceView();
((SyncableFileSystemView) tableRTFileSystemView).reset();
Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
System.out.println("Last Instant =" + lastInstant);
for (String partitionPath : dataGen.getPartitionPaths()) {
assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
List<FileSlice> fileSlices = getFileSystemViewWithUnCommittedSlices(getHoodieMetaClient(hadoopConf, basePath))
.getAllFileSlices(partitionPath).filter(fs -> fs.getBaseInstantTime().equals("100")).collect(Collectors.toList());
assertTrue(fileSlices.stream().noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
assertTrue(fileSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
}
}
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(baseFileFormat, false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(baseFileFormat, true);
}
/**
* Test to ensure metadata stats are correctly written to metadata file.
*/
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat) throws Exception {
public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
.withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
metaClient = getHoodieMetaClient(hadoopConf, basePath);
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
@@ -1136,24 +1191,16 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
records = dataGen.generateUpdates(instantTime, records);
writeRecords = jsc.parallelize(records, 1);
statuses = client.upsert(writeRecords, instantTime);
assertTrue(client.commit(instantTime, statuses), "Commit should succeed");
// Read from commit file
table = HoodieTable.create(cfg, hadoopConf);
metadata = HoodieCommitMetadata.fromBytes(
table.getActiveTimeline()
.getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
HoodieCommitMetadata.class);
//assertTrue(client.commit(instantTime, statuses), "Commit should succeed");
inserts = 0;
int upserts = 0;
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
upserts += stat.getNumUpdateWrites();
}
List<WriteStatus> writeStatusList = statuses.collect();
for (WriteStatus ws: writeStatusList) {
inserts += ws.getStat().getNumInserts();
upserts += ws.getStat().getNumUpdateWrites();
}
// Read from commit file
assertEquals(0, inserts);
assertEquals(200, upserts);
@@ -1179,7 +1226,22 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
/**
* Test to ensure metadata stats are correctly written to the metadata file, identifies small files and corrects them.
* Test to ensure rolling stats are correctly written to metadata file.
*/
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMetadataStatsOnCommitUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
testMetadataStatsOnCommit(baseFileFormat, false);
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testMetadataStatsOnCommitUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
testMetadataStatsOnCommit(baseFileFormat, true);
}
/**
* Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
*/
@ParameterizedTest
@MethodSource("argumentsProvider")
@@ -1385,11 +1447,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
return getConfigBuilder(autoCommit).build();
}
private HoodieWriteConfig getConfig(Boolean autoCommit, Boolean rollbackUsingMarkers) {
return getConfigBuilder(autoCommit, rollbackUsingMarkers, IndexType.BLOOM).build();
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
return getConfigBuilder(autoCommit, IndexType.BLOOM);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieIndex.IndexType indexType) {
return getConfigBuilder(autoCommit, false, indexType);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
@@ -1398,7 +1468,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build());
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
.withRollbackUsingMarkers(rollbackUsingMarkers);
}
private FileStatus[] insertAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,

View File

@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.IOType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMarkerFiles extends HoodieCommonTestHarness {
private MarkerFiles markerFiles;
private FileSystem fs;
private Path markerFolderPath;
@BeforeEach
public void setup() throws IOException {
initPath();
initMetaClient();
this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
this.markerFolderPath = new Path(metaClient.getMarkerFolderPath("000"));
this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000");
}
private void createSomeMarkerFiles() {
markerFiles.create("2020/06/01", "file1", IOType.MERGE);
markerFiles.create("2020/06/02", "file2", IOType.APPEND);
markerFiles.create("2020/06/03", "file3", IOType.CREATE);
}
private void createInvalidFile(String partitionPath, String invalidFileName) {
Path path = FSUtils.getPartitionPath(markerFolderPath.toString(), partitionPath);
Path invalidFilePath = new Path(path, invalidFileName);
try {
fs.create(invalidFilePath, false).close();
} catch (IOException e) {
throw new HoodieException("Failed to create invalid file " + invalidFilePath, e);
}
}
@Test
public void testCreation() throws Exception {
// when
createSomeMarkerFiles();
// then
assertTrue(fs.exists(markerFolderPath));
List<FileStatus> markerFiles = FileSystemTestUtils.listRecursive(fs, markerFolderPath)
.stream().filter(status -> status.getPath().getName().contains(".marker"))
.sorted().collect(Collectors.toList());
assertEquals(3, markerFiles.size());
assertIterableEquals(CollectionUtils.createImmutableList(
"file:" + markerFolderPath.toString() + "/2020/06/01/file1.marker.MERGE",
"file:" + markerFolderPath.toString() + "/2020/06/02/file2.marker.APPEND",
"file:" + markerFolderPath.toString() + "/2020/06/03/file3.marker.CREATE"),
markerFiles.stream().map(m -> m.getPath().toString()).collect(Collectors.toList())
);
}
@Test
public void testDeletionWhenMarkerDirExists() throws IOException {
//when
markerFiles.create("2020/06/01", "file1", IOType.MERGE);
// then
assertTrue(markerFiles.doesMarkerDirExist());
assertTrue(markerFiles.deleteMarkerDir());
assertFalse(markerFiles.doesMarkerDirExist());
}
@Test
public void testDeletionWhenMarkerDirNotExists() throws IOException {
// then
assertFalse(markerFiles.doesMarkerDirExist());
assertFalse(markerFiles.deleteMarkerDir());
}
@Test
public void testDataPathsWhenCreatingOrMerging() throws IOException {
// add markfiles
createSomeMarkerFiles();
// add invalid file
createInvalidFile("2020/06/01", "invalid_file3");
int fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).size();
assertEquals(fileSize,4);
// then
assertIterableEquals(CollectionUtils.createImmutableList(
"2020/06/01/file1", "2020/06/03/file3"),
markerFiles.createdAndMergedDataPaths().stream().sorted().collect(Collectors.toList())
);
}
@Test
public void testAllMarkerPaths() throws IOException {
// given
createSomeMarkerFiles();
// then
assertIterableEquals(CollectionUtils.createImmutableList("2020/06/01/file1.marker.MERGE",
"2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE"),
markerFiles.allMarkerFilePaths().stream().sorted().collect(Collectors.toList())
);
}
@Test
public void testStripMarkerSuffix() {
// Given
final String pathPrefix = "file://" + metaClient.getMetaPath() + "/file";
final String markerFilePath = pathPrefix + ".marker.APPEND";
// when-then
assertEquals(pathPrefix, MarkerFiles.stripMarkerSuffix(markerFilePath));
}
}

View File

@@ -68,7 +68,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
HoodieClientTestUtils.fakeCommitFile(basePath, "001");
HoodieClientTestUtils.fakeCommit(basePath, "001");
HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
protected void twoUpsertCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
List<FileSlice> secondPartitionCommit2FileSlices,
HoodieWriteConfig cfg,
boolean commitSecondUpsert) throws IOException {
//just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
//1. prepare data
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
HoodieWriteClient client = getHoodieWriteClient(cfg);
/**
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
Assertions.assertNoWriteErrors(statuses.collect());
client.commit(newCommitTime, statuses);
/**
* Write 2 (updates)
*/
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
Assertions.assertNoWriteErrors(statuses.collect());
if (commitSecondUpsert) {
client.commit(newCommitTime, statuses);
}
//2. assert filegroup and get the first partition fileslice
HoodieTable table = this.getHoodieTable(metaClient, cfg);
SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionCommit2FileGroups.size());
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
//3. assert filegroup and get the second partition fileslice
List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, secondPartitionCommit2FileGroups.size());
secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
//4. assert fileslice
HoodieTableType tableType = this.getTableType();
if (tableType.equals(HoodieTableType.COPY_ON_WRITE)) {
assertEquals(2, firstPartitionCommit2FileSlices.size());
assertEquals(2, secondPartitionCommit2FileSlices.size());
} else {
assertEquals(1, firstPartitionCommit2FileSlices.size());
assertEquals(1, secondPartitionCommit2FileSlices.size());
}
}
}

View File

@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackTestBase {
@BeforeEach
public void setUp() throws Exception {
initPath();
initSparkContexts();
initFileSystem();
initMetaClient();
}
@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}
@Test
public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() throws IOException {
// Let's create some commit files and parquet files
String commitTime1 = "001";
String commitTime2 = "002";
new File(basePath + "/.hoodie").mkdirs();
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2015/03/16", "2015/03/17", "2016/03/15"},
basePath);
HoodieTestUtils.createCommitFiles(basePath, commitTime1, commitTime2);
// Make commit1
String file11 = HoodieTestUtils.createDataFile(basePath, "2015/03/16", commitTime1, "id11");
HoodieTestUtils.createNewLogFile(fs, basePath, "2015/03/16",
commitTime1, "id11", Option.of(3));
String file12 = HoodieTestUtils.createDataFile(basePath, "2015/03/17", commitTime1, "id12");
// Make commit2
String file21 = HoodieTestUtils.createDataFile(basePath, "2015/03/16", commitTime2, "id21");
String file22 = HoodieTestUtils.createDataFile(basePath, "2015/03/17", commitTime2, "id22");
HoodieTable table = this.getHoodieTable(metaClient, getConfig());
HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(jsc, table.getConfig(), table, "003", needRollBackInstant, true);
assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
// assert hoodieRollbackStats
assertEquals(hoodieRollbackStats.size(), 3);
hoodieRollbackStats.forEach(stat -> {
if (stat.getPartitionPath().equals("2015/03/16")) {
assertEquals(1, stat.getSuccessDeleteFiles().size());
assertEquals(0, stat.getFailedDeleteFiles().size());
assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
assertEquals("file:" + HoodieTestUtils.getDataFilePath(basePath, "2015/03/16", commitTime2, file21),
stat.getSuccessDeleteFiles().get(0));
} else if (stat.getPartitionPath().equals("2015/03/17")) {
assertEquals(1, stat.getSuccessDeleteFiles().size());
assertEquals(0, stat.getFailedDeleteFiles().size());
assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
assertEquals("file:" + HoodieTestUtils.getDataFilePath(basePath, "2015/03/17", commitTime2, file22),
stat.getSuccessDeleteFiles().get(0));
} else if (stat.getPartitionPath().equals("2016/03/15")) {
assertEquals(0, stat.getSuccessDeleteFiles().size());
assertEquals(0, stat.getFailedDeleteFiles().size());
assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
}
});
assertTrue(HoodieTestUtils.doesCommitExist(basePath, "001"));
assertTrue(HoodieTestUtils.doesInflightExist(basePath, "001"));
assertFalse(HoodieTestUtils.doesCommitExist(basePath, "002"));
assertFalse(HoodieTestUtils.doesInflightExist(basePath, "002"));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, "2015/03/16", commitTime1, file11)
&& HoodieTestUtils.doesDataFileExist(basePath, "2015/03/17", commitTime1, file12));
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2015/03/16", commitTime2, file21)
|| HoodieTestUtils.doesDataFileExist(basePath, "2015/03/17", commitTime2, file22));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
//1. prepare data and assert data result
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
this.twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
HoodieTable<?> table = this.getHoodieTable(metaClient, cfg);
//2. rollback
HoodieInstant commitInstant;
if (isUsingMarkers) {
commitInstant = table.getActiveTimeline().getCommitTimeline().filterInflights().lastInstant().get();
} else {
commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
}
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(jsc, cfg, table, "003", commitInstant, false);
if (!isUsingMarkers) {
assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
} else {
assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
}
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
//3. assert the rollback stat
assertEquals(2, rollbackMetadata.size());
for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
HoodieRollbackPartitionMetadata meta = entry.getValue();
assertTrue(meta.getFailedDeleteFiles() == null
|| meta.getFailedDeleteFiles().size() == 0);
assertTrue(meta.getSuccessDeleteFiles() == null
|| meta.getSuccessDeleteFiles().size() == 1);
}
//4. assert filegroup after rollback, and compare to the rollbackstat
// assert the first partition file group and file slice
List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionRollBack1FileGroups.size());
List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
assertEquals(1, firstPartitionRollBack1FileSlices.size());
if (!isUsingMarkers) {
firstPartitionCommit2FileSlices.removeAll(firstPartitionRollBack1FileSlices);
assertEquals(1, firstPartitionCommit2FileSlices.size());
assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0));
} else {
assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
}
// assert the second partition file group and file slice
List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, secondPartitionRollBack1FileGroups.size());
List<FileSlice> secondPartitionRollBack1FileSlices = secondPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
assertEquals(1, secondPartitionRollBack1FileSlices.size());
// assert the second partition rollback file is equals rollBack1SecondPartitionStat
if (!isUsingMarkers) {
secondPartitionCommit2FileSlices.removeAll(secondPartitionRollBack1FileSlices);
assertEquals(1, secondPartitionCommit2FileSlices.size());
assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0));
} else {
assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
}
assertFalse(new MarkerFiles(table, commitInstant.getTimestamp()).doesMarkerDirExist());
}
}

View File

@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
@BeforeEach
public void setUp() throws Exception {
initPath();
initSparkContexts();
initFileSystem();
initMetaClient();
initDFS();
}
@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}
private void givenCommit0(boolean isDeltaCommit) throws Exception {
HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2");
if (isDeltaCommit) {
HoodieClientTestUtils.fakeDeltaCommit(basePath, "000");
} else {
HoodieClientTestUtils.fakeCommit(basePath, "000");
}
}
private void givenInflightCommit1(boolean isDeltaCommit) throws Exception {
HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1");
HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1", IOType.CREATE);
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f3", IOType.CREATE);
if (isDeltaCommit) {
HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0);
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.APPEND);
HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4", IOType.APPEND);
HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001");
} else {
HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2");
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.MERGE);
HoodieClientTestUtils.fakeInFlightCommit(basePath, "001");
}
}
@Test
public void testCopyOnWriteRollback() throws Exception {
// given: wrote some base files and corresponding markers
givenCommit0(false);
givenInflightCommit1(false);
// when
List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
// then: ensure files are deleted correctly, non-existent files reported as failed deletes
assertEquals(2, stats.size());
List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
assertEquals(0, partBFiles.size());
assertEquals(1, partAFiles.size());
assertEquals(2, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
}
@Test
public void testMergeOnReadRollback() throws Exception {
// given: wrote some base + log files and corresponding markers
givenCommit0(true);
givenInflightCommit1(true);
// when
List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "001"));
// then: ensure files are deleted, rollback block is appended (even if append does not exist)
assertEquals(2, stats.size());
// will have the log file
List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
assertEquals(1, partBFiles.size());
assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
assertTrue(partBFiles.get(0).getLen() > 0);
List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
assertEquals(3, partAFiles.size());
assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
// only partB/f1_001 will be deleted
assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
// partA/f3_001 is non existent
assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
}
}

View File

@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackTestBase {
@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
}
@BeforeEach
public void setUp() throws Exception {
initPath();
initSparkContexts();
//just generate tow partitions
dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
initFileSystem();
initMetaClient();
}
@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
//1. prepare data and assert data result
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
List<HoodieLogFile> firstPartitionCommit2LogFiles = new ArrayList<>();
List<HoodieLogFile> secondPartitionCommit2LogFiles = new ArrayList<>();
firstPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> firstPartitionCommit2LogFiles.add(logFile));
assertEquals(1, firstPartitionCommit2LogFiles.size());
secondPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> secondPartitionCommit2LogFiles.add(logFile));
assertEquals(1, secondPartitionCommit2LogFiles.size());
HoodieTable table = this.getHoodieTable(metaClient, cfg);
//2. rollback
HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
jsc,
cfg,
table,
"003",
rollBackInstant,
true);
// assert is filelist mode
if (!isUsingMarkers) {
assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
} else {
assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
}
//3. assert the rollback stat
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
assertEquals(2, rollbackMetadata.size());
for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
HoodieRollbackPartitionMetadata meta = entry.getValue();
assertTrue(meta.getFailedDeleteFiles() == null || meta.getFailedDeleteFiles().size() == 0);
assertTrue(meta.getSuccessDeleteFiles() == null || meta.getSuccessDeleteFiles().size() == 0);
}
//4. assert filegroup after rollback, and compare to the rollbackstat
// assert the first partition data and log file size
List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionRollBack1FileGroups.size());
List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
assertEquals(1, firstPartitionRollBack1FileSlices.size());
FileSlice firstPartitionRollBack1FileSlice = firstPartitionRollBack1FileSlices.get(0);
List<HoodieLogFile> firstPartitionRollBackLogFiles = firstPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
assertEquals(2, firstPartitionRollBackLogFiles.size());
firstPartitionRollBackLogFiles.removeAll(firstPartitionCommit2LogFiles);
assertEquals(1, firstPartitionRollBackLogFiles.size());
// assert the second partition data and log file size
List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, secondPartitionRollBack1FileGroups.size());
List<FileSlice> secondPartitionRollBack1FileSlices = secondPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
assertEquals(1, secondPartitionRollBack1FileSlices.size());
FileSlice secondPartitionRollBack1FileSlice = secondPartitionRollBack1FileSlices.get(0);
List<HoodieLogFile> secondPartitionRollBackLogFiles = secondPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
assertEquals(2, secondPartitionRollBackLogFiles.size());
secondPartitionRollBackLogFiles.removeAll(secondPartitionCommit2LogFiles);
assertEquals(1, secondPartitionRollBackLogFiles.size());
assertFalse(new MarkerFiles(table, "002").doesMarkerDirExist());
}
@Test
public void testFailForCompletedInstants() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
new MergeOnReadRollbackActionExecutor(
jsc,
getConfigBuilder().build(),
getHoodieTable(metaClient, getConfigBuilder().build()),
"003",
rollBackInstant,
true,
true,
true);
});
}
}

View File

@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
public class TestRollbackUtils {
private FileStatus generateFileStatus(String filePath) {
Path dataFile1Path = new Path(filePath);
return new FileStatus(1, true, 1, 1, 1, 1,
FsPermission.valueOf("-rw-rw-rw-"), "one", "one", null, dataFile1Path);
}
@Test
public void testGenerateHeader() {
HoodieInstant hoodieInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101");
String instantToRollback = "1";
Map<HoodieLogBlock.HeaderMetadataType, String> header = RollbackUtils.generateHeader(instantToRollback, hoodieInstant.getTimestamp());
Map<HoodieLogBlock.HeaderMetadataType, String> headerExpect = new HashMap<>(3);
headerExpect.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
headerExpect.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "1");
headerExpect.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, "0");
assertEquals(header, headerExpect);
}
@Test
public void testMergeRollbackStat() {
String partitionPath1 = "/partitionPath1/";
String partitionPath2 = "/partitionPath2/";
//prepare HoodieRollbackStat for different partition
Map<FileStatus, Boolean> dataFilesOnlyStat1Files = new HashMap<>();
dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile1.parquet"), true);
dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile2.parquet"), true);
HoodieRollbackStat dataFilesOnlyStat1 = HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath1)
.withDeletedFileResults(dataFilesOnlyStat1Files).build();
Map<FileStatus, Boolean> dataFilesOnlyStat2Files = new HashMap<>();
dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile1.parquet"), true);
dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile2.parquet"), true);
HoodieRollbackStat dataFilesOnlyStat2 = HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath2)
.withDeletedFileResults(dataFilesOnlyStat1Files).build();
//1. test different partitionpath merge
assertThrows(IllegalArgumentException.class, () -> {
RollbackUtils.mergeRollbackStat(dataFilesOnlyStat1,
dataFilesOnlyStat2);
}, "different partition rollbackstat merge will failed");
//prepare HoodieRollbackStat for failed and block append
Map<FileStatus, Boolean> dataFilesOnlyStat3Files = new HashMap<>();
dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile1.log"), true);
dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile3.parquet"), false);
HoodieRollbackStat dataFilesOnlyStat3 = HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath1)
.withDeletedFileResults(dataFilesOnlyStat3Files).build();
Map<FileStatus, Long> dataFilesOnlyStat4Files = new HashMap<>();
dataFilesOnlyStat4Files.put(generateFileStatus(partitionPath1 + "dataFile1.log"), 10L);
HoodieRollbackStat dataFilesOnlyStat4 = HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath1)
.withRollbackBlockAppendResults(dataFilesOnlyStat4Files).build();
//2. test merge dataFilesOnlyStat1 and dataFilesOnlyStat3
HoodieRollbackStat dataFilesOnlyStatMerge1 =
RollbackUtils.mergeRollbackStat(dataFilesOnlyStat1, dataFilesOnlyStat3);
assertEquals(partitionPath1, dataFilesOnlyStatMerge1.getPartitionPath());
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3.parquet"),
dataFilesOnlyStatMerge1.getFailedDeleteFiles());
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1.parquet",
partitionPath1 + "dataFile2.parquet", partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()),
dataFilesOnlyStatMerge1.getSuccessDeleteFiles().stream().sorted().collect(Collectors.toList()));
assertEquals(0, dataFilesOnlyStatMerge1.getCommandBlocksCount().size());
//3. test merge dataFilesOnlyStatMerge1 and dataFilesOnlyStat4
HoodieRollbackStat dataFilesOnlyStatMerge2 =
RollbackUtils.mergeRollbackStat(dataFilesOnlyStatMerge1, dataFilesOnlyStat4);
assertEquals(partitionPath1, dataFilesOnlyStatMerge1.getPartitionPath());
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3.parquet").stream().sorted().collect(Collectors.toList()),
dataFilesOnlyStatMerge2.getFailedDeleteFiles().stream().sorted().collect(Collectors.toList()));
assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1.parquet",
partitionPath1 + "dataFile2.parquet", partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()),
dataFilesOnlyStatMerge2.getSuccessDeleteFiles().stream().sorted().collect(Collectors.toList()));
assertEquals(CollectionUtils.createImmutableMap(generateFileStatus(partitionPath1 + "dataFile1.log"), 10L),
dataFilesOnlyStatMerge2.getCommandBlocksCount());
}
}

View File

@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -71,7 +70,6 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
protected transient HoodieWriteClient writeClient;
protected transient HoodieReadClient readClient;
protected transient HoodieTableFileSystemView tableView;
protected transient HoodieTable hoodieTable;
protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();

View File

@@ -40,6 +40,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.IOType;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetWriter;
@@ -80,6 +81,7 @@ public class HoodieClientTestUtils {
private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class);
private static final Random RANDOM = new Random();
public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
public static List<WriteStatus> collectStatuses(Iterator<List<WriteStatus>> statusListItr) {
List<WriteStatus> statuses = new ArrayList<>();
@@ -124,11 +126,19 @@ public class HoodieClientTestUtils {
new File(parentPath + "/" + instantTime + suffix).createNewFile();
}
public static void fakeCommitFile(String basePath, String instantTime) throws IOException {
public static void fakeCommit(String basePath, String instantTime) throws IOException {
fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
}
public static void fakeInFlightFile(String basePath, String instantTime) throws IOException {
public static void fakeDeltaCommit(String basePath, String instantTime) throws IOException {
fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
}
public static void fakeInflightDeltaCommit(String basePath, String instantTime) throws IOException {
fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
}
public static void fakeInFlightCommit(String basePath, String instantTime) throws IOException {
fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION);
}
@@ -146,6 +156,20 @@ public class HoodieClientTestUtils {
new RandomAccessFile(path, "rw").setLength(length);
}
public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
throws Exception {
fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
}
public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
throws Exception {
String parentPath = String.format("%s/%s", basePath, partitionPath);
new File(parentPath).mkdirs();
String path = String.format("%s/%s", parentPath, FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1"));
new File(path).createNewFile();
new RandomAccessFile(path, "rw").setLength(length);
}
/**
* Returns a Spark config for this test.
*
@@ -308,4 +332,25 @@ public class HoodieClientTestUtils {
return HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, schema, filter,
createCommitTime);
}
public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime)
throws IOException {
return createMarkerFile(basePath, partitionPath, instantTime);
}
public static String createMarkerFile(String basePath, String partitionPath, String instantTime)
throws IOException {
return createMarkerFile(basePath, partitionPath, instantTime, UUID.randomUUID().toString(), IOType.MERGE);
}
public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
throws IOException {
String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/";
new File(folderPath).mkdirs();
String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, DEFAULT_WRITE_TOKEN, instantTime,
HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
File f = new File(folderPath + markerFileName);
f.createNewFile();
return f.getAbsolutePath();
}
}

View File

@@ -90,7 +90,7 @@ public class HoodieMergeOnReadTestUtils {
}).reduce((a, b) -> {
a.addAll(b);
return a;
}).orElse(new ArrayList<GenericRecord>());
}).orElse(new ArrayList<>());
}
private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema,

View File

@@ -427,15 +427,33 @@ public class HoodieTestDataGenerator {
return generateInsertsStream(commitTime, n, false, schemaStr);
}
public List<HoodieRecord> generateInsertsContainsAllPartitions(String instantTime, Integer n) {
if (n < partitionPaths.length) {
throw new HoodieIOException("n must greater then partitionPaths length");
}
return generateInsertsStream(
instantTime, n, false, TRIP_EXAMPLE_SCHEMA, true).collect(Collectors.toList());
}
/**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
*/
public Stream<HoodieRecord> generateInsertsStream(
String instantTime, Integer n, boolean isFlattened, String schemaStr) {
int currSize = getNumExistingKeys(schemaStr);
String instantTime, Integer n, boolean isFlattened, String schemaStr) {
return generateInsertsStream(instantTime, n, isFlattened, schemaStr, false);
}
/**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
*/
public Stream<HoodieRecord> generateInsertsStream(
String instantTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
int currSize = getNumExistingKeys(schemaStr);
return IntStream.range(0, n).boxed().map(i -> {
String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
if (containsAllPartitions && i < partitionPaths.length) {
partitionPath = partitionPaths[i];
}
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
KeyPartition kp = new KeyPartition();
kp.key = key;