1
0

[HUDI-808] Support cleaning bootstrap source data (#1870)

Co-authored-by: Wenning Ding <wenningd@amazon.com>
Co-authored-by: Balaji Varadarajan <vbalaji@apache.org>
This commit is contained in:
wenningd
2020-08-11 01:43:46 -07:00
committed by GitHub
parent 626f78f6f6
commit 8b928e9bca
23 changed files with 772 additions and 173 deletions

View File

@@ -52,6 +52,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits"; public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits"; public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch"; public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
// Set true to clean bootstrap source files when necessary
public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = "hoodie.cleaner.delete.bootstrap.base.file";
// Upsert uses this file size to compact new data onto existing files.. // Upsert uses this file size to compact new data onto existing files..
public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit"; public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
// By default, treat any file <= 100MB as a small file. // By default, treat any file <= 100MB as a small file.
@@ -112,6 +114,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30"; private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20"; private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10); private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
private static final String DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = "false";
public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP =
"hoodie.compaction.daybased.target.partitions"; "hoodie.compaction.daybased.target.partitions";
// 500GB of target IO per compaction (both read and write) // 500GB of target IO per compaction (both read and write)
@@ -252,6 +255,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
return this; return this;
} }
public Builder withCleanBootstrapBaseFileEnabled(Boolean cleanBootstrapSourceFileEnabled) {
props.setProperty(CLEANER_BOOTSTRAP_BASE_FILE_ENABLED, String.valueOf(cleanBootstrapSourceFileEnabled));
return this;
}
public HoodieCompactionConfig build() { public HoodieCompactionConfig build() {
HoodieCompactionConfig config = new HoodieCompactionConfig(props); HoodieCompactionConfig config = new HoodieCompactionConfig(props);
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN); setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
@@ -298,6 +306,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION); TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP), setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP),
COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE); COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE);
setDefaultOnCondition(props, !props.containsKey(CLEANER_BOOTSTRAP_BASE_FILE_ENABLED),
CLEANER_BOOTSTRAP_BASE_FILE_ENABLED, DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED);
HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP)); HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));

View File

@@ -369,6 +369,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP)); return Integer.parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP));
} }
public Boolean shouldCleanBootstrapBaseFile() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
}
/** /**
* index properties. * index properties.
*/ */

View File

