diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java index 5144f2b83..40eff71c9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java @@ -20,24 +20,13 @@ package org.apache.hudi.client; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.fs.Path; - -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieRollingStatMetadata; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.IOException; import java.io.Serializable; -import java.util.List; -import java.util.stream.Stream; /** * Operates on marker files for a given write action (commit, delta commit, compaction). @@ -61,45 +50,4 @@ public class ReplaceArchivalHelper implements Serializable { avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, ""); return avroMetaData; } - - /** - * Delete all files represented by FileSlices in parallel. Return true if all files are deleted successfully. - */ - public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient, - TableFileSystemView fileSystemView, - HoodieInstant instant, List replacedPartitions) { - // There is no file id to be replaced in the very first replace commit file for insert overwrite operation - if (replacedPartitions.isEmpty()) { - LOG.warn("Found no partition files to replace"); - return true; - } - context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete replaced file groups"); - List f = context.map(replacedPartitions, partition -> { - Stream fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition) - .flatMap(HoodieFileGroup::getAllRawFileSlices); - return fileSlices.allMatch(slice -> deleteFileSlice(slice, metaClient, instant)); - }, replacedPartitions.size()); - - return f.stream().reduce((x, y) -> x & y).orElse(true); - } - - private static boolean deleteFileSlice(FileSlice fileSlice, HoodieTableMetaClient metaClient, HoodieInstant instant) { - boolean baseFileDeleteSuccess = fileSlice.getBaseFile().map(baseFile -> - deletePath(new Path(baseFile.getPath()), metaClient, instant)).orElse(true); - - boolean logFileSuccess = fileSlice.getLogFiles().map(logFile -> - deletePath(logFile.getPath(), metaClient, instant)).allMatch(x -> x); - return baseFileDeleteSuccess & logFileSuccess; - } - - private static boolean deletePath(Path path, HoodieTableMetaClient metaClient, HoodieInstant instant) { - try { - LOG.info("Deleting " + path + " before archiving " + instant); - metaClient.getFs().delete(path); - return true; - } catch (IOException e) { - LOG.error("unable to delete file groups that are replaced", e); - return false; - } - } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 43698f047..11c288bd0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -22,13 +22,11 @@ import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.client.ReplaceArchivalHelper; import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieArchivedLogFile; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; -import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; @@ -40,7 +38,6 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -303,11 +300,6 @@ public class HoodieTimelineArchiveLog { LOG.info("Wrapper schema " + wrapperSchema.toString()); List records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { - // TODO HUDI-1518 Cleaner now takes care of removing replaced file groups. This call to deleteReplacedFileGroups can be removed. - boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant); - if (!deleteSuccess) { - LOG.warn("Unable to delete file(s) for " + hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner"); - } try { deleteAnyLeftOverMarkers(context, hoodieInstant); records.add(convertToAvroRecord(hoodieInstant)); @@ -334,29 +326,6 @@ public class HoodieTimelineArchiveLog { } } - private boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieInstant instant) { - if (!instant.isCompleted() || !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) { - // only delete files for completed replace instants - return true; - } - - TableFileSystemView fileSystemView = this.table.getFileSystemView(); - List replacedPartitions = getReplacedPartitions(instant); - return ReplaceArchivalHelper.deleteReplacedFileGroups(context, metaClient, fileSystemView, instant, replacedPartitions); - } - - private List getReplacedPartitions(HoodieInstant instant) { - try { - HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes( - metaClient.getActiveTimeline().getInstantDetails(instant).get(), - HoodieReplaceCommitMetadata.class); - - return new ArrayList<>(metadata.getPartitionToReplaceFileIds().keySet()); - } catch (IOException e) { - throw new HoodieCommitException("Failed to archive because cannot delete replace files", e); - } - } - private void writeToFile(Schema wrapperSchema, List records) throws Exception { if (records.size() > 0) { Map header = new HashMap<>(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index ea80b089e..cdd3fa526 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -21,15 +21,12 @@ package org.apache.hudi.io; import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.HoodieCleanStat; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -50,7 +47,6 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -233,45 +229,6 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { verifyInflightInstants(metaClient, 2); } - @Test - public void testArchiveTableWithReplacedFiles() throws Exception { - HoodieTestUtils.init(hadoopConf, basePath); - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table") - .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) - .build(); - - // when using insert_overwrite or insert_overwrite_table - // first commit may without replaceFileIds - createReplaceMetadataWithoutReplaceFileId("000"); - - int numCommits = 4; - int commitInstant = 100; - for (int i = 0; i < numCommits; i++) { - createReplaceMetadata(String.valueOf(commitInstant)); - commitInstant += 100; - } - - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - assertEquals(5, timeline.countInstants(), "Loaded 5 commits and the count should match"); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); - boolean result = archiveLog.archiveIfRequired(context); - assertTrue(result); - - FileStatus[] allFiles = metaClient.getFs().listStatus(new Path(basePath + "/" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)); - Set allFileIds = Arrays.stream(allFiles).map(fs -> FSUtils.getFileIdFromFilePath(fs.getPath())).collect(Collectors.toSet()); - - // verify 100-1,200-1 are deleted by archival - assertFalse(allFileIds.contains("file-100-1")); - assertFalse(allFileIds.contains("file-200-1")); - assertTrue(allFileIds.contains("file-100-2")); - assertTrue(allFileIds.contains("file-200-2")); - assertTrue(allFileIds.contains("file-300-1")); - assertTrue(allFileIds.contains("file-400-1")); - } - @Test public void testArchiveTableWithNoArchival() throws IOException { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) @@ -679,42 +636,6 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2, notArchivedInstant3), ""); } - private void createReplaceMetadataWithoutReplaceFileId(String instantTime) throws Exception { - - // create replace instant without a previous replace commit - HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() - .setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE.toString()) - .setVersion(1) - .setExtraMetadata(Collections.emptyMap()) - .build(); - HoodieReplaceCommitMetadata completeReplaceMetadata = new HoodieReplaceCommitMetadata(); - HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata(); - completeReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE); - inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE); - HoodieTestTable.of(metaClient) - .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.of(inflightReplaceMetadata), completeReplaceMetadata); - } - - private void createReplaceMetadata(String instantTime) throws Exception { - String fileId1 = "file-" + instantTime + "-1"; - String fileId2 = "file-" + instantTime + "-2"; - - // create replace instant to mark fileId1 as deleted - HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() - .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString()) - .setVersion(1) - .setExtraMetadata(Collections.emptyMap()) - .build(); - HoodieReplaceCommitMetadata completeReplaceMetadata = new HoodieReplaceCommitMetadata(); - HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata(); - completeReplaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1); - completeReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); - inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE); - HoodieTestTable.of(metaClient) - .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.of(inflightReplaceMetadata), completeReplaceMetadata) - .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2); - } - private HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException { HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>());