[HUDI-1714] Added tests to TestHoodieTimelineArchiveLog for the archival of compl… (#2677)
* Added tests to TestHoodieTimelineArchiveLog for the archival of completed clean and rollback actions. * Adding code review changes * [HUDI-1714] Minor Fixes
This commit is contained in:
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieActionInstant;
|
||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.client.utils.MetadataConversionUtils;
|
||||
import org.apache.hudi.common.HoodieCleanStat;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
@@ -61,9 +62,11 @@ import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
@@ -389,6 +392,35 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
|
||||
"Archived commits should always be safe");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArchiveRollbacks() throws IOException {
|
||||
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
|
||||
.build();
|
||||
|
||||
createCommitAndRollbackFile("100", "101", false);
|
||||
createCommitAndRollbackFile("102", "103", false);
|
||||
createCommitAndRollbackFile("104", "105", false);
|
||||
createCommitAndRollbackFile("106", "107", false);
|
||||
|
||||
HoodieTable table = HoodieSparkTable.create(cfg, context);
|
||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
|
||||
|
||||
assertTrue(archiveLog.archiveIfRequired(context));
|
||||
HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
|
||||
assertEquals(2, timeline.countInstants(),
|
||||
"first two commits must have been archived");
|
||||
assertFalse(metaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "101")),
|
||||
"first rollback must have been archived");
|
||||
assertFalse(metaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "103")),
|
||||
"second rollback must have been archived");
|
||||
assertTrue(metaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "105")),
|
||||
"first rollback must have been archived");
|
||||
assertTrue(metaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "107")),
|
||||
"second rollback must have been archived");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArchiveCommitCompactionNoHole() throws IOException {
|
||||
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
@@ -494,6 +526,161 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
|
||||
assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArchiveCompletedClean() throws IOException {
|
||||
HoodieWriteConfig cfg =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2).forTable("test-trip-table")
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
|
||||
.build();
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
createCleanMetadata("10", false);
|
||||
createCleanMetadata("11", false);
|
||||
HoodieInstant notArchivedInstant1 = createCleanMetadata("12", false);
|
||||
HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false);
|
||||
|
||||
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
|
||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
|
||||
|
||||
archiveLog.archiveIfRequired(context);
|
||||
|
||||
List<HoodieInstant> notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList());
|
||||
//There will be 3 * 2 files but due to TimelineLayoutV1 this will show as 2.
|
||||
assertEquals(2, notArchivedInstants.size(), "Not archived instants should be 2");
|
||||
assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2), "");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArchiveCompletedRollback() throws IOException {
|
||||
HoodieWriteConfig cfg =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2).forTable("test-trip-table")
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
|
||||
.build();
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
createCommitAndRollbackFile("6", "10", false);
|
||||
createCommitAndRollbackFile("8", "11", false);
|
||||
createCommitAndRollbackFile("7", "12", false);
|
||||
HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, "rollback", "12");
|
||||
|
||||
createCommitAndRollbackFile("5", "13", false);
|
||||
HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, "rollback", "13");
|
||||
|
||||
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
|
||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
|
||||
|
||||
archiveLog.archiveIfRequired(context);
|
||||
|
||||
List<HoodieInstant> notArchivedInstants = metaClient.getActiveTimeline().reload().getRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||
//There will be 2 * 2 files but due to TimelineLayoutV1 this will show as 2.
|
||||
assertEquals(2, notArchivedInstants.size(), "Not archived instants should be 2");
|
||||
assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2), "");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArchiveCompletedShouldRetainMinInstantsIfInstantsGreaterThanMaxtoKeep() throws IOException {
|
||||
int minInstants = 2;
|
||||
int maxInstants = 10;
|
||||
HoodieWriteConfig cfg =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2).forTable("test-trip-table")
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build())
|
||||
.build();
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
for (int i = 0; i < maxInstants + 2; i++) {
|
||||
createCleanMetadata(i + "", false);
|
||||
}
|
||||
|
||||
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
|
||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
|
||||
|
||||
archiveLog.archiveIfRequired(context);
|
||||
assertEquals(minInstants, metaClient.getActiveTimeline().reload().getInstants().count());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArchiveCompletedShouldNotArchiveIfInstantsLessThanMaxtoKeep() throws IOException {
|
||||
int minInstants = 2;
|
||||
int maxInstants = 10;
|
||||
HoodieWriteConfig cfg =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2).forTable("test-trip-table")
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build())
|
||||
.build();
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
for (int i = 0; i < maxInstants; i++) {
|
||||
createCleanMetadata(i + "", false);
|
||||
}
|
||||
|
||||
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
|
||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
|
||||
|
||||
archiveLog.archiveIfRequired(context);
|
||||
assertEquals(maxInstants, metaClient.getActiveTimeline().reload().getInstants().count());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArchiveCompletedRollbackAndClean() throws IOException {
|
||||
int minInstantsToKeep = 2;
|
||||
int maxInstantsToKeep = 10;
|
||||
HoodieWriteConfig cfg =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2).forTable("test-trip-table")
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
|
||||
.build();
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
int startInstant = 1;
|
||||
for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
|
||||
createCleanMetadata(startInstant + "", false);
|
||||
}
|
||||
|
||||
for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
|
||||
createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false);
|
||||
}
|
||||
|
||||
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
|
||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
|
||||
|
||||
archiveLog.archiveIfRequired(context);
|
||||
|
||||
Stream<HoodieInstant> currentInstants = metaClient.getActiveTimeline().reload().getInstants();
|
||||
Map<Object, List<HoodieInstant>> actionInstantMap = currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction));
|
||||
|
||||
assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must be preset");
|
||||
assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(), "Should have min instant");
|
||||
|
||||
assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key must be preset");
|
||||
assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), "Should have min instant");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArchiveInflightClean() throws IOException {
|
||||
HoodieWriteConfig cfg =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2).forTable("test-trip-table")
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
|
||||
.build();
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
createCleanMetadata("10", false);
|
||||
createCleanMetadata("11", false);
|
||||
HoodieInstant notArchivedInstant1 = createCleanMetadata("12", false);
|
||||
HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false);
|
||||
HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true);
|
||||
|
||||
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
|
||||
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
|
||||
|
||||
archiveLog.archiveIfRequired(context);
|
||||
|
||||
List<HoodieInstant> notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList());
|
||||
assertEquals(3, notArchivedInstants.size(), "Not archived instants should be 3");
|
||||
assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2, notArchivedInstant3), "");
|
||||
}
|
||||
|
||||
private void createReplaceMetadata(String instantTime) throws Exception {
|
||||
String fileId1 = "file-" + instantTime + "-1";
|
||||
String fileId2 = "file-" + instantTime + "-2";
|
||||
@@ -512,7 +699,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
|
||||
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
|
||||
}
|
||||
|
||||
private void createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
|
||||
private HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
|
||||
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
|
||||
CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
|
||||
if (inflightOnly) {
|
||||
@@ -528,5 +715,29 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
|
||||
HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
|
||||
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
|
||||
}
|
||||
return new HoodieInstant(inflightOnly, "clean", instantTime);
|
||||
}
|
||||
|
||||
private void createCommitAndRollbackFile(String commitToRollback, String rollbackTIme, boolean isRollbackInflight) throws IOException {
|
||||
HoodieTestDataGenerator.createCommitFile(basePath, commitToRollback, wrapperFs.getConf());
|
||||
createRollbackMetadata(rollbackTIme, commitToRollback, isRollbackInflight);
|
||||
}
|
||||
|
||||
private HoodieInstant createRollbackMetadata(String rollbackTime, String commitToRollback, boolean inflight) throws IOException {
|
||||
if (inflight) {
|
||||
HoodieTestTable.of(metaClient).addInflightRollback(rollbackTime);
|
||||
} else {
|
||||
HoodieRollbackMetadata hoodieRollbackMetadata = HoodieRollbackMetadata.newBuilder()
|
||||
.setVersion(1)
|
||||
.setStartRollbackTime(rollbackTime)
|
||||
.setTotalFilesDeleted(1)
|
||||
.setTimeTakenInMillis(1000)
|
||||
.setCommitsRollback(Collections.singletonList(commitToRollback))
|
||||
.setPartitionMetadata(Collections.emptyMap())
|
||||
.setInstantsRollback(Collections.emptyList())
|
||||
.build();
|
||||
HoodieTestTable.of(metaClient).addRollback(rollbackTime, hoodieRollbackMetadata);
|
||||
}
|
||||
return new HoodieInstant(inflight, "rollback", rollbackTime);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user