@@ -23,13 +23,15 @@ import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
@@ -82,40 +84,45 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
LOG.info("Using cleanerParallelism: " + cleanerParallelism); LOG.info("Using cleanerParallelism: " + cleanerParallelism);
jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned"); jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
Map<String, List<String>> cleanOps = jsc Map<String, List<HoodieCleanFileInfo>> cleanOps = jsc
.parallelize(partitionsToClean, cleanerParallelism) .parallelize(partitionsToClean, cleanerParallelism)
.map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean))) .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
.collect().stream() .collect().stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue)); .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));
return new HoodieCleanerPlan(earliestInstant return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
config.getCleanerPolicy().name(), cleanOps, 1); config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps);
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("Failed to schedule clean operation", e); throw new HoodieIOException("Failed to schedule clean operation", e);
} }
} }
private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc( private static PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>
HoodieTable table) { deleteFilesFunc(HoodieTable table) {
return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> { return (PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>) iter -> {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>(); Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs(); FileSystem fs = table.getMetaClient().getFs();
Path basePath = new Path(table.getMetaClient().getBasePath());
while (iter.hasNext()) { while (iter.hasNext()) {
Tuple2<String, String> partitionDelFileTuple = iter.next(); Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
String partitionPath = partitionDelFileTuple._1(); String partitionPath = partitionDelFileTuple._1();
String delFileName = partitionDelFileTuple._2(); Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
String deletePathStr = deletePath.toString(); String deletePathStr = deletePath.toString();
Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
if (!partitionCleanStatMap.containsKey(partitionPath)) { if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
} }
boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath); PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
partitionCleanStat.addDeleteFilePatterns(deletePath.getName()); if (isBootstrapBasePathFile) {
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult); // For Bootstrap Base file deletions, store the full file path.
partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
} else {
partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
}
} }
return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue())) return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
.collect(Collectors.toList()).iterator(); .collect(Collectors.toList()).iterator();
@@ -145,14 +152,15 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
*/ */
List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan) { List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min( int cleanerParallelism = Math.min(
(int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()), (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
config.getCleanerParallelism()); config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism); LOG.info("Using cleanerParallelism: " + cleanerParallelism);
jsc.setJobGroup(this.getClass().getSimpleName(), "Perform cleaning of partitions"); jsc.setJobGroup(this.getClass().getSimpleName(), "Perform cleaning of partitions");
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() .parallelize(cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y))) .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(),
new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))))
.collect(Collectors.toList()), cleanerParallelism) .collect(Collectors.toList()), cleanerParallelism)
.mapPartitionsToPair(deleteFilesFunc(table)) .mapPartitionsToPair(deleteFilesFunc(table))
.reduceByKey(PartitionCleanStat::merge).collect(); .reduceByKey(PartitionCleanStat::merge).collect();
@@ -161,7 +169,7 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
// Return PartitionCleanStat for each partition passed. // Return PartitionCleanStat for each partition passed.
return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
? partitionCleanStatsMap.get(partitionPath) ? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath); : new PartitionCleanStat(partitionPath);
@@ -175,21 +183,25 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
.withDeletePathPattern(partitionCleanStat.deletePathPatterns()) .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()) .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
.withFailedDeletes(partitionCleanStat.failedDeleteFiles()) .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
.build(); .build();
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
/** /**
* Creates a Cleaner plan if there are files to be cleaned and stores them in instant file. * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
* Cleaner Plan contains absolute file paths.
* *
* @param startCleanTime Cleaner Instant Time * @param startCleanTime Cleaner Instant Time
* @return Cleaner Plan if generated * @return Cleaner Plan if generated
*/ */
Option<HoodieCleanerPlan> requestClean(String startCleanTime) { Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
final HoodieCleanerPlan cleanerPlan = requestClean(jsc); final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty() && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
&& cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) { && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
// Only create cleaner plan which does some work // Only create cleaner plan which does some work
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime); final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
// Save to both aux and timeline folder // Save to both aux and timeline folder
@@ -275,7 +287,7 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
if (cleanerPlanOpt.isPresent()) { if (cleanerPlanOpt.isPresent()) {
table.getMetaClient().reloadActiveTimeline(); table.getMetaClient().reloadActiveTimeline();
HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get(); HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan); return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
} }
} }

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieBaseFile;
@@ -28,12 +29,13 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
@@ -65,6 +67,10 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
private static final Logger LOG = LogManager.getLogger(CleanPlanner.class); private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
public static final Integer CLEAN_PLAN_VERSION_1 = CleanPlanV1MigrationHandler.VERSION;
public static final Integer CLEAN_PLAN_VERSION_2 = CleanPlanV2MigrationHandler.VERSION;
public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
private final SyncableFileSystemView fileSystemView; private final SyncableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline; private final HoodieTimeline commitTimeline;
private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations; private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
@@ -189,11 +195,11 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
* policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
* single file (i.e run it with versionsRetained = 1) * single file (i.e run it with versionsRetained = 1)
*/ */
private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) { private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained() LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+ " file versions. "); + " file versions. ");
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<String> deletePaths = new ArrayList<>(); List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints // Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream() List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
.flatMap(this::getSavepointedDataFiles) .flatMap(this::getSavepointedDataFiles)
@@ -224,11 +230,15 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
FileSlice nextSlice = fileSliceIterator.next(); FileSlice nextSlice = fileSliceIterator.next();
if (nextSlice.getBaseFile().isPresent()) { if (nextSlice.getBaseFile().isPresent()) {
HoodieBaseFile dataFile = nextSlice.getBaseFile().get(); HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
deletePaths.add(dataFile.getFileName()); deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
}
} }
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well // If merge on read, then clean the log files for the commits as well
deletePaths.addAll(nextSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList())); deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
} }
} }
} }
@@ -249,10 +259,10 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
* <p> * <p>
* This policy is the default. * This policy is the default.
*/ */
private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) { private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
int commitsRetained = config.getCleanerCommitsRetained(); int commitsRetained = config.getCleanerCommitsRetained();
LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
List<String> deletePaths = new ArrayList<>(); List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints // Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream() List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
@@ -297,16 +307,21 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
.compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) { .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
// this is a commit, that should be cleaned. // this is a commit, that should be cleaned.
aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName())); aFile.ifPresent(hoodieDataFile -> {
deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
}
});
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well // If merge on read, then clean the log files for the commits as well
deletePaths.addAll(aSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList())); deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
} }
} }
} }
} }
} }
return deletePaths; return deletePaths;
} }
@@ -329,9 +344,9 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
/** /**
* Returns files to be cleaned for the given partitionPath based on cleaning policy. * Returns files to be cleaned for the given partitionPath based on cleaning policy.
*/ */
public List<String> getDeletePaths(String partitionPath) { public List<CleanFileInfo> getDeletePaths(String partitionPath) {
HoodieCleaningPolicy policy = config.getCleanerPolicy(); HoodieCleaningPolicy policy = config.getCleanerPolicy();
List<String> deletePaths; List<CleanFileInfo> deletePaths;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath); deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) { } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {

View File

@@ -28,21 +28,36 @@ class PartitionCleanStat implements Serializable {
private final List<String> deletePathPatterns = new ArrayList<>(); private final List<String> deletePathPatterns = new ArrayList<>();
private final List<String> successDeleteFiles = new ArrayList<>(); private final List<String> successDeleteFiles = new ArrayList<>();
private final List<String> failedDeleteFiles = new ArrayList<>(); private final List<String> failedDeleteFiles = new ArrayList<>();
private final List<String> deleteBootstrapBasePathPatterns = new ArrayList<>();
private final List<String> successfulDeleteBootstrapBaseFiles = new ArrayList<>();
private final List<String> failedDeleteBootstrapBaseFiles = new ArrayList<>();
PartitionCleanStat(String partitionPath) { PartitionCleanStat(String partitionPath) {
this.partitionPath = partitionPath; this.partitionPath = partitionPath;
} }
void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) { void addDeletedFileResult(String deletePathStr, boolean success, boolean isBootstrapBasePath) {
if (deletedFileResult) { if (success) {
successDeleteFiles.add(deletePathStr); if (isBootstrapBasePath) {
successfulDeleteBootstrapBaseFiles.add(deletePathStr);
} else {
successDeleteFiles.add(deletePathStr);
}
} else { } else {
failedDeleteFiles.add(deletePathStr); if (isBootstrapBasePath) {
failedDeleteBootstrapBaseFiles.add(deletePathStr);
} else {
failedDeleteFiles.add(deletePathStr);
}
} }
} }
void addDeleteFilePatterns(String deletePathStr) { void addDeleteFilePatterns(String deletePathStr, boolean isBootstrapBasePath) {
deletePathPatterns.add(deletePathStr); if (isBootstrapBasePath) {
deleteBootstrapBasePathPatterns.add(deletePathStr);
} else {
deletePathPatterns.add(deletePathStr);
}
} }
PartitionCleanStat merge(PartitionCleanStat other) { PartitionCleanStat merge(PartitionCleanStat other) {
@@ -53,6 +68,9 @@ class PartitionCleanStat implements Serializable {
successDeleteFiles.addAll(other.successDeleteFiles); successDeleteFiles.addAll(other.successDeleteFiles);
deletePathPatterns.addAll(other.deletePathPatterns); deletePathPatterns.addAll(other.deletePathPatterns);
failedDeleteFiles.addAll(other.failedDeleteFiles); failedDeleteFiles.addAll(other.failedDeleteFiles);
deleteBootstrapBasePathPatterns.addAll(other.deleteBootstrapBasePathPatterns);
successfulDeleteBootstrapBaseFiles.addAll(other.successfulDeleteBootstrapBaseFiles);
failedDeleteBootstrapBaseFiles.addAll(other.failedDeleteBootstrapBaseFiles);
return this; return this;
} }
@@ -67,4 +85,16 @@ class PartitionCleanStat implements Serializable {
public List<String> failedDeleteFiles() { public List<String> failedDeleteFiles() {
return failedDeleteFiles; return failedDeleteFiles;
} }
public List<String> getDeleteBootstrapBasePathPatterns() {
return deleteBootstrapBasePathPatterns;
}
public List<String> getSuccessfulDeleteBootstrapBaseFiles() {
return successfulDeleteBootstrapBaseFiles;
}
public List<String> getFailedDeleteBootstrapBaseFiles() {
return failedDeleteBootstrapBaseFiles;
}
} }

View File

@@ -18,14 +18,19 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.bootstrap.TestBootstrapIndex;
import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -42,6 +47,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator; import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -55,6 +62,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.clean.CleanPlanner;
import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -64,6 +72,7 @@ import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Paths; import java.nio.file.Paths;
@@ -76,6 +85,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -467,7 +477,7 @@ public class TestCleaner extends HoodieClientTestBase {
}); });
} }
return cleanMetadata1.getPartitionMetadata().values().stream() Map<String, HoodieCleanStat> cleanStatMap = cleanMetadata1.getPartitionMetadata().values().stream()
.map(x -> new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath()) .map(x -> new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
.withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles()) .withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles())
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns()) .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns())
@@ -475,88 +485,144 @@ public class TestCleaner extends HoodieClientTestBase {
? new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "000") ? new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "000")
: null)) : null))
.build()) .build())
.collect(Collectors.toList()); .collect(Collectors.toMap(HoodieCleanStat::getPartitionPath, x -> x));
cleanMetadata1.getBootstrapPartitionMetadata().values().forEach(x -> {
HoodieCleanStat s = cleanStatMap.get(x.getPartitionPath());
cleanStatMap.put(x.getPartitionPath(), new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
.withFailedDeletes(s.getFailedDeleteFiles()).withSuccessfulDeletes(s.getSuccessDeleteFiles())
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(s.getDeletePathPatterns())
.withEarliestCommitRetained(Option.ofNullable(s.getEarliestCommitToRetain())
.map(y -> new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, y)))
.withSuccessfulDeleteBootstrapBaseFiles(x.getSuccessDeleteFiles())
.withFailedDeleteBootstrapBaseFiles(x.getFailedDeleteFiles())
.withDeleteBootstrapBasePathPatterns(x.getDeletePathPatterns()).build());
});
return new ArrayList<>(cleanStatMap.values());
}
  /**
   * Test HoodieTable.clean() Cleaning by versions for COW table.
   *
   * <p>Runs the parameterized version-based cleaning scenario with bootstrap
   * source file cleaning disabled (the plain COW case).
   */
  @Test
  public void testKeepLatestFileVersions() throws IOException {
    // Delegate to the parameterized variant; 'false' disables bootstrap source file cleaning.
    testKeepLatestFileVersions(false);
  }
