diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 1b993b1f4..08f377407 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -52,6 +52,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
   public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
+  // Set true to clean bootstrap source files when necessary
+  public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = "hoodie.cleaner.delete.bootstrap.base.file";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
   // By default, treat any file <= 100MB as a small file.
@@ -112,6 +114,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
   private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
   private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
+  private static final String DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = "false";
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target.partitions";
   // 500GB of target IO per compaction (both read and write)
@@ -252,6 +255,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withCleanBootstrapBaseFileEnabled(Boolean cleanBootstrapSourceFileEnabled) {
+      props.setProperty(CLEANER_BOOTSTRAP_BASE_FILE_ENABLED, String.valueOf(cleanBootstrapSourceFileEnabled));
+      return this;
+    }
+
     public HoodieCompactionConfig build() {
       HoodieCompactionConfig config = new HoodieCompactionConfig(props);
       setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
@@ -298,6 +306,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
       setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP),
           COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE);
+      setDefaultOnCondition(props, !props.containsKey(CLEANER_BOOTSTRAP_BASE_FILE_ENABLED),
+          CLEANER_BOOTSTRAP_BASE_FILE_ENABLED, DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED);
 
       HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 89efc4e47..12026a985 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -369,6 +369,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP));
   }
 
+  public Boolean shouldCleanBootstrapBaseFile() {
+    return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
+  }
+
   /**
   * index properties.
   */
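For anyone trying the new flag end to end: it is wired through the standard config builders, so enabling it is one extra call. A minimal sketch using only methods that appear in this patch (the table path is a placeholder):

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class CleanerConfigExample {
  public static void main(String[] args) {
    // Enable deletion of bootstrap source files alongside the Hudi base files
    // they back. Defaults to false, preserving the old cleaning behavior.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // placeholder base path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCleanBootstrapBaseFileEnabled(true)
            .build())
        .build();
    System.out.println(writeConfig.shouldCleanBootstrapBaseFile()); // prints: true
  }
}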
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index c72a453ac..52614476e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -23,13 +23,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -82,40 +84,45 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata
       LOG.info("Using cleanerParallelism: " + cleanerParallelism);
 
       jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
-      Map<String, List<String>> cleanOps = jsc
+      Map<String, List<HoodieCleanFileInfo>> cleanOps = jsc
           .parallelize(partitionsToClean, cleanerParallelism)
           .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
           .collect().stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));
 
       return new HoodieCleanerPlan(earliestInstant
           .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
-          config.getCleanerPolicy().name(), cleanOps, 1);
+          config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
+          CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to schedule clean operation", e);
     }
   }
 
-  private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(
-      HoodieTable table) {
-    return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> {
+  private static PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>
+      deleteFilesFunc(HoodieTable table) {
+    return (PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>) iter -> {
       Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
-
       FileSystem fs = table.getMetaClient().getFs();
-      Path basePath = new Path(table.getMetaClient().getBasePath());
       while (iter.hasNext()) {
-        Tuple2<String, String> partitionDelFileTuple = iter.next();
+        Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
         String partitionPath = partitionDelFileTuple._1();
-        String delFileName = partitionDelFileTuple._2();
-        Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
+        Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
         String deletePathStr = deletePath.toString();
         Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
         if (!partitionCleanStatMap.containsKey(partitionPath)) {
           partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
         }
+        boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
         PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
-        partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
-        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
+        if (isBootstrapBasePathFile) {
+          // For Bootstrap Base file deletions, store the full file path.
+          partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
+          partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
+        } else {
+          partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
+          partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
+        }
       }
       return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
           .collect(Collectors.toList()).iterator();
@@ -145,14 +152,15 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata
    */
   List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan) {
     int cleanerParallelism = Math.min(
-        (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+        (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Perform cleaning of partitions");
     List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
-        .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
-            .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
+        .parallelize(cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+            .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(),
+                new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))))
             .collect(Collectors.toList()), cleanerParallelism)
         .mapPartitionsToPair(deleteFilesFunc(table))
         .reduceByKey(PartitionCleanStat::merge).collect();
@@ -161,7 +169,7 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata
         .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
 
     // Return PartitionCleanStat for each partition passed.
-    return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
+    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
       PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
           ? partitionCleanStatsMap.get(partitionPath)
           : new PartitionCleanStat(partitionPath);
@@ -175,21 +183,25 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata
           .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
           .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
           .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
+          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
+          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
+          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
           .build();
     }).collect(Collectors.toList());
   }
 
   /**
    * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
+   * Cleaner Plan contains absolute file paths.
    *
    * @param startCleanTime Cleaner Instant Time
    * @return Cleaner Plan if generated
    */
   Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
     final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
-    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
-        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()
-        && cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
+    if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
+        && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
+        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
       // Only create cleaner plan which does some work
       final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
       // Save to both aux and timeline folder
@@ -275,7 +287,7 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata
     if (cleanerPlanOpt.isPresent()) {
       table.getMetaClient().reloadActiveTimeline();
       HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
-      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
+      if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
         return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
       }
     }
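The executor and planner both lean on a small CleanFileInfo value class in hudi-common that this patch references but does not show. Reconstructed from its usage here (new CleanFileInfo(path, isBootstrapBaseFile), getFilePath(), isBootstrapBaseFile()), it is roughly the following sketch; the actual class may differ in detail:

package org.apache.hudi.common.model;

import java.io.Serializable;

// Sketch reconstructed from usage in this diff; the real class in
// hudi-common may differ (e.g. equals/hashCode, javadoc).
public class CleanFileInfo implements Serializable {

  private final String filePath;             // absolute path of the file to delete
  private final boolean isBootstrapBaseFile; // true if this is a bootstrap source file

  public CleanFileInfo(String filePath, boolean isBootstrapBaseFile) {
    this.filePath = filePath;
    this.isBootstrapBaseFile = isBootstrapBaseFile;
  }

  public String getFilePath() {
    return filePath;
  }

  public boolean isBootstrapBaseFile() {
    return isBootstrapBaseFile;
  }
}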
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 815b41dc5..dc891262e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.clean;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -28,12 +29,13 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -65,6 +67,10 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
 
   private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
 
+  public static final Integer CLEAN_PLAN_VERSION_1 = CleanPlanV1MigrationHandler.VERSION;
+  public static final Integer CLEAN_PLAN_VERSION_2 = CleanPlanV2MigrationHandler.VERSION;
+  public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
+
   private final SyncableFileSystemView fileSystemView;
   private final HoodieTimeline commitTimeline;
   private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
@@ -189,11 +195,11 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
    * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
    * single file (i.e run it with versionsRetained = 1)
    */
-  private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) {
+  private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
     LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
         + " file versions. ");
     List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-    List<String> deletePaths = new ArrayList<>();
+    List<CleanFileInfo> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
         .flatMap(this::getSavepointedDataFiles)
@@ -224,11 +230,15 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
         FileSlice nextSlice = fileSliceIterator.next();
         if (nextSlice.getBaseFile().isPresent()) {
           HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
-          deletePaths.add(dataFile.getFileName());
+          deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
+          if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+            deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
+          }
         }
         if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
           // If merge on read, then clean the log files for the commits as well
-          deletePaths.addAll(nextSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+          deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+              .collect(Collectors.toList()));
         }
       }
     }
@@ -249,10 +259,10 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
    *
    * <p>
    * This policy is the default.
    */
-  private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) {
+  private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
     int commitsRetained = config.getCleanerCommitsRetained();
     LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
-    List<String> deletePaths = new ArrayList<>();
+    List<CleanFileInfo> deletePaths = new ArrayList<>();
 
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
@@ -297,16 +307,21 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
           if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
               .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
             // this is a commit, that should be cleaned.
-            aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
+            aFile.ifPresent(hoodieDataFile -> {
+              deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
+              if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+                deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
+              }
+            });
             if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
               // If merge on read, then clean the log files for the commits as well
-              deletePaths.addAll(aSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+              deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                  .collect(Collectors.toList()));
             }
           }
         }
       }
     }
-
     return deletePaths;
   }
 
@@ -329,9 +344,9 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning policy.
    */
-  public List<String> getDeletePaths(String partitionPath) {
+  public List<CleanFileInfo> getDeletePaths(String partitionPath) {
     HoodieCleaningPolicy policy = config.getCleanerPolicy();
-    List<String> deletePaths;
+    List<CleanFileInfo> deletePaths;
     if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
       deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
index 3493ad610..2ff2d9660 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
@@ -28,21 +28,36 @@ class PartitionCleanStat implements Serializable {
   private final List<String> deletePathPatterns = new ArrayList<>();
   private final List<String> successDeleteFiles = new ArrayList<>();
   private final List<String> failedDeleteFiles = new ArrayList<>();
+  private final List<String> deleteBootstrapBasePathPatterns = new ArrayList<>();
+  private final List<String> successfulDeleteBootstrapBaseFiles = new ArrayList<>();
+  private final List<String> failedDeleteBootstrapBaseFiles = new ArrayList<>();
 
   PartitionCleanStat(String partitionPath) {
     this.partitionPath = partitionPath;
   }
 
-  void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
-    if (deletedFileResult) {
-      successDeleteFiles.add(deletePathStr);
+  void addDeletedFileResult(String deletePathStr, boolean success, boolean isBootstrapBasePath) {
+    if (success) {
+      if (isBootstrapBasePath) {
+        successfulDeleteBootstrapBaseFiles.add(deletePathStr);
+      } else {
+        successDeleteFiles.add(deletePathStr);
+      }
     } else {
-      failedDeleteFiles.add(deletePathStr);
+      if (isBootstrapBasePath) {
+        failedDeleteBootstrapBaseFiles.add(deletePathStr);
+      } else {
+        failedDeleteFiles.add(deletePathStr);
+      }
     }
   }
 
-  void addDeleteFilePatterns(String deletePathStr) {
-    deletePathPatterns.add(deletePathStr);
+  void addDeleteFilePatterns(String deletePathStr, boolean isBootstrapBasePath) {
+    if (isBootstrapBasePath) {
+      deleteBootstrapBasePathPatterns.add(deletePathStr);
+    } else {
+      deletePathPatterns.add(deletePathStr);
+    }
   }
 
   PartitionCleanStat merge(PartitionCleanStat other) {
@@ -53,6 +68,9 @@ class PartitionCleanStat implements Serializable {
     successDeleteFiles.addAll(other.successDeleteFiles);
     deletePathPatterns.addAll(other.deletePathPatterns);
     failedDeleteFiles.addAll(other.failedDeleteFiles);
+    deleteBootstrapBasePathPatterns.addAll(other.deleteBootstrapBasePathPatterns);
+    successfulDeleteBootstrapBaseFiles.addAll(other.successfulDeleteBootstrapBaseFiles);
+    failedDeleteBootstrapBaseFiles.addAll(other.failedDeleteBootstrapBaseFiles);
     return this;
   }
 
@@ -67,4 +85,16 @@ class PartitionCleanStat implements Serializable {
   public List<String> failedDeleteFiles() {
     return failedDeleteFiles;
   }
+
+  public List<String> getDeleteBootstrapBasePathPatterns() {
+    return deleteBootstrapBasePathPatterns;
+  }
+
+  public List<String> getSuccessfulDeleteBootstrapBaseFiles() {
+    return successfulDeleteBootstrapBaseFiles;
+  }
+
+  public List<String> getFailedDeleteBootstrapBaseFiles() {
+    return failedDeleteBootstrapBaseFiles;
+  }
 }
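To make the split bookkeeping concrete: regular base files are recorded by file name, while bootstrap source files sit outside the table base path and are recorded by absolute path. An illustrative sketch against the new PartitionCleanStat signatures (paths and file names are made up; the class is package-private, so this example has to live in the same package):

package org.apache.hudi.table.action.clean;

// Illustrative sketch only; paths and file names are made up.
public class PartitionCleanStatExample {
  public static void main(String[] args) {
    PartitionCleanStat stat = new PartitionCleanStat("2020/06/01");

    // Regular base file: recorded by file name only.
    stat.addDeleteFilePatterns("f1_1-0-1_00000000000001.parquet", false);
    stat.addDeletedFileResult("f1_1-0-1_00000000000001.parquet", true, false);

    // Bootstrap source file: recorded by absolute path, since it lives
    // outside the Hudi table's base path.
    stat.addDeleteFilePatterns("/data/source/2020/06/01/part-00000.parquet", true);
    stat.addDeletedFileResult("/data/source/2020/06/01/part-00000.parquet", true, true);

    System.out.println(stat.successDeleteFiles());                    // [f1_1-0-1_00000000000001.parquet]
    System.out.println(stat.getSuccessfulDeleteBootstrapBaseFiles()); // [/data/source/2020/06/01/part-00000.parquet]
  }
}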
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 0376ec36a..a0647882a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -18,14 +18,19 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.bootstrap.TestBootstrapIndex;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BootstrapFileMapping;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -42,6 +47,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.FileSystemTestUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -55,6 +62,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.clean.CleanPlanner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 
@@ -64,6 +72,7 @@ import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
@@ -76,6 +85,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -467,7 +477,7 @@ public class TestCleaner extends HoodieClientTestBase {
       });
     }
 
-    return cleanMetadata1.getPartitionMetadata().values().stream()
+    Map<String, HoodieCleanStat> cleanStatMap = cleanMetadata1.getPartitionMetadata().values().stream()
         .map(x -> new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
             .withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles())
            .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns())
            .withEarliestCommitRetained(Option.ofNullable(x.getEarliestCommitToRetain() != null
@@ -475,88 +485,144 @@ public class TestCleaner extends HoodieClientTestBase {
                 ? new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "000") : null))
             .build())
-        .collect(Collectors.toList());
+        .collect(Collectors.toMap(HoodieCleanStat::getPartitionPath, x -> x));
+    cleanMetadata1.getBootstrapPartitionMetadata().values().forEach(x -> {
+      HoodieCleanStat s = cleanStatMap.get(x.getPartitionPath());
+      cleanStatMap.put(x.getPartitionPath(), new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
+          .withFailedDeletes(s.getFailedDeleteFiles()).withSuccessfulDeletes(s.getSuccessDeleteFiles())
+          .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(s.getDeletePathPatterns())
+          .withEarliestCommitRetained(Option.ofNullable(s.getEarliestCommitToRetain())
+              .map(y -> new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, y)))
+          .withSuccessfulDeleteBootstrapBaseFiles(x.getSuccessDeleteFiles())
+          .withFailedDeleteBootstrapBaseFiles(x.getFailedDeleteFiles())
+          .withDeleteBootstrapBasePathPatterns(x.getDeletePathPatterns()).build());
+    });
+    return new ArrayList<>(cleanStatMap.values());
+  }
+
+  /**
+   * Test HoodieTable.clean() Cleaning by versions for COW table.
+   */
+  @Test
+  public void testKeepLatestFileVersions() throws IOException {
+    testKeepLatestFileVersions(false);
+  }
+
+  /**
+   * Test HoodieTable.clean() Cleaning by version logic for COW table with Bootstrap source file clean enabled.
+   */
+  @Test
+  public void testBootstrapSourceFileCleanWithKeepLatestFileVersions() throws IOException {
+    testKeepLatestFileVersions(true);
+  }
 
   /**
    * Test HoodieTable.clean() Cleaning by versions logic.
    */
-  @Test
-  public void testKeepLatestFileVersions() throws IOException {
+  public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws IOException {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
         .build();
 
     // make 1 commit, with 1 file per partition
-    HoodieTestUtils.createCommitFiles(basePath, "000");
+    HoodieTestUtils.createCommitFiles(basePath, "00000000000001");
 
-    String file1P0C0 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
-    String file1P1C0 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
-    metaClient = HoodieTableMetaClient.reload(metaClient);
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
+
+    String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
         file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
-    HoodieTestUtils.createCommitFiles(basePath, "001");
+    HoodieTestUtils.createCommitFiles(basePath, "00000000000002");
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String file2P0C1 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert
     String file2P1C1 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update
 
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
-    assertEquals(1,
-        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must clean 1 file");
-    assertEquals(1,
-        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must clean 1 file");
+    // enableBootstrapSourceClean would delete the bootstrap base file at the same time
+    HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
+        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
+    if (enableBootstrapSourceClean) {
+      HoodieFileStatus fstatus =
+          bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus();
+      // This ensures full path is recorded in metadata.
+      assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
+          "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+              + " but did not contain " + fstatus.getPath().getUri());
+      assertFalse(new File(bootstrapMapping.get(
+          HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+    }
+    cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
+    assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
+        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
         file2P1C1));
-    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
+    if (enableBootstrapSourceClean) {
+      HoodieFileStatus fstatus =
+          bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus();
+      // This ensures full path is recorded in metadata.
+      assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
+          "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+              + " but did not contain " + fstatus.getPath().getUri());
+      assertFalse(new File(bootstrapMapping.get(
+          HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+    }
     assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
-        "000", file1P1C0));
+        "00000000000001", file1P1C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
-    HoodieTestUtils.createCommitFiles(basePath, "002");
+    HoodieTestUtils.createCommitFiles(basePath, "00000000000003");
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update
     String file3P0C2 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
 
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
     assertEquals(2,
         getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
             .getSuccessDeleteFiles().size(), "Must clean two files");
-    assertFalse(HoodieTestUtils.doesDataFileExist(basePath,
-        HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file1P0C0));
-    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file3P0C2));
 
     // No cleaning on partially written file, with no commit.
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file3P0C2); // update
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
     assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file3P0C2));
   }
 
@@ -604,7 +670,7 @@ public class TestCleaner extends HoodieClientTestBase {
   }
 
   @Test
-  public void testUpgradeDowngrade() {
+  public void testCleanMetadataUpgradeDowngrade() {
     String instantTime = "000";
 
     String partition1 = DEFAULT_PARTITION_PATHS[0];
@@ -693,6 +759,68 @@ public class TestCleaner extends HoodieClientTestBase {
     assertEquals(policies1, policies2);
   }
 
+  @Test
+  public void testCleanPlanUpgradeDowngrade() {
+    String instantTime = "000";
+
+    String partition1 = DEFAULT_PARTITION_PATHS[0];
+    String partition2 = DEFAULT_PARTITION_PATHS[1];
+
+    String fileName1 = "data1_1_000.parquet";
+    String fileName2 = "data2_1_000.parquet";
+
+    Map<String, List<String>> filesToBeCleanedPerPartition = new HashMap<>();
+    filesToBeCleanedPerPartition.put(partition1, Arrays.asList(fileName1));
+    filesToBeCleanedPerPartition.put(partition2, Arrays.asList(fileName2));
+
+    HoodieCleanerPlan version1Plan =
+        HoodieCleanerPlan.newBuilder().setEarliestInstantToRetain(HoodieActionInstant.newBuilder()
+            .setAction(HoodieTimeline.COMMIT_ACTION)
+            .setTimestamp(instantTime).setState(State.COMPLETED.name()).build())
+            .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+            .setFilesToBeDeletedPerPartition(filesToBeCleanedPerPartition)
+            .setVersion(CleanPlanV1MigrationHandler.VERSION)
+            .build();
+
+    // Upgrade and Verify version 2 plan
+    HoodieCleanerPlan version2Plan =
+        new CleanPlanMigrator(metaClient).upgradeToLatest(version1Plan, version1Plan.getVersion());
+    assertEquals(version1Plan.getEarliestInstantToRetain(), version2Plan.getEarliestInstantToRetain());
+    assertEquals(version1Plan.getPolicy(), version2Plan.getPolicy());
+    assertEquals(CleanPlanner.LATEST_CLEAN_PLAN_VERSION, version2Plan.getVersion());
+    // Deprecated Field is not used.
+    assertEquals(0, version2Plan.getFilesToBeDeletedPerPartition().size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().size(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).size(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).size());
+    assertEquals(new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partition1), fileName1).toString(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).get(0).getFilePath());
+    assertEquals(new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partition2), fileName2).toString(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).get(0).getFilePath());
+
+    // Downgrade and verify version 1 plan
+    HoodieCleanerPlan gotVersion1Plan = new CleanPlanMigrator(metaClient).migrateToVersion(version2Plan,
+        version2Plan.getVersion(), version1Plan.getVersion());
+    assertEquals(version1Plan.getEarliestInstantToRetain(), gotVersion1Plan.getEarliestInstantToRetain());
+    assertEquals(version1Plan.getPolicy(), version2Plan.getPolicy());
+    assertEquals(version1Plan.getVersion(), gotVersion1Plan.getVersion());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().size(),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).size(),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition1).size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition2).size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).get(0),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition1).get(0));
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).get(0),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition2).get(0));
+    assertTrue(gotVersion1Plan.getFilePathsToBeDeletedPerPartition().isEmpty());
+    assertNull(version1Plan.getFilePathsToBeDeletedPerPartition());
+  }
+
   private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map<String, Tuple3> expected) {
 
     Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata();
@@ -708,47 +836,62 @@ public class TestCleaner extends HoodieClientTestBase {
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table.
    */
   @Test
   public void testKeepLatestCommits() throws IOException {
-    testKeepLatestCommits(false, false);
+    testKeepLatestCommits(false, false, false);
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table. Here the operations are simulated
    * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
    */
   @Test
   public void testKeepLatestCommitsWithFailureRetry() throws IOException {
-    testKeepLatestCommits(true, false);
+    testKeepLatestCommits(true, false, false);
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table.
    */
   @Test
   public void testKeepLatestCommitsIncrMode() throws IOException {
-    testKeepLatestCommits(false, true);
+    testKeepLatestCommits(false, true, false);
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table with Bootstrap source file clean enabled.
    */
-  private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws IOException {
+  @Test
+  public void testBootstrapSourceFileCleanWithKeepLatestCommits() throws IOException {
+    testKeepLatestCommits(false, false, true);
+  }
+
+  /**
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table.
+   */
+  private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws IOException {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withIncrementalCleaningMode(enableIncrementalClean)
+            .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
 
-    // make 1 commit, with 1 file per partition
-    HoodieTestUtils.createInflightCommitFiles(basePath, "000");
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
 
-    String file1P0C0 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
-    String file1P1C0 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
+    // make 1 commit, with 1 file per partition
+    HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000001");
+
+    String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert
     HoodieCommitMetadata commitMetadata = generateCommitMetadata(
         Collections.unmodifiableMap(new HashMap<String, List<String>>() {
@@ -759,32 +902,32 @@ public class TestCleaner extends HoodieClientTestBase {
         })
     );
     metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
         file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
-    HoodieTestUtils.createInflightCommitFiles(basePath, "001");
+    HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000002");
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String file2P0C1 = HoodieTestUtils
-        .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
+        .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert
     String file2P1C1 = HoodieTestUtils
-        .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
+        .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update
     commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
       {
         put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
@@ -792,99 +935,132 @@ public class TestCleaner extends HoodieClientTestBase {
       }
     });
     metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000002"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
 
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
any partitions and clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file2P1C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0)); // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createInflightCommitFiles(basePath, "002"); + HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000003"); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update String file3P0C2 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003"); commitMetadata = generateCommitMetadata(CollectionUtils .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); metaClient.getActiveTimeline().saveAsComplete( - new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"), + new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsThree.size(), "Must not clean any file. 
We have to keep 1 version before the latest commit time to keep"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0)); // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createInflightCommitFiles(basePath, "003"); + HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000004"); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file1P0C0); // update HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update + .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file2P0C1); // update String file4P0C3 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003"); + HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004"); commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap( HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); metaClient.getActiveTimeline().saveAsComplete( - new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"), + new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000004"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); - assertEquals(1, - getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() - .size(), "Must not clean one old file"); + // enableBootstrapSourceClean would delete the bootstrap base file as the same time + HoodieCleanStat partitionCleanStat = + getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", + assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size() + + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 
+        : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file");
+
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    if (enableBootstrapSourceClean) {
+      assertFalse(new File(bootstrapMapping.get(
+          HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+    }
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file3P0C2));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004",
         file4P0C3));
 
     // No cleaning on partially written file, with no commit.
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000005", file3P0C2); // update
     commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
         CollectionUtils.createImmutableList(file3P0C2)));
     metaClient.getActiveTimeline().createNewInstant(
-        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"));
+        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"));
     metaClient.getActiveTimeline().transitionRequestedToInflight(
-        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"),
+        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
     HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
     assertEquals(0, cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
   }
 
+  /**
+   * Generate Bootstrap index, bootstrap base file and corresponding metaClient.
+   * @return Partition to BootstrapFileMapping Map
+   * @throws IOException
+   */
+  private Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceData() throws IOException {
+    // create bootstrap source data path
+    java.nio.file.Path sourcePath = tempDir.resolve("data");
+    java.nio.file.Files.createDirectories(sourcePath);
+    assertTrue(new File(sourcePath.toString()).exists());
+
+    // recreate metaClient with Bootstrap base path
+    metaClient = HoodieTestUtils.init(basePath, getTableType(), sourcePath.toString());
+
+    // generate bootstrap index
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(),
+        new String[] {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH}, 1);
+
+    for (Map.Entry<String, List<BootstrapFileMapping>> entry : bootstrapMapping.entrySet()) {
+      new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
+      assertTrue(new File(entry.getValue().get(0).getBoostrapFileStatus().getPath().getUri()).createNewFile());
+    }
+    return bootstrapMapping;
+  }
+
   /**
    * Test Cleaning functionality of table.rollback() API.
    */
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index d9901dab8..857f334bc 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -70,6 +70,7 @@
                 <import>${basedir}/src/main/avro/HoodieCompactionOperation.avsc</import>
                 <import>${basedir}/src/main/avro/HoodieSavePointMetadata.avsc</import>
                 <import>${basedir}/src/main/avro/HoodieCompactionMetadata.avsc</import>
+                <import>${basedir}/src/main/avro/HoodieCleanPartitionMetadata.avsc</import>
                 <import>${basedir}/src/main/avro/HoodieCleanMetadata.avsc</import>
                 <import>${basedir}/src/main/avro/HoodieCleanerPlan.avsc</import>
                 <import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
diff --git a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
index f6c05c5b5..c26b5a693 100644
--- a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
@@ -24,23 +24,22 @@
     {"name": "totalFilesDeleted", "type": "int"},
     {"name": "earliestCommitToRetain", "type": "string"},
     {"name": "partitionMetadata", "type": {
-      "type" : "map", "values" : {
-        "type": "record",
-        "name": "HoodieCleanPartitionMetadata",
-        "fields": [
-          {"name": "partitionPath", "type": "string"},
-          {"name": "policy", "type": "string"},
-          {"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}},
-          {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
-          {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
-        ]
-      }
-    }
+      "type" : "map", "values" : "HoodieCleanPartitionMetadata"
+      }
     },
     {
       "name":"version",
       "type":["int", "null"],
       "default": 1
+    },
+    {
+      "name": "bootstrapPartitionMetadata",
+      "type": [ "null", {
+        "type" : "map",
+        "values" : "HoodieCleanPartitionMetadata",
+        "default" : null
+      }],
+      "default" : null
     }
   ]
 }
a/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc b/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc new file mode 100644 index 000000000..877b72591 --- /dev/null +++ b/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "namespace": "org.apache.hudi.avro.model", + "type": "record", + "name": "HoodieCleanPartitionMetadata", + "fields": [ + {"name": "partitionPath", "type": "string"}, + {"name": "policy", "type": "string"}, + {"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}}, + {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}}, + {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}} + ] +} diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc index b87ed7718..c4481c2cd 100644 --- a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc +++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc @@ -47,6 +47,7 @@ "type": "string" }, { + /** This is deprecated and replaced by the field filePathsToBeDeletedPerPartition **/ "name": "filesToBeDeletedPerPartition", "type": [ "null", { @@ -64,6 +65,33 @@ "name":"version", "type":["int", "null"], "default": 1 + }, + { + "name": "filePathsToBeDeletedPerPartition", + "doc": "This field replaces the field filesToBeDeletedPerPartition", + "type": [ + "null", { + "type":"map", + "values": { + "type":"array", + "items":{ + "name":"HoodieCleanFileInfo", + "type": "record", + "fields":[ + { + "name":"filePath", + "type":["null","string"], + "default": null + }, + { + "name":"isBootstrapBaseFile", + "type":["null","boolean"], + "default": null + } + ] + } + }}], + "default" : null } ] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java index 5fc3a155c..e9de502f7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java @@ -20,6 +20,7 @@ package org.apache.hudi.common; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import java.io.Serializable; @@ -39,17 +40,35 @@ public class HoodieCleanStat implements Serializable { private final List successDeleteFiles; // Files that could not be deleted private final List failedDeleteFiles; + // Bootstrap Base Path patterns that were generated for the delete operation + private final List deleteBootstrapBasePathPatterns; + private final List successDeleteBootstrapBaseFiles; + // 
Bootstrap base files that could not be deleted + private final List failedDeleteBootstrapBaseFiles; // Earliest commit that was retained in this clean private final String earliestCommitToRetain; public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List deletePathPatterns, List successDeleteFiles, List failedDeleteFiles, String earliestCommitToRetain) { + this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain, + CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(), + CollectionUtils.createImmutableList()); + } + + public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List deletePathPatterns, + List successDeleteFiles, List failedDeleteFiles, + String earliestCommitToRetain, List deleteBootstrapBasePathPatterns, + List successDeleteBootstrapBaseFiles, + List failedDeleteBootstrapBaseFiles) { this.policy = policy; this.partitionPath = partitionPath; this.deletePathPatterns = deletePathPatterns; this.successDeleteFiles = successDeleteFiles; this.failedDeleteFiles = failedDeleteFiles; this.earliestCommitToRetain = earliestCommitToRetain; + this.deleteBootstrapBasePathPatterns = deleteBootstrapBasePathPatterns; + this.successDeleteBootstrapBaseFiles = successDeleteBootstrapBaseFiles; + this.failedDeleteBootstrapBaseFiles = failedDeleteBootstrapBaseFiles; } public HoodieCleaningPolicy getPolicy() { @@ -72,6 +91,18 @@ public class HoodieCleanStat implements Serializable { return failedDeleteFiles; } + public List getDeleteBootstrapBasePathPatterns() { + return deleteBootstrapBasePathPatterns; + } + + public List getSuccessDeleteBootstrapBaseFiles() { + return successDeleteBootstrapBaseFiles; + } + + public List getFailedDeleteBootstrapBaseFiles() { + return failedDeleteBootstrapBaseFiles; + } + public String getEarliestCommitToRetain() { return earliestCommitToRetain; } @@ -91,6 +122,9 @@ public class HoodieCleanStat implements Serializable { private List failedDeleteFiles; private String partitionPath; private String earliestCommitToRetain; + private List deleteBootstrapBasePathPatterns; + private List successDeleteBootstrapBaseFiles; + private List failedDeleteBootstrapBaseFiles; public Builder withPolicy(HoodieCleaningPolicy policy) { this.policy = policy; @@ -112,6 +146,21 @@ public class HoodieCleanStat implements Serializable { return this; } + public Builder withDeleteBootstrapBasePathPatterns(List deletePathPatterns) { + this.deleteBootstrapBasePathPatterns = deletePathPatterns; + return this; + } + + public Builder withSuccessfulDeleteBootstrapBaseFiles(List successDeleteFiles) { + this.successDeleteBootstrapBaseFiles = successDeleteFiles; + return this; + } + + public Builder withFailedDeleteBootstrapBaseFiles(List failedDeleteFiles) { + this.failedDeleteBootstrapBaseFiles = failedDeleteFiles; + return this; + } + public Builder withPartitionPath(String partitionPath) { this.partitionPath = partitionPath; return this; @@ -125,7 +174,8 @@ public class HoodieCleanStat implements Serializable { public HoodieCleanStat build() { return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, - earliestCommitToRetain); + earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles, + failedDeleteBootstrapBaseFiles); } } @@ -137,7 +187,10 @@ public class HoodieCleanStat implements Serializable { + ", deletePathPatterns=" + deletePathPatterns + ", successDeleteFiles=" + successDeleteFiles + ", failedDeleteFiles=" +
failedDeleteFiles - + ", earliestCommitToRetain='" + earliestCommitToRetain + '\'' + + ", earliestCommitToRetain='" + earliestCommitToRetain + '\'' + + ", deleteBootstrapBasePathPatterns=" + deleteBootstrapBasePathPatterns + + ", successDeleteBootstrapBaseFiles=" + successDeleteBootstrapBaseFiles + + ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles + '}'; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java new file mode 100644 index 000000000..dd6db7aa3 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.avro.model.HoodieCleanFileInfo; + +import java.io.Serializable; + +public class CleanFileInfo implements Serializable { + + private final String filePath; + private final boolean isBootstrapBaseFile; + + public CleanFileInfo(String filePath, boolean isBootstrapBaseFile) { + this.filePath = filePath; + this.isBootstrapBaseFile = isBootstrapBaseFile; + } + + public String getFilePath() { + return filePath; + } + + public boolean isBootstrapBaseFile() { + return isBootstrapBaseFile; + } + + public HoodieCleanFileInfo toHoodieFileCleanInfo() { + return new HoodieCleanFileInfo(filePath, isBootstrapBaseFile); + } +} + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java index cd30a692f..adb5cc00a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java @@ -29,7 +29,7 @@ public class CleanMetadataMigrator extends MetadataMigrator public CleanMetadataMigrator(HoodieTableMetaClient metaClient) { super(metaClient, Arrays - .asList(new CleanV1MigrationHandler(metaClient), - new CleanV2MigrationHandler(metaClient))); + .asList(new CleanMetadataV1MigrationHandler(metaClient), + new CleanMetadataV2MigrationHandler(metaClient))); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java similarity index 95% rename from hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV1MigrationHandler.java rename to
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java index 49b70ac60..0b6989405 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV1MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java @@ -31,11 +31,11 @@ import org.apache.hadoop.fs.Path; import java.util.Map; import java.util.stream.Collectors; -public class CleanV1MigrationHandler extends AbstractMigratorBase { +public class CleanMetadataV1MigrationHandler extends AbstractMigratorBase { public static final Integer VERSION = 1; - public CleanV1MigrationHandler(HoodieTableMetaClient metaClient) { + public CleanMetadataV1MigrationHandler(HoodieTableMetaClient metaClient) { super(metaClient); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java similarity index 95% rename from hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV2MigrationHandler.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java index 2d8a869b1..d74dd888e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV2MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java @@ -31,11 +31,11 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -public class CleanV2MigrationHandler extends AbstractMigratorBase { +public class CleanMetadataV2MigrationHandler extends AbstractMigratorBase { public static final Integer VERSION = 2; - public CleanV2MigrationHandler(HoodieTableMetaClient metaClient) { + public CleanMetadataV2MigrationHandler(HoodieTableMetaClient metaClient) { super(metaClient); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanMigrator.java new file mode 100644 index 000000000..73e5cbc32 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanMigrator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.timeline.versioning.clean; + +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.versioning.MetadataMigrator; + +import java.util.Arrays; + +/** + * Manages upgrade/downgrade of cleaner plan. + */ +public class CleanPlanMigrator extends MetadataMigrator { + + public CleanPlanMigrator(HoodieTableMetaClient metaClient) { + super(metaClient, + Arrays.asList(new CleanPlanV1MigrationHandler(metaClient), new CleanPlanV2MigrationHandler(metaClient))); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java new file mode 100644 index 000000000..0010aa21f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.timeline.versioning.clean; + +import java.util.HashMap; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.versioning.AbstractMigratorBase; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.hadoop.fs.Path; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CleanPlanV1MigrationHandler extends AbstractMigratorBase { + + public static final Integer VERSION = 1; + + public CleanPlanV1MigrationHandler(HoodieTableMetaClient metaClient) { + super(metaClient); + } + + @Override + public Integer getManagedVersion() { + return VERSION; + } + + @Override + public HoodieCleanerPlan upgradeFrom(HoodieCleanerPlan plan) { + throw new IllegalArgumentException( + "This is the lowest version. Plan cannot be any lower version"); + } + + @Override + public HoodieCleanerPlan downgradeFrom(HoodieCleanerPlan plan) { + if (metaClient.getTableConfig().getBootstrapBasePath().isPresent()) { + throw new IllegalArgumentException( + "This version does not support METADATA_ONLY bootstrapped tables. 
Failed to downgrade."); + } + Map> filesPerPartition = plan.getFilePathsToBeDeletedPerPartition().entrySet().stream() + .map(e -> { + return Pair.of(e.getKey(), e.getValue().stream().map(v -> new Path(v.getFilePath()).getName()) + .collect(Collectors.toList())); + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), filesPerPartition, VERSION, + new HashMap<>()); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java new file mode 100644 index 000000000..e141e9a15 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.timeline.versioning.clean; + +import org.apache.hudi.avro.model.HoodieCleanFileInfo; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.versioning.AbstractMigratorBase; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.hadoop.fs.Path; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CleanPlanV2MigrationHandler extends AbstractMigratorBase { + + public static final Integer VERSION = 2; + + public CleanPlanV2MigrationHandler(HoodieTableMetaClient metaClient) { + super(metaClient); + } + + @Override + public Integer getManagedVersion() { + return VERSION; + } + + @Override + public HoodieCleanerPlan upgradeFrom(HoodieCleanerPlan plan) { + Map> filePathsPerPartition = + plan.getFilesToBeDeletedPerPartition().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream() + .map(v -> new HoodieCleanFileInfo( + new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), e.getKey()), v).toString(), false)) + .collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), new HashMap<>(), VERSION, + filePathsPerPartition); + } + + @Override + public HoodieCleanerPlan downgradeFrom(HoodieCleanerPlan input) { + throw new IllegalArgumentException( + "This is the current highest version. 
Plan cannot be any higher version"); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java index 2e31cea6e..ec29e93bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java @@ -252,7 +252,8 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl } /** - * Add newly found clean instant. + * Add newly found clean instant. Note that cleaner metadata (.clean.completed) + * contains only relative paths, unlike clean plans (.clean.requested), which contain absolute paths. * * @param timeline Timeline * @param instant Clean instant diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index 96ac4ca41..6049ee307 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -18,16 +18,20 @@ package org.apache.hudi.common.util; +import java.util.stream.Collectors; +import org.apache.hudi.avro.model.HoodieCleanFileInfo; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.model.CleanFileInfo; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator; -import org.apache.hudi.common.table.timeline.versioning.clean.CleanV1MigrationHandler; -import org.apache.hudi.common.table.timeline.versioning.clean.CleanV2MigrationHandler; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV1MigrationHandler; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator; import java.io.IOException; import java.util.HashMap; @@ -35,14 +39,16 @@ import java.util.List; import java.util.Map; public class CleanerUtils { - public static final Integer CLEAN_METADATA_VERSION_1 = CleanV1MigrationHandler.VERSION; - public static final Integer CLEAN_METADATA_VERSION_2 = CleanV2MigrationHandler.VERSION; + public static final Integer CLEAN_METADATA_VERSION_1 = CleanMetadataV1MigrationHandler.VERSION; + public static final Integer CLEAN_METADATA_VERSION_2 = CleanMetadataV2MigrationHandler.VERSION; public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2; public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, Option durationInMs, List cleanStats) { Map partitionMetadataMap = new HashMap<>(); + Map partitionBootstrapMetadataMap = new HashMap<>(); + int totalDeleted = 0; String earliestCommitToRetain = null; for (HoodieCleanStat stat : cleanStats) { @@ -50,6 +56,13 @@ public class CleanerUtils { new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(), stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles()); 
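+ // Each partition always gets a regular entry in partitionMetadataMap; partitions that also
+ // cleaned bootstrap source files get a second HoodieCleanPartitionMetadata entry in
+ // partitionBootstrapMetadataMap (built below), so the two kinds of deletions stay distinguishable.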
partitionMetadataMap.put(stat.getPartitionPath(), metadata); + if ((null != stat.getDeleteBootstrapBasePathPatterns()) + && (!stat.getDeleteBootstrapBasePathPatterns().isEmpty())) { + HoodieCleanPartitionMetadata bootstrapMetadata = new HoodieCleanPartitionMetadata(stat.getPartitionPath(), + stat.getPolicy().name(), stat.getDeleteBootstrapBasePathPatterns(), stat.getSuccessDeleteBootstrapBaseFiles(), + stat.getFailedDeleteBootstrapBaseFiles()); + partitionBootstrapMetadataMap.put(stat.getPartitionPath(), bootstrapMetadata); + } totalDeleted += stat.getSuccessDeleteFiles().size(); if (earliestCommitToRetain == null) { // This will be the same for all partitions @@ -57,8 +70,8 @@ public class CleanerUtils { } } - return new HoodieCleanMetadata(startCleanTime, - durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain, partitionMetadataMap, CLEAN_METADATA_VERSION_2); + return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted, + earliestCommitToRetain, partitionMetadataMap, CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap); } /** @@ -77,7 +90,7 @@ public class CleanerUtils { } /** - * Get Cleaner Plan corresponding to a clean instant. + * Get Latest version of cleaner plan corresponding to a clean instant. * @param metaClient Hoodie Table Meta Client * @param cleanInstant Instant referring to clean action * @return Cleaner plan corresponding to clean instant @@ -85,7 +98,18 @@ public class CleanerUtils { */ public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant) throws IOException { - return TimelineMetadataUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(), - HoodieCleanerPlan.class); + CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient); + HoodieCleanerPlan cleanerPlan = TimelineMetadataUtils.deserializeAvroMetadata( + metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(), HoodieCleanerPlan.class); + return cleanPlanMigrator.upgradeToLatest(cleanerPlan, cleanerPlan.getVersion()); + } + + /** + * Convert list of cleanFileInfo instances to list of avro-generated HoodieCleanFileInfo instances. 
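+ * The avro instances are what get embedded in the HoodieCleanerPlan that is serialized onto the timeline.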
+ * @param cleanFileInfoList + * @return + */ + public static List convertToHoodieCleanFileInfoList(List cleanFileInfoList) { + return cleanFileInfoList.stream().map(CleanFileInfo::toHoodieFileCleanInfo).collect(Collectors.toList()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java index 428bd8cb9..ecfb59da8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.model.BootstrapFileMapping; import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.collection.Pair; @@ -86,7 +87,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness { @Test public void testBootstrapIndexConcurrent() throws Exception { - Map> bootstrapMapping = generateBootstrapIndex(100); + Map> bootstrapMapping = generateBootstrapIndex(metaClient, BOOTSTRAP_BASE_PATH, PARTITIONS, 100); final int numThreads = 20; final int numRequestsPerThread = 50; ExecutorService service = Executors.newFixedThreadPool(numThreads); @@ -111,15 +112,15 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness { } private void testBootstrapIndexOneRound(int numEntriesPerPartition) throws IOException { - Map> bootstrapMapping = generateBootstrapIndex(numEntriesPerPartition); + Map> bootstrapMapping = generateBootstrapIndex(metaClient, BOOTSTRAP_BASE_PATH, PARTITIONS, numEntriesPerPartition); validateBootstrapIndex(bootstrapMapping); } - private Map> generateBootstrapIndex(int numEntriesPerPartition) - throws IOException { - Map> bootstrapMapping = generateBootstrapMapping(numEntriesPerPartition); + public static Map> generateBootstrapIndex(HoodieTableMetaClient metaClient, + String sourceBasePath, String[] partitions, int numEntriesPerPartition) { + Map> bootstrapMapping = generateBootstrapMapping(sourceBasePath, partitions, numEntriesPerPartition); BootstrapIndex index = new HFileBootstrapIndex(metaClient); - try (IndexWriter writer = index.createWriter(BOOTSTRAP_BASE_PATH)) { + try (IndexWriter writer = index.createWriter(sourceBasePath)) { writer.begin(); bootstrapMapping.entrySet().stream().forEach(e -> writer.appendNextPartition(e.getKey(), e.getValue())); writer.finish(); @@ -162,13 +163,14 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness { } } - private Map> generateBootstrapMapping(int numEntriesPerPartition) { - return Arrays.stream(PARTITIONS).map(partition -> { + private static Map> generateBootstrapMapping(String sourceBasePath, + String[] partitions, int numEntriesPerPartition) { + return Arrays.stream(partitions).map(partition -> { return Pair.of(partition, IntStream.range(0, numEntriesPerPartition).mapToObj(idx -> { String hudiFileId = UUID.randomUUID().toString(); String sourceFileName = idx + ".parquet"; HoodieFileStatus sourceFileStatus = HoodieFileStatus.newBuilder() - .setPath(HoodiePath.newBuilder().setUri(BOOTSTRAP_BASE_PATH + "/" + partition + "/" + sourceFileName).build()) + .setPath(HoodiePath.newBuilder().setUri(sourceBasePath + "/" + partition + "/" + sourceFileName).build()) 
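+ // the remaining attributes below are arbitrary fixture values for the synthetic source file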
.setLength(256 * 1024 * 1024L) .setAccessTime(new Date().getTime()) .setModificationTime(new Date().getTime() + 99999) @@ -179,7 +181,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness { .setPermission(HoodieFSPermission.newBuilder().setUserAction(FsAction.ALL.name()) .setGroupAction(FsAction.READ.name()).setOtherAction(FsAction.NONE.name()).setStickyBit(true).build()) .build(); - return new BootstrapFileMapping(BOOTSTRAP_BASE_PATH, partition, partition, sourceFileStatus, hudiFileId); + return new BootstrapFileMapping(sourceBasePath, partition, partition, sourceFileStatus, hudiFileId); }).collect(Collectors.toList())); }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 92d431c2d..c88ea5129 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -45,6 +45,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; @@ -219,7 +220,8 @@ public class HoodieTestUtils { os = metaClient.getFs().create(commitFile, true); // Write empty clean metadata os.write(TimelineMetadataUtils.serializeCleanerPlan( - new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), 1)).get()); + new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), + CleanPlanV2MigrationHandler.VERSION, new HashMap<>())).get()); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } finally { diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java index b53c7d887..1c31ed5a1 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java +++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java @@ -551,7 +551,7 @@ public class TestBootstrap extends HoodieClientTestBase { } public static Dataset generateTestRawTripDataset(double timestamp, int from, int to, List partitionPaths, - JavaSparkContext jsc, SQLContext sqlContext) { + JavaSparkContext jsc, SQLContext sqlContext) { boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty(); final List records = new ArrayList<>(); IntStream.range(from, to).forEach(i -> {