From b31c520c66a65723580807b8eb6a2639c96328a8 Mon Sep 17 00:00:00 2001 From: jsbali Date: Wed, 21 Apr 2021 22:57:43 +0530 Subject: [PATCH] =?UTF-8?q?[HUDI-1714]=20Added=20tests=20to=20TestHoodieTi?= =?UTF-8?q?melineArchiveLog=20for=20the=20archival=20of=20compl=E2=80=A6?= =?UTF-8?q?=20(#2677)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added tests to TestHoodieTimelineArchiveLog for the archival of completed clean and rollback actions. * Adding code review changes * [HUDI-1714] Minor Fixes --- .../client/utils/MetadataConversionUtils.java | 6 +- .../hudi/table/HoodieTimelineArchiveLog.java | 2 +- .../hudi/io/TestHoodieTimelineArchiveLog.java | 213 +++++++++++++++++- .../common/testutils/FileCreateUtils.java | 4 +- .../testutils/HoodieTestDataGenerator.java | 10 +- .../common/testutils/HoodieTestTable.java | 7 + 6 files changed, 235 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java index b46f3d866..7061dfbf7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java @@ -81,8 +81,10 @@ public class MetadataConversionUtils { break; } case HoodieTimeline.ROLLBACK_ACTION: { - archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata( - metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class)); + if (hoodieInstant.isCompleted()) { + archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata( + metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class)); + } archivedMetaWrapper.setActionType(ActionType.rollback.name()); break; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 09df62c91..7993f0282 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -366,7 +366,7 @@ public class HoodieTimelineArchiveLog { } private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieInstant hoodieInstant) - throws IOException { + throws IOException { return MetadataConversionUtils.createMetaWrapper(hoodieInstant, metaClient); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index f0f13923e..b42f650c3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.fs.FSUtils; @@ -61,9 +62,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -389,6 +392,35 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { "Archived commits should always be safe"); } + @Test + public void testArchiveRollbacks() throws IOException { + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .build(); + + createCommitAndRollbackFile("100", "101", false); + createCommitAndRollbackFile("102", "103", false); + createCommitAndRollbackFile("104", "105", false); + createCommitAndRollbackFile("106", "107", false); + + HoodieTable table = HoodieSparkTable.create(cfg, context); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + assertTrue(archiveLog.archiveIfRequired(context)); + HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); + assertEquals(2, timeline.countInstants(), + "first two commits must have been archived"); + assertFalse(metaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "101")), + "first rollback must have been archived"); + assertFalse(metaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "103")), + "second rollback must have been archived"); + assertTrue(metaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "105")), + "first rollback must have been archived"); + assertTrue(metaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "107")), + "second rollback must have been archived"); + } + @Test public void testArchiveCommitCompactionNoHole() throws IOException { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) @@ -494,6 +526,161 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString()); } + @Test + public void testArchiveCompletedClean() throws IOException { + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + + createCleanMetadata("10", false); + createCleanMetadata("11", false); + HoodieInstant notArchivedInstant1 = createCleanMetadata("12", false); + HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false); + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + + List notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList()); + //There will be 3 * 2 files but due to TimelineLayoutV1 this will show as 2. + assertEquals(2, notArchivedInstants.size(), "Not archived instants should be 2"); + assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2), ""); + } + + @Test + public void testArchiveCompletedRollback() throws IOException { + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + + createCommitAndRollbackFile("6", "10", false); + createCommitAndRollbackFile("8", "11", false); + createCommitAndRollbackFile("7", "12", false); + HoodieInstant notArchivedInstant1 = new HoodieInstant(State.COMPLETED, "rollback", "12"); + + createCommitAndRollbackFile("5", "13", false); + HoodieInstant notArchivedInstant2 = new HoodieInstant(State.COMPLETED, "rollback", "13"); + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + + List notArchivedInstants = metaClient.getActiveTimeline().reload().getRollbackTimeline().getInstants().collect(Collectors.toList()); + //There will be 2 * 2 files but due to TimelineLayoutV1 this will show as 2. + assertEquals(2, notArchivedInstants.size(), "Not archived instants should be 2"); + assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2), ""); + } + + @Test + public void testArchiveCompletedShouldRetainMinInstantsIfInstantsGreaterThanMaxtoKeep() throws IOException { + int minInstants = 2; + int maxInstants = 10; + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + for (int i = 0; i < maxInstants + 2; i++) { + createCleanMetadata(i + "", false); + } + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + assertEquals(minInstants, metaClient.getActiveTimeline().reload().getInstants().count()); + } + + @Test + public void testArchiveCompletedShouldNotArchiveIfInstantsLessThanMaxtoKeep() throws IOException { + int minInstants = 2; + int maxInstants = 10; + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + for (int i = 0; i < maxInstants; i++) { + createCleanMetadata(i + "", false); + } + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + assertEquals(maxInstants, metaClient.getActiveTimeline().reload().getInstants().count()); + } + + @Test + public void testArchiveCompletedRollbackAndClean() throws IOException { + int minInstantsToKeep = 2; + int maxInstantsToKeep = 10; + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + + int startInstant = 1; + for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) { + createCleanMetadata(startInstant + "", false); + } + + for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) { + createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false); + } + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + + Stream currentInstants = metaClient.getActiveTimeline().reload().getInstants(); + Map> actionInstantMap = currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction)); + + assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must be preset"); + assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(), "Should have min instant"); + + assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key must be preset"); + assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), "Should have min instant"); + } + + @Test + public void testArchiveInflightClean() throws IOException { + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + + createCleanMetadata("10", false); + createCleanMetadata("11", false); + HoodieInstant notArchivedInstant1 = createCleanMetadata("12", false); + HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false); + HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true); + + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + + archiveLog.archiveIfRequired(context); + + List notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList()); + assertEquals(3, notArchivedInstants.size(), "Not archived instants should be 3"); + assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2, notArchivedInstant3), ""); + } + private void createReplaceMetadata(String instantTime) throws Exception { String fileId1 = "file-" + instantTime + "-1"; String fileId2 = "file-" + instantTime + "-2"; @@ -512,7 +699,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); } - private void createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException { + private HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException { HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); if (inflightOnly) { @@ -528,5 +715,29 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats)); HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata); } + return new HoodieInstant(inflightOnly, "clean", instantTime); + } + + private void createCommitAndRollbackFile(String commitToRollback, String rollbackTIme, boolean isRollbackInflight) throws IOException { + HoodieTestDataGenerator.createCommitFile(basePath, commitToRollback, wrapperFs.getConf()); + createRollbackMetadata(rollbackTIme, commitToRollback, isRollbackInflight); + } + + private HoodieInstant createRollbackMetadata(String rollbackTime, String commitToRollback, boolean inflight) throws IOException { + if (inflight) { + HoodieTestTable.of(metaClient).addInflightRollback(rollbackTime); + } else { + HoodieRollbackMetadata hoodieRollbackMetadata = HoodieRollbackMetadata.newBuilder() + .setVersion(1) + .setStartRollbackTime(rollbackTime) + .setTotalFilesDeleted(1) + .setTimeTakenInMillis(1000) + .setCommitsRollback(Collections.singletonList(commitToRollback)) + .setPartitionMetadata(Collections.emptyMap()) + .setInstantsRollback(Collections.emptyList()) + .build(); + HoodieTestTable.of(metaClient).addRollback(rollbackTime, hoodieRollbackMetadata); + } + return new HoodieInstant(inflight, "rollback", rollbackTime); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 5474b184b..9409566b4 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -180,8 +180,8 @@ public class FileCreateUtils { createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION); } - public static void createRollbackFile(String basePath, String instantTime, HoodieRollbackMetadata rollbackMetadata) throws IOException { - createMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION, serializeRollbackMetadata(rollbackMetadata).get()); + public static void createRollbackFile(String basePath, String instantTime, HoodieRollbackMetadata hoodieRollbackMetadata) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION, serializeRollbackMetadata(hoodieRollbackMetadata).get()); } private static void createAuxiliaryMetaFile(String basePath, String instantTime, String suffix) throws IOException { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 8017bc3d7..7499894c2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -341,6 +341,14 @@ public class HoodieTestDataGenerator { } private static void createMetadataFile(String f, String basePath, Configuration configuration, HoodieCommitMetadata commitMetadata) { + try { + createMetadataFile(f, basePath, configuration, commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + + private static void createMetadataFile(String f, String basePath, Configuration configuration, byte[] content) { Path commitFile = new Path( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); FSDataOutputStream os = null; @@ -348,7 +356,7 @@ public class HoodieTestDataGenerator { FileSystem fs = FSUtils.getFs(basePath, configuration); os = fs.create(commitFile, true); // Write empty commit metadata - os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + os.write(content); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } finally { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 182a8d409..98b0f90b7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -210,6 +210,13 @@ public class HoodieTestTable { return this; } + public HoodieTestTable addInflightRollback(String instantTime) throws IOException { + createInflightRollbackFile(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata rollbackMetadata) throws IOException { createInflightRollbackFile(basePath, instantTime); createRollbackFile(basePath, instantTime, rollbackMetadata);