/**
* Test HoodieTable.clean() Cleaning by version logic for COW table with Bootstrap source file clean enable.
*/
@Test
public void testBootstrapSourceFileCleanWithKeepLatestFileVersions() throws IOException {
testKeepLatestFileVersions(true);
} }
/** /**
* Test HoodieTable.clean() Cleaning by versions logic. * Test HoodieTable.clean() Cleaning by versions logic.
*/ */
@Test public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws IOException {
public void testKeepLatestFileVersions() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build(); .build();
// make 1 commit, with 1 file per partition // make 1 commit, with 1 file per partition
HoodieTestUtils.createCommitFiles(basePath, "000"); HoodieTestUtils.createCommitFiles(basePath, "00000000000001");
String file1P0C0 = Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
String file1P1C0 = String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000"); : UUID.randomUUID().toString();
metaClient = HoodieTableMetaClient.reload(metaClient); String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
: UUID.randomUUID().toString();
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
file1P0C0)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
file1P1C0)); file1P1C0));
// make next commit, with 1 insert & 1 update per partition // make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001"); HoodieTestUtils.createCommitFiles(basePath, "00000000000002");
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
String file2P0C1 = String file2P0C1 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert
String file2P1C1 = String file2P1C1 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
assertEquals(1, // enableBootstrapSourceClean would delete the bootstrap base file as the same time
getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
.size(), "Must clean 1 file"); assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
assertEquals(1, + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles() : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
.size(), "Must clean 1 file"); if (enableBootstrapSourceClean) {
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", HoodieFileStatus fstatus =
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus();
// This ensures full path is recorded in metadata.
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+ " but did not contain " + fstatus.getPath().getUri());
assertFalse(new File(bootstrapMapping.get(
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
}
cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file2P0C1)); file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
file2P1C1)); file2P1C1));
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
file1P0C0)); file1P0C0));
if (enableBootstrapSourceClean) {
HoodieFileStatus fstatus =
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus();
// This ensures full path is recorded in metadata.
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+ " but did not contain " + fstatus.getPath().getUri());
assertFalse(new File(bootstrapMapping.get(
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
}
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
"000", file1P1C0)); "00000000000001", file1P1C0));
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002"); HoodieTestUtils.createCommitFiles(basePath, "00000000000003");
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update
String file3P0C2 = String file3P0C2 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
assertEquals(2, assertEquals(2,
getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.getSuccessDeleteFiles().size(), "Must clean two files"); .getSuccessDeleteFiles().size(), "Must clean two files");
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file1P0C0)); file1P0C0));
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file2P0C1)); file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file3P0C2)); file3P0C2));
// No cleaning on partially written file, with no commit. // No cleaning on partially written file, with no commit.
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file3P0C2); // update
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file3P0C2)); file3P0C2));
} }
@@ -604,7 +670,7 @@ public class TestCleaner extends HoodieClientTestBase {
} }
@Test @Test
public void testUpgradeDowngrade() { public void testCleanMetadataUpgradeDowngrade() {
String instantTime = "000"; String instantTime = "000";
String partition1 = DEFAULT_PARTITION_PATHS[0]; String partition1 = DEFAULT_PARTITION_PATHS[0];
@@ -693,6 +759,68 @@ public class TestCleaner extends HoodieClientTestBase {
assertEquals(policies1, policies2); assertEquals(policies1, policies2);
} }
@Test
public void testCleanPlanUpgradeDowngrade() {
String instantTime = "000";
String partition1 = DEFAULT_PARTITION_PATHS[0];
String partition2 = DEFAULT_PARTITION_PATHS[1];
String fileName1 = "data1_1_000.parquet";
String fileName2 = "data2_1_000.parquet";
Map<String, List<String>> filesToBeCleanedPerPartition = new HashMap<>();
filesToBeCleanedPerPartition.put(partition1, Arrays.asList(fileName1));
filesToBeCleanedPerPartition.put(partition2, Arrays.asList(fileName2));
HoodieCleanerPlan version1Plan =
HoodieCleanerPlan.newBuilder().setEarliestInstantToRetain(HoodieActionInstant.newBuilder()
.setAction(HoodieTimeline.COMMIT_ACTION)
.setTimestamp(instantTime).setState(State.COMPLETED.name()).build())
.setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.setFilesToBeDeletedPerPartition(filesToBeCleanedPerPartition)
.setVersion(CleanPlanV1MigrationHandler.VERSION)
.build();
// Upgrade and Verify version 2 plan
HoodieCleanerPlan version2Plan =
new CleanPlanMigrator(metaClient).upgradeToLatest(version1Plan, version1Plan.getVersion());
assertEquals(version1Plan.getEarliestInstantToRetain(), version2Plan.getEarliestInstantToRetain());
assertEquals(version1Plan.getPolicy(), version2Plan.getPolicy());
assertEquals(CleanPlanner.LATEST_CLEAN_PLAN_VERSION, version2Plan.getVersion());
// Deprecated Field is not used.
assertEquals(0, version2Plan.getFilesToBeDeletedPerPartition().size());
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().size(),
version2Plan.getFilePathsToBeDeletedPerPartition().size());
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).size(),
version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).size());
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(),
version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).size());
assertEquals(new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partition1), fileName1).toString(),
version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).get(0).getFilePath());
assertEquals(new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partition2), fileName2).toString(),
version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).get(0).getFilePath());
// Downgrade and verify version 1 plan
HoodieCleanerPlan gotVersion1Plan = new CleanPlanMigrator(metaClient).migrateToVersion(version2Plan,
version2Plan.getVersion(), version1Plan.getVersion());
assertEquals(version1Plan.getEarliestInstantToRetain(), gotVersion1Plan.getEarliestInstantToRetain());
assertEquals(version1Plan.getPolicy(), version2Plan.getPolicy());
assertEquals(version1Plan.getVersion(), gotVersion1Plan.getVersion());
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().size(),
gotVersion1Plan.getFilesToBeDeletedPerPartition().size());
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).size(),
gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition1).size());
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(),
gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition2).size());
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).get(0),
gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition1).get(0));
assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).get(0),
gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition2).get(0));
assertTrue(gotVersion1Plan.getFilePathsToBeDeletedPerPartition().isEmpty());
assertNull(version1Plan.getFilePathsToBeDeletedPerPartition());
}
private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map<String, Tuple3> expected) { private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map<String, Tuple3> expected) {
Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata(); Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata();
@@ -708,47 +836,62 @@ public class TestCleaner extends HoodieClientTestBase {
} }
/** /**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. * Test HoodieTable.clean() Cleaning by commit logic for COW table.
*/ */
@Test @Test
public void testKeepLatestCommits() throws IOException { public void testKeepLatestCommits() throws IOException {
testKeepLatestCommits(false, false); testKeepLatestCommits(false, false, false);
} }
/** /**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated * Test HoodieTable.clean() Cleaning by commit logic for COW table. Here the operations are simulated
* such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
*/ */
@Test @Test
public void testKeepLatestCommitsWithFailureRetry() throws IOException { public void testKeepLatestCommitsWithFailureRetry() throws IOException {
testKeepLatestCommits(true, false); testKeepLatestCommits(true, false, false);
} }
/** /**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. * Test HoodieTable.clean() Cleaning by commit logic for COW table.
*/ */
@Test @Test
public void testKeepLatestCommitsIncrMode() throws IOException { public void testKeepLatestCommitsIncrMode() throws IOException {
testKeepLatestCommits(false, true); testKeepLatestCommits(false, true, false);
} }
/** /**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. * Test HoodieTable.clean() Cleaning by commit logic for COW table with Bootstrap source file clean enable.
*/ */
private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws IOException { @Test
public void testBootstrapSourceFileCleanWithKeepLatestCommits() throws IOException {
testKeepLatestCommits(false, false, true);
}
/**
* Test HoodieTable.clean() Cleaning by commit logic for COW table.
*/
private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean) .withIncrementalCleaningMode(enableIncrementalClean)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build(); .build();
// make 1 commit, with 1 file per partition Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
HoodieTestUtils.createInflightCommitFiles(basePath, "000");
String file1P0C0 = // make 1 commit, with 1 file per partition
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000001");
String file1P1C0 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000"); String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
: UUID.randomUUID().toString();
String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
: UUID.randomUUID().toString();
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert
HoodieCommitMetadata commitMetadata = generateCommitMetadata( HoodieCommitMetadata commitMetadata = generateCommitMetadata(
Collections.unmodifiableMap(new HashMap<String, List<String>>() { Collections.unmodifiableMap(new HashMap<String, List<String>>() {
@@ -759,32 +902,32 @@ public class TestCleaner extends HoodieClientTestBase {
}) })
); );
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
file1P0C0)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
file1P1C0)); file1P1C0));
// make next commit, with 1 insert & 1 update per partition // make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createInflightCommitFiles(basePath, "001"); HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000002");
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
String file2P0C1 = String file2P0C1 =
HoodieTestUtils HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert
String file2P1C1 = String file2P1C1 =
HoodieTestUtils HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert
HoodieTestUtils HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update
HoodieTestUtils HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update
commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() { commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
{ {
put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
@@ -792,99 +935,132 @@ public class TestCleaner extends HoodieClientTestBase {
} }
}); });
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"), new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000002"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file2P0C1)); file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
file2P1C1)); file2P1C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
file1P0C0)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
file1P1C0)); file1P1C0));
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createInflightCommitFiles(basePath, "002"); HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000003");
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTestUtils HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update
HoodieTestUtils HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update
String file3P0C2 = String file3P0C2 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
commitMetadata = generateCommitMetadata(CollectionUtils commitMetadata = generateCommitMetadata(CollectionUtils
.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"), new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsThree.size(), assertEquals(0, hoodieCleanStatsThree.size(),
"Must not clean any file. We have to keep 1 version before the latest commit time to keep"); "Must not clean any file. We have to keep 1 version before the latest commit time to keep");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
file1P0C0)); file1P0C0));
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createInflightCommitFiles(basePath, "003"); HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000004");
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTestUtils HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file1P0C0); // update
HoodieTestUtils HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file2P0C1); // update
String file4P0C3 = String file4P0C3 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003"); HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004");
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap( commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"), new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000004"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry);
assertEquals(1, // enableBootstrapSourceClean would delete the bootstrap base file as the same time
getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() HoodieCleanStat partitionCleanStat =
.size(), "Must not clean one old file"); getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size()
+ (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
: partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file");
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
file1P0C0)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", if (enableBootstrapSourceClean) {
assertFalse(new File(bootstrapMapping.get(
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
}
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file1P0C0)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file1P0C0)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file2P0C1)); file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file2P0C1)); file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file3P0C2)); file3P0C2));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004",
file4P0C3)); file4P0C3));
// No cleaning on partially written file, with no commit. // No cleaning on partially written file, with no commit.
HoodieTestUtils HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000005", file3P0C2); // update
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
CollectionUtils.createImmutableList(file3P0C2))); CollectionUtils.createImmutableList(file3P0C2)));
metaClient.getActiveTimeline().createNewInstant( metaClient.getActiveTimeline().createNewInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004")); new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"));
metaClient.getActiveTimeline().transitionRequestedToInflight( metaClient.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"), new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
assertEquals(0, assertEquals(0,
cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files"); cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file1P0C0)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file2P0C1)); file2P0C1));
} }
/**
* Generate Bootstrap index, bootstrap base file and corresponding metaClient.
* @return Partition to BootstrapFileMapping Map
* @throws IOException
*/
private Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceData() throws IOException {
// create bootstrap source data path
java.nio.file.Path sourcePath = tempDir.resolve("data");
java.nio.file.Files.createDirectories(sourcePath);
assertTrue(new File(sourcePath.toString()).exists());
// recreate metaClient with Bootstrap base path
metaClient = HoodieTestUtils.init(basePath, getTableType(), sourcePath.toString());
// generate bootstrap index
Map<String, List<BootstrapFileMapping>> bootstrapMapping = TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(),
new String[] {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH}, 1);
for (Map.Entry<String, List<BootstrapFileMapping>> entry : bootstrapMapping.entrySet()) {
new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
assertTrue(new File(entry.getValue().get(0).getBoostrapFileStatus().getPath().getUri()).createNewFile());
}
return bootstrapMapping;
}
/** /**
* Test Cleaning functionality of table.rollback() API. * Test Cleaning functionality of table.rollback() API.
*/ */

