diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index c42294f12..4d1e197cf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -255,6 +255,22 @@ public class HoodieCompactionConfig extends HoodieConfig { + "record size estimate compute dynamically based on commit metadata. " + " This is critical in computing the insert parallelism and bin-packing inserts into small files."); + public static final ConfigProperty ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty + .key("hoodie.archive.merge.files.batch.size") + .defaultValue(10) + .withDocumentation("The number of small archive files to be merged at once."); + + public static final ConfigProperty ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty + .key("hoodie.archive.merge.small.file.limit.bytes") + .defaultValue(20L * 1024 * 1024) + .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file."); + + public static final ConfigProperty ARCHIVE_MERGE_ENABLE = ConfigProperty + .key("hoodie.archive.merge.enable") + .defaultValue(false) + .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. 
It's" + + " useful when storage scheme doesn't support append operation."); + /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */ @Deprecated public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key(); @@ -547,6 +563,21 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder withArchiveMergeFilesBatchSize(int number) { + compactionConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number)); + return this; + } + + public Builder withArchiveMergeSmallFileLimit(long size) { + compactionConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size)); + return this; + } + + public Builder withArchiveMergeEnable(boolean enable) { + compactionConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable)); + return this; + } + public Builder compactionSmallFileSize(long smallFileLimitBytes) { compactionConfig.setValue(PARQUET_SMALL_FILE_LIMIT, String.valueOf(smallFileLimitBytes)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4c392d5c9..5a4989773 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1082,6 +1082,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getInt(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP); } + public int getArchiveMergeFilesBatchSize() { + return getInt(HoodieCompactionConfig.ARCHIVE_MERGE_FILES_BATCH_SIZE); + } + public int getParquetSmallFileLimit() { return getInt(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT); } @@ -1110,6 +1114,14 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieCompactionConfig.AUTO_CLEAN); } + public boolean getArchiveMergeEnable() { + return 
getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE); + } + + public long getArchiveMergeSmallFileLimitBytes() { + return getLong(HoodieCompactionConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES); + } + public boolean isAutoArchive() { return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 58e03f5f3..138e40a90 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -18,14 +18,19 @@ package org.apache.hudi.table; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; +import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.fs.StorageSchemes; import org.apache.hudi.common.model.HoodieArchivedLogFile; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; @@ -36,8 +41,10 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; @@ -57,6 +64,7 @@ import org.apache.log4j.Logger; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; @@ -106,6 +114,19 @@ public class HoodieTimelineArchiveLog { } } + public Writer reOpenWriter() { + try { + if (this.writer != null) { + this.writer.close(); + this.writer = null; + } + this.writer = openWriter(); + return writer; + } catch (IOException e) { + throw new HoodieException("Unable to initialize HoodieLogFormat writer", e); + } + } + private void close() { try { if (this.writer != null) { @@ -122,7 +143,7 @@ public class HoodieTimelineArchiveLog { public boolean archiveIfRequired(HoodieEngineContext context) throws IOException { try { List instantsToArchive = getInstantsToArchive().collect(Collectors.toList()); - + verifyLastMergeArchiveFilesIfNecessary(context); boolean success = true; if (!instantsToArchive.isEmpty()) { this.writer = openWriter(); @@ -134,12 +155,212 @@ public class HoodieTimelineArchiveLog { LOG.info("No Instants to archive"); } + if (shouldMergeSmallArchiveFies()) { + mergeArchiveFilesIfNecessary(context); + } return success; } finally { close(); } } + public boolean shouldMergeSmallArchiveFies() { + return config.getArchiveMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme()); + } + + /** + * Here Hoodie can merge the small archive files into a new larger one. + * Only used for filesystem which does not support append operation. 
+ * The whole merge small archive files operation has four stages: + * 1. Build merge plan with merge candidates/merged file name infos. + * 2. Do merge. + * 3. Delete all the candidates. + * 4. Delete the merge plan. + * @param context HoodieEngineContext + * @throws IOException + */ + private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException { + Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); + // Close the current writer (if any) and open a new one + reOpenWriter(); + // List all archive files + FileStatus[] fsStatuses = metaClient.getFs().globStatus( + new Path(metaClient.getArchivePath() + "/.commits_.archive*")); + // Sort files by version suffix in reverse (implies reverse chronological order) + Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator()); + + int archiveMergeFilesBatchSize = config.getArchiveMergeFilesBatchSize(); + long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes(); + + List mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses); + + if (mergeCandidate.size() >= archiveMergeFilesBatchSize) { + List candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList()); + // Build the merge plan before merging the archive files + String logFileName = computeLogFileName(); + buildArchiveMergePlan(candidateFiles, planPath, logFileName); + // Merge the archive files + mergeArchiveFiles(mergeCandidate); + // After the merge, delete the small archive files + deleteFilesParallelize(metaClient, candidateFiles, context, true); + LOG.info("Success to delete replaced small archive files."); + // Finally, delete the merge plan; its removal marks the merge of small archive files as complete. 
+ metaClient.getFs().delete(planPath, false); + LOG.info("Success to merge small archive files."); + } + } + + /** + * Find the latest 'huge archive file' index as a break point and only check/merge newer archive files. + * Because we need to keep the original order of archive files which is important when loading archived instants with time filter. + * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter) + * @param smallFileLimitBytes small File Limit Bytes + * @param fsStatuses Sort by version suffix in reverse + * @return merge candidates + */ + private List getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) { + int index = 0; + for (; index < fsStatuses.length; index++) { + if (fsStatuses[index].getLen() > smallFileLimitBytes) { + break; + } + } + return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList()); + } + + /** + * Get final written archive file name based on storageSchemes which does not support append. + */ + private String computeLogFileName() throws IOException { + String logWriteToken = writer.getLogFile().getLogWriteToken(); + HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken); + return hoodieLogFile.getFileName(); + } + + /** + * Check/Solve if there is any failed and unfinished merge small archive files operation + * @param context HoodieEngineContext used for parallelize to delete small archive files if necessary. + * @throws IOException + */ + private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException { + if (shouldMergeSmallArchiveFies()) { + Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); + HoodieWrapperFileSystem fs = metaClient.getFs(); + // If plan exist, last merge small archive files was failed. + // we need to revert or complete last action. 
+ if (fs.exists(planPath)) { + HoodieMergeArchiveFilePlan plan = null; + try { + plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fs, planPath).get(), HoodieMergeArchiveFilePlan.class); + } catch (IOException e) { + LOG.warn("Parsing merge archive plan failed.", e); + // An unreadable (partial) plan file means the last merge failed while writing the plan; discard it. + fs.delete(planPath, false); + return; + } + Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName()); + List candidates = plan.getCandidate().stream().map(Path::new).collect(Collectors.toList()); + if (candidateAllExists(candidates)) { + // The last merge failed while writing the merged archive file, + // and none of the small archive files were deleted. + // Revert the last action by deleting the merged archive file if it exists. + if (fs.exists(mergedArchiveFile)) { + fs.delete(mergedArchiveFile, false); + } + } else { + // The last merge failed while deleting the small archive files, + // but the merged file is complete. + // Try to complete the last action. + if (fs.exists(mergedArchiveFile)) { + deleteFilesParallelize(metaClient, plan.getCandidate(), context, true); + } + } + + fs.delete(planPath, false); + } + } + } + + /** + * If all the candidate small archive files existed, last merge operation was failed during writing the merged archive file. + * If at least one of candidate small archive files existed, the merged archive file was created and last operation was failed during deleting the small archive files. 
+ */ + private boolean candidateAllExists(List candidates) throws IOException { + for (Path archiveFile : candidates) { + if (!metaClient.getFs().exists(archiveFile)) { + // candidate is deleted + return false; + } + } + return true; + } + + public void buildArchiveMergePlan(List compactCandidate, Path planPath, String compactedArchiveFileName) throws IOException { + LOG.info("Start to build archive merge plan."); + HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder() + .setCandidate(compactCandidate) + .setMergedArchiveFileName(compactedArchiveFileName) + .build(); + Option content = TimelineMetadataUtils.serializeAvroMetadata(plan, HoodieMergeArchiveFilePlan.class); + // building merge archive files plan. + FileIOUtils.createFileInPath(metaClient.getFs(), planPath, content); + LOG.info("Success to build archive merge plan"); + } + + public void mergeArchiveFiles(List compactCandidate) throws IOException { + LOG.info("Starting to merge small archive files."); + Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); + try { + List records = new ArrayList<>(); + for (FileStatus fs : compactCandidate) { + // Read the archived file + try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(), + new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) { + // Read the avro blocks + while (reader.hasNext()) { + HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); + List recordsPerFile = blk.getRecords(); + records.addAll(recordsPerFile); + if (records.size() >= this.config.getCommitArchivalBatchSize()) { + writeToFile(wrapperSchema, records); + } + } + } + } + writeToFile(wrapperSchema, records); + } catch (Exception e) { + throw new HoodieCommitException("Failed to merge small archive files", e); + } finally { + writer.close(); + } + LOG.info("Success to merge small archive files."); + } + + private Map deleteFilesParallelize(HoodieTableMetaClient metaClient, List paths, 
HoodieEngineContext context, boolean ignoreFailed) { + + return FSUtils.parallelizeFilesProcess(context, + metaClient.getFs(), + config.getArchiveDeleteParallelism(), + pairOfSubPathAndConf -> { + Path file = new Path(pairOfSubPathAndConf.getKey()); + try { + FileSystem fs = metaClient.getFs(); + if (fs.exists(file)) { + return fs.delete(file, false); + } + return true; // nothing to delete counts as success + } catch (IOException e) { + if (!ignoreFailed) { + throw new HoodieIOException("Failed to delete : " + file, e); + } else { + LOG.warn("Ignore failed deleting : " + file, e); // best-effort mode: keep the cause in the log and report success + return true; + } + } + }, + paths); + } + private Stream getCleanInstantsToArchive() { HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline() .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants(); @@ -238,22 +459,7 @@ public class HoodieTimelineArchiveLog { ).map(Path::toString).collect(Collectors.toList()); context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants"); - Map resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context, - metaClient.getFs(), - config.getArchiveDeleteParallelism(), - pairOfSubPathAndConf -> { - Path commitFile = new Path(pairOfSubPathAndConf.getKey()); - try { - FileSystem fs = commitFile.getFileSystem(pairOfSubPathAndConf.getValue().get()); - if (fs.exists(commitFile)) { - return fs.delete(commitFile, false); - } - return true; - } catch (IOException e) { - throw new HoodieIOException("Failed to delete archived instant " + commitFile, e); - } - }, - instantFiles); + Map resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false); for (Map.Entry result : resultDeleteInstantFiles.entrySet()) { LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index 64ad2d6f7..4902d7426 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hadoop.fs.FileStatus; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.config.HoodieMetadataConfig; @@ -26,6 +27,8 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; @@ -35,9 +38,12 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; @@ -68,6 +74,7 @@ import java.util.stream.Stream; import static 
org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { @@ -113,12 +120,41 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE); } - private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, int minArchivalCommits, int maxArchivalCommits, int maxDeltaCommitsMetadataTable, + private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, + int minArchivalCommits, + int maxArchivalCommits, + int maxDeltaCommitsMetadataTable, HoodieTableType tableType) throws Exception { + return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200); + } + + private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, + int minArchivalCommits, + int maxArchivalCommits, + int maxDeltaCommitsMetadataTable, + boolean enableArchiveMerge, + int archiveFilesBatch, + long size) throws Exception { + return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, + maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size); + } + + private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, + int minArchivalCommits, + int maxArchivalCommits, + int maxDeltaCommitsMetadataTable, + HoodieTableType tableType, + boolean enableArchiveMerge, + int archiveFilesBatch, + long size) throws Exception { init(tableType); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath) 
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits) + .withArchiveMergeEnable(enableArchiveMerge) + .withArchiveMergeFilesBatchSize(archiveFilesBatch) + .withArchiveMergeSmallFileLimit(size) + .build()) .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata) @@ -174,6 +210,222 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200); + + // do ingestion and trigger archive actions here. + for (int i = 1; i < 8; i++) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + archiveAndGetCommitsList(writeConfig); + } + + // build a merge small archive plan with dummy content + // this plan can not be deserialized. 
+ HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); + FileStatus[] fsStatuses = metaClient.getFs().globStatus( + new Path(metaClient.getArchivePath() + "/.commits_.archive*")); + List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); + + archiveLog.reOpenWriter(); + Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); + archiveLog.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1"); + String s = "Dummy Content"; + // stain the current merge plan file. + FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes())); + + // check that damaged plan file will not block archived timeline loading. + HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); + HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload(); + assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants()); + + // trigger several archive after left damaged merge small archive file plan. + for (int i = 1; i < 10; i++) { + testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + archiveAndGetCommitsList(writeConfig); + } + + // loading archived timeline and active timeline success + HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false); + HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload(); + + // check instant number + assertEquals(16 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants()); + + // if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline. 
+ Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1"); + FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes())); + + assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMergeSmallArchiveFilesRecoverFromMergeFailed(boolean enableArchiveMerge) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200); + + // do ingestion and trigger archive actions here. + for (int i = 1; i < 8; i++) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + archiveAndGetCommitsList(writeConfig); + } + + // do a single merge small archive files + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); + FileStatus[] fsStatuses = metaClient.getFs().globStatus( + new Path(metaClient.getArchivePath() + "/.commits_.archive*")); + List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); + archiveLog.reOpenWriter(); + + archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); + archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); + HoodieLogFormat.Writer writer = archiveLog.reOpenWriter(); + + // check loading archived and active timeline success + HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); + HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload(); + assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.reload().countInstants()); 
+ + String s = "Dummy Content"; + // stain the current merged archive file. + FileIOUtils.createFileInPath(metaClient.getFs(), writer.getLogFile().getPath(), Option.of(s.getBytes())); + + // do another archive actions with merge small archive files. + for (int i = 1; i < 10; i++) { + testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + archiveAndGetCommitsList(writeConfig); + } + + // check result. + // we need to load archived timeline successfully and ignore the parsing damage merged archive files exception. + HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false); + HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload(); + + assertEquals(16 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants()); + + // if there are a damaged merged archive files and other common damaged archive file. + // hoodie need throw ioe while loading archived timeline because of parsing the damaged archive file. + Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1"); + FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes())); + + assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMergeSmallArchiveFilesRecoverFromDeleteFailed(boolean enableArchiveMerge) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200); + + // do ingestion and trigger archive actions here. + for (int i = 1; i < 8; i++) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? 
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + archiveAndGetCommitsList(writeConfig); + } + + // do a single merge small archive files + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); + FileStatus[] fsStatuses = metaClient.getFs().globStatus( + new Path(metaClient.getArchivePath() + "/.commits_.archive*")); + List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); + + archiveLog.reOpenWriter(); + + archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); + archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); + archiveLog.reOpenWriter(); + + // delete only one of the small archive file to simulate delete action failed. + metaClient.getFs().delete(fsStatuses[0].getPath()); + + // loading archived timeline and active timeline success + HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); + HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload(); + assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants()); + + // do another archive actions with merge small archive files. + for (int i = 1; i < 10; i++) { + testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + archiveAndGetCommitsList(writeConfig); + } + + // check result. 
+ HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false); + HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload(); + + assertEquals(16 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerge) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200); + + // do ingestion and trigger archive actions here. + for (int i = 1; i < 8; i++) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + archiveAndGetCommitsList(writeConfig); + } + + Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); + String s = "Dummy Content"; + // stain the current merge plan file. + FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes())); + + // check that damaged plan file will not block archived timeline loading. + HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); + HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload(); + assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants()); + + // if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline. 
+ Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1"); + FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes())); + + assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLoadArchiveTimelineWithUncompletedMergeArchiveFile(boolean enableArchiveMerge) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200); + for (int i = 1; i < 8; i++) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + archiveAndGetCommitsList(writeConfig); + } + + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); + FileStatus[] fsStatuses = metaClient.getFs().globStatus( + new Path(metaClient.getArchivePath() + "/.commits_.archive*")); + List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); + + archiveLog.reOpenWriter(); + + archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); + archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); + HoodieLogFormat.Writer writer = archiveLog.reOpenWriter(); + + String s = "Dummy Content"; + // stain the current merged archive file. + FileIOUtils.createFileInPath(metaClient.getFs(), writer.getLogFile().getPath(), Option.of(s.getBytes())); + + // if there's only a damaged merged archive file, we need to ignore the exception while reading this damaged file. 
+ HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false); + HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline(); + + assertEquals(7 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants()); + + // if there are a damaged merged archive files and other common damaged archive file. + // hoodie need throw ioe while loading archived timeline because of parsing the damaged archive file. + Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1"); + FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes())); + + assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testNoArchivalUntilMaxArchiveConfigWithExtraInflightCommits(boolean enableMetadata) throws Exception { diff --git a/hudi-common/src/main/avro/HoodieMergeArchiveFilePlan.avsc b/hudi-common/src/main/avro/HoodieMergeArchiveFilePlan.avsc new file mode 100644 index 000000000..2284109f7 --- /dev/null +++ b/hudi-common/src/main/avro/HoodieMergeArchiveFilePlan.avsc @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "namespace":"org.apache.hudi.avro.model", + "type":"record", + "name":"HoodieMergeArchiveFilePlan", + "fields":[ + { + "name":"version", + "type":["int", "null"], + "default": 1 + }, + { + "name":"candidate", + "type":["null", { + "type":"array", + "items": "string" + }], + "default": null + }, + { + "name":"mergedArchiveFileName", + "type":["null", "string"], + "default": null + } + ] +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 2fa0b95fd..c7473bd7d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -361,7 +361,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { private void createFileInAuxiliaryFolder(HoodieInstant instant, Option data) { // This will be removed in future release. 
See HUDI-546 Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); - createFileInPath(fullPath, data); + FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, data); } //----------------------------------------------------------------- @@ -505,7 +505,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { fromInstant.getFileName()))); // Use Write Once to create Target File if (allowRedundantTransitions) { - createFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data); + FileIOUtils.createFileInPath(metaClient.getFs(), new Path(metaClient.getMetaPath(), toInstant.getFileName()), data); } else { createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data); } @@ -602,33 +602,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { private void createFileInMetaPath(String filename, Option content, boolean allowOverwrite) { Path fullPath = new Path(metaClient.getMetaPath(), filename); if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) { - createFileInPath(fullPath, content); + FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content); } else { createImmutableFileInPath(fullPath, content); } } - private void createFileInPath(Path fullPath, Option content) { - try { - // If the path does not exist, create it first - if (!metaClient.getFs().exists(fullPath)) { - if (metaClient.getFs().createNewFile(fullPath)) { - LOG.info("Created a new file in meta path: " + fullPath); - } else { - throw new HoodieIOException("Failed to create file " + fullPath); - } - } - - if (content.isPresent()) { - FSDataOutputStream fsout = metaClient.getFs().create(fullPath, true); - fsout.write(content.get()); - fsout.close(); - } - } catch (IOException e) { - throw new HoodieIOException("Failed to create file " + fullPath, e); - } - } - /** * Creates a new file in timeline with overwrite set to false. 
This ensures * files are created only once and never rewritten diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 6090e0861..5ad3fa7a9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -20,13 +20,17 @@ package org.apache.hudi.common.table.timeline; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; +import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.generic.GenericRecord; @@ -46,6 +50,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -66,6 +71,7 @@ import java.util.stream.Collectors; * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. 
*/ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { + public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan"; private static final Pattern ARCHIVE_FILE_PATTERN = Pattern.compile("^\\.commits_\\.archive\\.([0-9]+).*"); @@ -218,7 +224,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { // Sort files by version suffix in reverse (implies reverse chronological order) Arrays.sort(fsStatuses, new ArchiveFileVersionComparator()); - List instantsInRange = new ArrayList<>(); + Set instantsInRange = new HashSet<>(); for (FileStatus fs : fsStatuses) { // Read the archived file try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(), @@ -248,11 +254,32 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { break; } } + } catch (Exception originalException) { + // merge small archive files may left uncompleted archive file which will cause exception. + // need to ignore this kind of exception here. + try { + Path planPath = new Path(metaClient.getArchivePath(), MERGE_ARCHIVE_PLAN_NAME); + HoodieWrapperFileSystem fileSystem = metaClient.getFs(); + if (fileSystem.exists(planPath)) { + HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class); + String mergedArchiveFileName = plan.getMergedArchiveFileName(); + if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) { + LOG.warn("Catch exception because of reading uncompleted merging archive file " + mergedArchiveFileName + ". Ignore it here."); + continue; + } + } + throw originalException; + } catch (Exception e) { + // If anything wrong during parsing merge archive plan, we need to throw the original exception. + // For example corrupted archive file and corrupted plan are both existed. 
+ throw originalException; + } } } - Collections.sort(instantsInRange); - return instantsInRange; + ArrayList result = new ArrayList<>(instantsInRange); + Collections.sort(result); + return result; } catch (IOException e) { throw new HoodieIOException( "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java index 8fc43ef1c..6a9e2e1b3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java @@ -160,4 +160,48 @@ public class FileIOUtils { LOG.warn("IOException during close", e); } } + + public static void createFileInPath(FileSystem fileSystem, org.apache.hadoop.fs.Path fullPath, Option content, boolean ignoreIOE) { + try { + // If the path does not exist, create it first + if (!fileSystem.exists(fullPath)) { + if (fileSystem.createNewFile(fullPath)) { + LOG.info("Created a new file in meta path: " + fullPath); + } else { + throw new HoodieIOException("Failed to create file " + fullPath); + } + } + + if (content.isPresent()) { + FSDataOutputStream fsout = fileSystem.create(fullPath, true); + fsout.write(content.get()); + fsout.close(); + } + } catch (IOException e) { + LOG.warn("Failed to create file " + fullPath, e); + if (!ignoreIOE) { + throw new HoodieIOException("Failed to create file " + fullPath, e); + } + } + } + + public static void createFileInPath(FileSystem fileSystem, org.apache.hadoop.fs.Path fullPath, Option content) { + createFileInPath(fileSystem, fullPath, content, false); + } + + public static Option readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath, boolean ignoreIOE) { + try (FSDataInputStream is = fileSystem.open(detailPath)) { + return Option.of(FileIOUtils.readAsByteArray(is)); + } catch (IOException e) { + LOG.warn("Could not 
read commit details from " + detailPath, e); + if (!ignoreIOE) { + throw new HoodieIOException("Could not read commit details from " + detailPath, e); + } + return Option.empty(); + } + } + + public static Option readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) { + return readDataFromPath(fileSystem, detailPath, false); + } }