View File

@@ -70,6 +70,7 @@
<import>${basedir}/src/main/avro/HoodieCompactionOperation.avsc</import> <import>${basedir}/src/main/avro/HoodieCompactionOperation.avsc</import>
<import>${basedir}/src/main/avro/HoodieSavePointMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieSavePointMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieCompactionMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieCompactionMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieCleanPartitionMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieCleanMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieCleanMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieCleanerPlan.avsc</import> <import>${basedir}/src/main/avro/HoodieCleanerPlan.avsc</import>
<import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>

View File

@@ -24,23 +24,22 @@
{"name": "totalFilesDeleted", "type": "int"}, {"name": "totalFilesDeleted", "type": "int"},
{"name": "earliestCommitToRetain", "type": "string"}, {"name": "earliestCommitToRetain", "type": "string"},
{"name": "partitionMetadata", "type": { {"name": "partitionMetadata", "type": {
"type" : "map", "values" : { "type" : "map", "values" : "HoodieCleanPartitionMetadata"
"type": "record", }
"name": "HoodieCleanPartitionMetadata",
"fields": [
{"name": "partitionPath", "type": "string"},
{"name": "policy", "type": "string"},
{"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}},
{"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
{"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
]
}
}
}, },
{ {
"name":"version", "name":"version",
"type":["int", "null"], "type":["int", "null"],
"default": 1 "default": 1
},
{
"name": "bootstrapPartitionMetadata",
"type": [ "null", {
"type" : "map",
"values" : "HoodieCleanPartitionMetadata",
"default" : null
}],
"default" : null
} }
] ]
} }

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieCleanPartitionMetadata",
"fields": [
{"name": "partitionPath", "type": "string"},
{"name": "policy", "type": "string"},
{"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}},
{"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
{"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
]
}

View File

@@ -47,6 +47,7 @@
"type": "string" "type": "string"
}, },
{ {
/** This is deprecated and replaced by the field filePathsToBeDeletedPerPartition **/
"name": "filesToBeDeletedPerPartition", "name": "filesToBeDeletedPerPartition",
"type": [ "type": [
"null", { "null", {
@@ -64,6 +65,33 @@
"name":"version", "name":"version",
"type":["int", "null"], "type":["int", "null"],
"default": 1 "default": 1
},
{
"name": "filePathsToBeDeletedPerPartition",
"doc": "This field replaces the field filesToBeDeletedPerPartition",
"type": [
"null", {
"type":"map",
"values": {
"type":"array",
"items":{
"name":"HoodieCleanFileInfo",
"type": "record",
"fields":[
{
"name":"filePath",
"type":["null","string"],
"default": null
},
{
"name":"isBootstrapBaseFile",
"type":["null","boolean"],
"default": null
}
]
}
}}],
"default" : null
} }
] ]
} }

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.common;
import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import java.io.Serializable; import java.io.Serializable;
@@ -39,17 +40,35 @@ public class HoodieCleanStat implements Serializable {
private final List<String> successDeleteFiles; private final List<String> successDeleteFiles;
// Files that could not be deleted // Files that could not be deleted
private final List<String> failedDeleteFiles; private final List<String> failedDeleteFiles;
// Bootstrap Base Path patterns that were generated for the delete operation
private final List<String> deleteBootstrapBasePathPatterns;
private final List<String> successDeleteBootstrapBaseFiles;
// Files that could not be deleted
private final List<String> failedDeleteBootstrapBaseFiles;
// Earliest commit that was retained in this clean // Earliest commit that was retained in this clean
private final String earliestCommitToRetain; private final String earliestCommitToRetain;
public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns, public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain) { List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain) {
this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain,
CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(),
CollectionUtils.createImmutableList());
}
public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
List<String> successDeleteFiles, List<String> failedDeleteFiles,
String earliestCommitToRetain, List<String> deleteBootstrapBasePathPatterns,
List<String> successDeleteBootstrapBaseFiles,
List<String> failedDeleteBootstrapBaseFiles) {
this.policy = policy; this.policy = policy;
this.partitionPath = partitionPath; this.partitionPath = partitionPath;
this.deletePathPatterns = deletePathPatterns; this.deletePathPatterns = deletePathPatterns;
this.successDeleteFiles = successDeleteFiles; this.successDeleteFiles = successDeleteFiles;
this.failedDeleteFiles = failedDeleteFiles; this.failedDeleteFiles = failedDeleteFiles;
this.earliestCommitToRetain = earliestCommitToRetain; this.earliestCommitToRetain = earliestCommitToRetain;
this.deleteBootstrapBasePathPatterns = deleteBootstrapBasePathPatterns;
this.successDeleteBootstrapBaseFiles = successDeleteBootstrapBaseFiles;
this.failedDeleteBootstrapBaseFiles = failedDeleteBootstrapBaseFiles;
} }
public HoodieCleaningPolicy getPolicy() { public HoodieCleaningPolicy getPolicy() {
@@ -72,6 +91,18 @@ public class HoodieCleanStat implements Serializable {
return failedDeleteFiles; return failedDeleteFiles;
} }
public List<String> getDeleteBootstrapBasePathPatterns() {
return deleteBootstrapBasePathPatterns;
}
public List<String> getSuccessDeleteBootstrapBaseFiles() {
return successDeleteBootstrapBaseFiles;
}
public List<String> getFailedDeleteBootstrapBaseFiles() {
return failedDeleteBootstrapBaseFiles;
}
public String getEarliestCommitToRetain() { public String getEarliestCommitToRetain() {
return earliestCommitToRetain; return earliestCommitToRetain;
} }
@@ -91,6 +122,9 @@ public class HoodieCleanStat implements Serializable {
private List<String> failedDeleteFiles; private List<String> failedDeleteFiles;
private String partitionPath; private String partitionPath;
private String earliestCommitToRetain; private String earliestCommitToRetain;
private List<String> deleteBootstrapBasePathPatterns;
private List<String> successDeleteBootstrapBaseFiles;
private List<String> failedDeleteBootstrapBaseFiles;
public Builder withPolicy(HoodieCleaningPolicy policy) { public Builder withPolicy(HoodieCleaningPolicy policy) {
this.policy = policy; this.policy = policy;
@@ -112,6 +146,21 @@ public class HoodieCleanStat implements Serializable {
return this; return this;
} }
public Builder withDeleteBootstrapBasePathPatterns(List<String> deletePathPatterns) {
this.deleteBootstrapBasePathPatterns = deletePathPatterns;
return this;
}
public Builder withSuccessfulDeleteBootstrapBaseFiles(List<String> successDeleteFiles) {
this.successDeleteBootstrapBaseFiles = successDeleteFiles;
return this;
}
public Builder withFailedDeleteBootstrapBaseFiles(List<String> failedDeleteFiles) {
this.failedDeleteBootstrapBaseFiles = failedDeleteFiles;
return this;
}
public Builder withPartitionPath(String partitionPath) { public Builder withPartitionPath(String partitionPath) {
this.partitionPath = partitionPath; this.partitionPath = partitionPath;
return this; return this;
@@ -125,7 +174,8 @@ public class HoodieCleanStat implements Serializable {
public HoodieCleanStat build() { public HoodieCleanStat build() {
return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles,
earliestCommitToRetain); earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles,
failedDeleteBootstrapBaseFiles);
} }
} }
@@ -137,7 +187,10 @@ public class HoodieCleanStat implements Serializable {
+ ", deletePathPatterns=" + deletePathPatterns + ", deletePathPatterns=" + deletePathPatterns
+ ", successDeleteFiles=" + successDeleteFiles + ", successDeleteFiles=" + successDeleteFiles
+ ", failedDeleteFiles=" + failedDeleteFiles + ", failedDeleteFiles=" + failedDeleteFiles
+ ", earliestCommitToRetain='" + earliestCommitToRetain + '\'' + ", earliestCommitToRetain='" + earliestCommitToRetain
+ ", deleteBootstrapBasePathPatterns=" + deleteBootstrapBasePathPatterns
+ ", successDeleteBootstrapBaseFiles=" + successDeleteBootstrapBaseFiles
+ ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles + '\''
+ '}'; + '}';
} }
} }

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.model;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import java.io.Serializable;
public class CleanFileInfo implements Serializable {
private final String filePath;
private final boolean isBootstrapBaseFile;
public CleanFileInfo(String filePath, boolean isBootstrapBaseFile) {
this.filePath = filePath;
this.isBootstrapBaseFile = isBootstrapBaseFile;
}
public String getFilePath() {
return filePath;
}
public boolean isBootstrapBaseFile() {
return isBootstrapBaseFile;
}
public HoodieCleanFileInfo toHoodieFileCleanInfo() {
return new HoodieCleanFileInfo(filePath, isBootstrapBaseFile);
}
}

View File

@@ -29,7 +29,7 @@ public class CleanMetadataMigrator extends MetadataMigrator<HoodieCleanMetadata>
public CleanMetadataMigrator(HoodieTableMetaClient metaClient) { public CleanMetadataMigrator(HoodieTableMetaClient metaClient) {
super(metaClient, super(metaClient,
Arrays Arrays
.asList(new CleanV1MigrationHandler(metaClient), .asList(new CleanMetadataV1MigrationHandler(metaClient),
new CleanV2MigrationHandler(metaClient))); new CleanMetadataV2MigrationHandler(metaClient)));
} }
} }

View File

@@ -31,11 +31,11 @@ import org.apache.hadoop.fs.Path;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class CleanV1MigrationHandler extends AbstractMigratorBase<HoodieCleanMetadata> { public class CleanMetadataV1MigrationHandler extends AbstractMigratorBase<HoodieCleanMetadata> {
public static final Integer VERSION = 1; public static final Integer VERSION = 1;
public CleanV1MigrationHandler(HoodieTableMetaClient metaClient) { public CleanMetadataV1MigrationHandler(HoodieTableMetaClient metaClient) {
super(metaClient); super(metaClient);
} }

View File

@@ -31,11 +31,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class CleanV2MigrationHandler extends AbstractMigratorBase<HoodieCleanMetadata> { public class CleanMetadataV2MigrationHandler extends AbstractMigratorBase<HoodieCleanMetadata> {
public static final Integer VERSION = 2; public static final Integer VERSION = 2;
public CleanV2MigrationHandler(HoodieTableMetaClient metaClient) { public CleanMetadataV2MigrationHandler(HoodieTableMetaClient metaClient) {
super(metaClient); super(metaClient);
} }

View File

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.timeline.versioning.clean;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.MetadataMigrator;
import java.util.Arrays;
/**
* Manages upgrade/downgrade of cleaner plan.
*/
public class CleanPlanMigrator extends MetadataMigrator<HoodieCleanerPlan> {
public CleanPlanMigrator(HoodieTableMetaClient metaClient) {
super(metaClient,
Arrays.asList(new CleanPlanV1MigrationHandler(metaClient), new CleanPlanV2MigrationHandler(metaClient)));
}
}

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.timeline.versioning.clean;
import java.util.HashMap;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.AbstractMigratorBase;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class CleanPlanV1MigrationHandler extends AbstractMigratorBase<HoodieCleanerPlan> {
public static final Integer VERSION = 1;
public CleanPlanV1MigrationHandler(HoodieTableMetaClient metaClient) {
super(metaClient);
}
@Override
public Integer getManagedVersion() {
return VERSION;
}
@Override
public HoodieCleanerPlan upgradeFrom(HoodieCleanerPlan plan) {
throw new IllegalArgumentException(
"This is the lowest version. Plan cannot be any lower version");
}
@Override
public HoodieCleanerPlan downgradeFrom(HoodieCleanerPlan plan) {
if (metaClient.getTableConfig().getBootstrapBasePath().isPresent()) {
throw new IllegalArgumentException(
"This version do not support METADATA_ONLY bootstrapped tables. Failed to downgrade.");
}
Map<String, List<String>> filesPerPartition = plan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
.map(e -> {
return Pair.of(e.getKey(), e.getValue().stream().map(v -> new Path(v.getFilePath()).getName())
.collect(Collectors.toList()));
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), filesPerPartition, VERSION,
new HashMap<>());
}
}

View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.timeline.versioning.clean;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.AbstractMigratorBase;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class CleanPlanV2MigrationHandler extends AbstractMigratorBase<HoodieCleanerPlan> {
public static final Integer VERSION = 2;
public CleanPlanV2MigrationHandler(HoodieTableMetaClient metaClient) {
super(metaClient);
}
@Override
public Integer getManagedVersion() {
return VERSION;
}
@Override
public HoodieCleanerPlan upgradeFrom(HoodieCleanerPlan plan) {
Map<String, List<HoodieCleanFileInfo>> filePathsPerPartition =
plan.getFilesToBeDeletedPerPartition().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
.map(v -> new HoodieCleanFileInfo(
new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), e.getKey()), v).toString(), false))
.collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), new HashMap<>(), VERSION,
filePathsPerPartition);
}
@Override
public HoodieCleanerPlan downgradeFrom(HoodieCleanerPlan input) {
throw new IllegalArgumentException(
"This is the current highest version. Plan cannot be any higher version");
}
}

View File

@@ -252,7 +252,8 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
} }
/** /**
* Add newly found clean instant. * Add newly found clean instant. Note that cleaner metadata (.clean.completed)
* contains only relative paths unlike clean plans (.clean.requested) which contains absolute paths.
* *
* @param timeline Timeline * @param timeline Timeline
* @param instant Clean instant * @param instant Clean instant

View File

@@ -18,16 +18,20 @@
package org.apache.hudi.common.util; package org.apache.hudi.common.util;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator; import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanV1MigrationHandler; import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV1MigrationHandler;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanV2MigrationHandler; import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@@ -35,14 +39,16 @@ import java.util.List;
import java.util.Map; import java.util.Map;
public class CleanerUtils { public class CleanerUtils {
public static final Integer CLEAN_METADATA_VERSION_1 = CleanV1MigrationHandler.VERSION; public static final Integer CLEAN_METADATA_VERSION_1 = CleanMetadataV1MigrationHandler.VERSION;
public static final Integer CLEAN_METADATA_VERSION_2 = CleanV2MigrationHandler.VERSION; public static final Integer CLEAN_METADATA_VERSION_2 = CleanMetadataV2MigrationHandler.VERSION;
public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2; public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2;
public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
Option<Long> durationInMs, Option<Long> durationInMs,
List<HoodieCleanStat> cleanStats) { List<HoodieCleanStat> cleanStats) {
Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = new HashMap<>(); Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = new HashMap<>();
Map<String, HoodieCleanPartitionMetadata> partitionBootstrapMetadataMap = new HashMap<>();
int totalDeleted = 0; int totalDeleted = 0;
String earliestCommitToRetain = null; String earliestCommitToRetain = null;
for (HoodieCleanStat stat : cleanStats) { for (HoodieCleanStat stat : cleanStats) {
@@ -50,6 +56,13 @@ public class CleanerUtils {
new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(), new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(),
stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles()); stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles());
partitionMetadataMap.put(stat.getPartitionPath(), metadata); partitionMetadataMap.put(stat.getPartitionPath(), metadata);
if ((null != stat.getDeleteBootstrapBasePathPatterns())
&& (!stat.getDeleteBootstrapBasePathPatterns().isEmpty())) {
HoodieCleanPartitionMetadata bootstrapMetadata = new HoodieCleanPartitionMetadata(stat.getPartitionPath(),
stat.getPolicy().name(), stat.getDeleteBootstrapBasePathPatterns(), stat.getSuccessDeleteBootstrapBaseFiles(),
stat.getFailedDeleteBootstrapBaseFiles());
partitionBootstrapMetadataMap.put(stat.getPartitionPath(), bootstrapMetadata);
}
totalDeleted += stat.getSuccessDeleteFiles().size(); totalDeleted += stat.getSuccessDeleteFiles().size();
if (earliestCommitToRetain == null) { if (earliestCommitToRetain == null) {
// This will be the same for all partitions // This will be the same for all partitions
@@ -57,8 +70,8 @@ public class CleanerUtils {
} }
} }
return new HoodieCleanMetadata(startCleanTime, return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted,
durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain, partitionMetadataMap, CLEAN_METADATA_VERSION_2); earliestCommitToRetain, partitionMetadataMap, CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap);
} }
/** /**
@@ -77,7 +90,7 @@ public class CleanerUtils {
} }
/** /**
* Get Cleaner Plan corresponding to a clean instant. * Get Latest version of cleaner plan corresponding to a clean instant.
* @param metaClient Hoodie Table Meta Client * @param metaClient Hoodie Table Meta Client
* @param cleanInstant Instant referring to clean action * @param cleanInstant Instant referring to clean action
* @return Cleaner plan corresponding to clean instant * @return Cleaner plan corresponding to clean instant
@@ -85,7 +98,18 @@ public class CleanerUtils {
*/ */
public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant) public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant)
throws IOException { throws IOException {
return TimelineMetadataUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(), CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
HoodieCleanerPlan.class); HoodieCleanerPlan cleanerPlan = TimelineMetadataUtils.deserializeAvroMetadata(
metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(), HoodieCleanerPlan.class);
return cleanPlanMigrator.upgradeToLatest(cleanerPlan, cleanerPlan.getVersion());
}
/**
* Convert list of cleanFileInfo instances to list of avro-generated HoodieCleanFileInfo instances.
* @param cleanFileInfoList
* @return
*/
public static List<HoodieCleanFileInfo> convertToHoodieCleanFileInfoList(List<CleanFileInfo> cleanFileInfoList) {
return cleanFileInfoList.stream().map(CleanFileInfo::toHoodieFileCleanInfo).collect(Collectors.toList());
} }
} }

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.model.BootstrapFileMapping; import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
@@ -86,7 +87,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
@Test @Test
public void testBootstrapIndexConcurrent() throws Exception { public void testBootstrapIndexConcurrent() throws Exception {
Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapIndex(100); Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapIndex(metaClient, BOOTSTRAP_BASE_PATH, PARTITIONS, 100);
final int numThreads = 20; final int numThreads = 20;
final int numRequestsPerThread = 50; final int numRequestsPerThread = 50;
ExecutorService service = Executors.newFixedThreadPool(numThreads); ExecutorService service = Executors.newFixedThreadPool(numThreads);
@@ -111,15 +112,15 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
} }
private void testBootstrapIndexOneRound(int numEntriesPerPartition) throws IOException { private void testBootstrapIndexOneRound(int numEntriesPerPartition) throws IOException {
Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapIndex(numEntriesPerPartition); Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapIndex(metaClient, BOOTSTRAP_BASE_PATH, PARTITIONS, numEntriesPerPartition);
validateBootstrapIndex(bootstrapMapping); validateBootstrapIndex(bootstrapMapping);
} }
private Map<String, List<BootstrapFileMapping>> generateBootstrapIndex(int numEntriesPerPartition) public static Map<String, List<BootstrapFileMapping>> generateBootstrapIndex(HoodieTableMetaClient metaClient,
throws IOException { String sourceBasePath, String[] partitions, int numEntriesPerPartition) {
Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapMapping(numEntriesPerPartition); Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapMapping(sourceBasePath, partitions, numEntriesPerPartition);
BootstrapIndex index = new HFileBootstrapIndex(metaClient); BootstrapIndex index = new HFileBootstrapIndex(metaClient);
try (IndexWriter writer = index.createWriter(BOOTSTRAP_BASE_PATH)) { try (IndexWriter writer = index.createWriter(sourceBasePath)) {
writer.begin(); writer.begin();
bootstrapMapping.entrySet().stream().forEach(e -> writer.appendNextPartition(e.getKey(), e.getValue())); bootstrapMapping.entrySet().stream().forEach(e -> writer.appendNextPartition(e.getKey(), e.getValue()));
writer.finish(); writer.finish();
@@ -162,13 +163,14 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
} }
} }
private Map<String, List<BootstrapFileMapping>> generateBootstrapMapping(int numEntriesPerPartition) { private static Map<String, List<BootstrapFileMapping>> generateBootstrapMapping(String sourceBasePath,
return Arrays.stream(PARTITIONS).map(partition -> { String[] partitions, int numEntriesPerPartition) {
return Arrays.stream(partitions).map(partition -> {
return Pair.of(partition, IntStream.range(0, numEntriesPerPartition).mapToObj(idx -> { return Pair.of(partition, IntStream.range(0, numEntriesPerPartition).mapToObj(idx -> {
String hudiFileId = UUID.randomUUID().toString(); String hudiFileId = UUID.randomUUID().toString();
String sourceFileName = idx + ".parquet"; String sourceFileName = idx + ".parquet";
HoodieFileStatus sourceFileStatus = HoodieFileStatus.newBuilder() HoodieFileStatus sourceFileStatus = HoodieFileStatus.newBuilder()
.setPath(HoodiePath.newBuilder().setUri(BOOTSTRAP_BASE_PATH + "/" + partition + "/" + sourceFileName).build()) .setPath(HoodiePath.newBuilder().setUri(sourceBasePath + "/" + partition + "/" + sourceFileName).build())
.setLength(256 * 1024 * 1024L) .setLength(256 * 1024 * 1024L)
.setAccessTime(new Date().getTime()) .setAccessTime(new Date().getTime())
.setModificationTime(new Date().getTime() + 99999) .setModificationTime(new Date().getTime() + 99999)
@@ -179,7 +181,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
.setPermission(HoodieFSPermission.newBuilder().setUserAction(FsAction.ALL.name()) .setPermission(HoodieFSPermission.newBuilder().setUserAction(FsAction.ALL.name())
.setGroupAction(FsAction.READ.name()).setOtherAction(FsAction.NONE.name()).setStickyBit(true).build()) .setGroupAction(FsAction.READ.name()).setOtherAction(FsAction.NONE.name()).setStickyBit(true).build())
.build(); .build();
return new BootstrapFileMapping(BOOTSTRAP_BASE_PATH, partition, partition, sourceFileStatus, hudiFileId); return new BootstrapFileMapping(sourceBasePath, partition, partition, sourceFileStatus, hudiFileId);
}).collect(Collectors.toList())); }).collect(Collectors.toList()));
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
} }

View File

@@ -45,6 +45,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
@@ -219,7 +220,8 @@ public class HoodieTestUtils {
os = metaClient.getFs().create(commitFile, true); os = metaClient.getFs().create(commitFile, true);
// Write empty clean metadata // Write empty clean metadata
os.write(TimelineMetadataUtils.serializeCleanerPlan( os.write(TimelineMetadataUtils.serializeCleanerPlan(
new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), 1)).get()); new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>())).get());
} catch (IOException ioe) { } catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe); throw new HoodieIOException(ioe.getMessage(), ioe);
} finally { } finally {

View File

@@ -551,7 +551,7 @@ public class TestBootstrap extends HoodieClientTestBase {
} }
public static Dataset<Row> generateTestRawTripDataset(double timestamp, int from, int to, List<String> partitionPaths, public static Dataset<Row> generateTestRawTripDataset(double timestamp, int from, int to, List<String> partitionPaths,
JavaSparkContext jsc, SQLContext sqlContext) { JavaSparkContext jsc, SQLContext sqlContext) {
boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty(); boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
final List<String> records = new ArrayList<>(); final List<String> records = new ArrayList<>();
IntStream.range(from, to).forEach(i -> { IntStream.range(from, to).forEach(i -> {