diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 57d7f8ffa..17a94e2bc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -50,7 +50,6 @@ import com.uber.hoodie.exception.HoodieSavepointException; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.func.BulkInsertMapFunction; import com.uber.hoodie.index.HoodieIndex; -import com.uber.hoodie.io.HoodieCleaner; import com.uber.hoodie.io.HoodieCommitArchiveLog; import com.uber.hoodie.metrics.HoodieMetrics; import com.uber.hoodie.table.HoodieTable; @@ -738,25 +737,11 @@ public class HoodieWriteClient implements Seriali HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); - List partitionsToClean = - FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); - // shuffle to distribute cleaning work across partitions evenly - Collections.shuffle(partitionsToClean); - logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config - .getCleanerPolicy()); - if (partitionsToClean.isEmpty()) { - logger.info("Nothing to clean here mom. 
It is already clean"); + List cleanStats = table.clean(jsc); + if (cleanStats.isEmpty()) { return; } - int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); - List cleanStats = jsc.parallelize(partitionsToClean, cleanerParallelism) - .map((Function) partitionPathToClean -> { - HoodieCleaner cleaner = new HoodieCleaner(table, config); - return cleaner.clean(partitionPathToClean); - }) - .collect(); - // Emit metrics (duration, numFilesDeleted) if needed Optional durationInMs = Optional.empty(); if (context != null) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java similarity index 80% rename from hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java rename to hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java index c56f72b5f..27fd64d00 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java @@ -16,9 +16,6 @@ package com.uber.hoodie.io; -import com.clearspring.analytics.util.Lists; -import com.google.common.collect.Maps; -import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieRecordPayload; @@ -30,22 +27,16 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.api.java.function.FlatMapFunction; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import 
java.util.Map; import java.util.Optional; -import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Cleaner is responsible for garbage collecting older files in a given partition path, such that @@ -56,8 +47,8 @@ import java.util.stream.Stream; *

* TODO: Should all cleaning be done based on {@link com.uber.hoodie.common.model.HoodieCommitMetadata} */ -public class HoodieCleaner> { - private static Logger logger = LogManager.getLogger(HoodieCleaner.class); +public class HoodieCleanHelper> { + private static Logger logger = LogManager.getLogger(HoodieCleanHelper.class); private final TableFileSystemView fileSystemView; private final HoodieTimeline commitTimeline; @@ -65,7 +56,7 @@ public class HoodieCleaner> { private HoodieWriteConfig config; private FileSystem fs; - public HoodieCleaner(HoodieTable hoodieTable, HoodieWriteConfig config) { + public HoodieCleanHelper(HoodieTable hoodieTable, HoodieWriteConfig config) { this.hoodieTable = hoodieTable; this.fileSystemView = hoodieTable.getCompactedFileSystemView(); this.commitTimeline = hoodieTable.getCompletedCommitTimeline(); @@ -110,11 +101,11 @@ public class HoodieCleaner> { // Delete the remaining files while (commitItr.hasNext()) { HoodieDataFile nextRecord = commitItr.next(); - deletePaths.add(String.format("%s/%s/%s", config.getBasePath(), partitionPath, - nextRecord.getFileName())); + deletePaths.add(nextRecord.getFileStatus().getPath().toString()); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well + // todo: fix below for MERGE_ON_READ deletePaths.add(String .format("%s/%s/%s", config.getBasePath(), partitionPath, FSUtils.maskWithoutLogVersion(nextRecord.getCommitTime(), @@ -158,8 +149,7 @@ public class HoodieCleaner> { // determine if we have enough commits, to start cleaning. 
if (commitTimeline.countInstants() > commitsRetained) { - HoodieInstant earliestCommitToRetain = - commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).get(); + HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get(); List> fileVersions = fileSystemView.getEveryVersionInPartition(partitionPath) .collect(Collectors.toList()); @@ -192,12 +182,11 @@ public class HoodieCleaner> { fileCommitTime, HoodieTimeline.GREATER)) { // this is a commit, that should be cleaned. - deletePaths.add(String - .format("%s/%s/%s", config.getBasePath(), partitionPath, FSUtils - .maskWithoutTaskPartitionId(fileCommitTime, afile.getFileId()))); + deletePaths.add(afile.getFileStatus().getPath().toString()); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well + // todo: fix below for MERGE_ON_READ deletePaths.add(String .format("%s/%s/%s", config.getBasePath(), partitionPath, FSUtils.maskWithoutLogVersion(fileCommitTime, afile.getFileId(), @@ -228,49 +217,36 @@ public class HoodieCleaner> { return null; } - /** - * Performs cleaning of the partition path according to cleaning policy and returns the number - * of files cleaned. - * - * @throws IllegalArgumentException if unknown cleaning policy is provided + * Returns files to be cleaned for the given partitionPath based on cleaning policy. 
*/ - public HoodieCleanStat clean(String partitionPath) throws IOException { + public List getDeletePaths(String partitionPath) throws IOException { HoodieCleaningPolicy policy = config.getCleanerPolicy(); List deletePaths; - Optional earliestCommitToRetain = Optional.empty(); if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath); - int commitsRetained = config.getCleanerCommitsRetained(); - if (commitTimeline.countInstants() > commitsRetained) { - earliestCommitToRetain = - commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); - } } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) { deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath); } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } + logger.info( + deletePaths.size() + " patterns used to delete in partition path:" + partitionPath); - // perform the actual deletes - Map deletedFiles = Maps.newHashMap(); - for (String deletePath : deletePaths) { - logger.info("Working on delete path :" + deletePath); - FileStatus[] deleteVersions = fs.globStatus(new Path(deletePath)); - if (deleteVersions != null) { - for (FileStatus deleteVersion : deleteVersions) { - boolean deleteResult = fs.delete(deleteVersion.getPath(), false); - deletedFiles.put(deleteVersion, deleteResult); - if (deleteResult) { - logger.info("Cleaned file at path :" + deleteVersion.getPath()); - } - } - } + return deletePaths; + } + + /** + * Returns earliest commit to retain based on cleaning policy. 
+ */ + public Optional getEarliestCommitToRetain() { + Optional earliestCommitToRetain = Optional.empty(); + int commitsRetained = config.getCleanerCommitsRetained(); + if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS + && commitTimeline.countInstants() > commitsRetained) { + earliestCommitToRetain = + commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); } - - logger.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath); - return HoodieCleanStat.newBuilder().withPolicy(policy).withDeletePathPattern(deletePaths) - .withPartitionPath(partitionPath).withEarliestCommitRetained(earliestCommitToRetain) - .withDeletedFileResults(deletedFiles).build(); + return earliestCommitToRetain; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 20638a2c1..ffe30dcd9 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -23,28 +23,18 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.func.LazyInsertIterable; +import com.uber.hoodie.io.HoodieCleanHelper; import com.uber.hoodie.io.HoodieUpdateHandle; - -import 
java.util.Optional; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.parquet.avro.AvroParquetReader; -import org.apache.parquet.avro.AvroReadSupport; -import org.apache.parquet.hadoop.ParquetReader; -import org.apache.spark.Partitioner; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -53,13 +43,26 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.stream.Collectors; - +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaSparkContext; -import scala.Option; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; +import scala.Option; /** * Implementation of a very heavily read-optimized Hoodie Table where @@ -479,4 +482,145 @@ public class HoodieCopyOnWriteTable extends Hoodi logger.info("Nothing to compact in COW storage format"); return Optional.empty(); } + + /** + * Performs cleaning of partition paths according to cleaning policy and returns the number + * of files cleaned. Handles skews in partitions to clean by making files to clean as the + * unit of task distribution. 
+ * + * @throws IllegalArgumentException if unknown cleaning policy is provided + */ + @Override + public List clean(JavaSparkContext jsc) { + List partitionCleanStats; + try { + List partitionsToClean = + FSUtils.getAllPartitionPaths(getFs(), getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); + logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config + .getCleanerPolicy()); + partitionCleanStats = cleanPartitionPaths(partitionsToClean, jsc); + if (partitionsToClean.isEmpty()) { + logger.info("Nothing to clean here mom. It is already clean"); + return partitionCleanStats; + } + } catch (IOException e) { + throw new HoodieIOException("Failed to clean up after commit", e); + } + return partitionCleanStats; + } + + private static class PartitionCleanStat implements Serializable { + private final String partitionPath; + private final List deletePathPatterns = new ArrayList<>(); + private final List successDeleteFiles = new ArrayList<>(); + private final List failedDeleteFiles = new ArrayList<>(); + + private PartitionCleanStat(String partitionPath) { + this.partitionPath = partitionPath; + } + + private void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) { + if (deletedFileResult) { + successDeleteFiles.add(deletePathStr); + } else { + failedDeleteFiles.add(deletePathStr); + } + } + + private void addDeleteFilePatterns(String deletePathStr) { + deletePathPatterns.add(deletePathStr); + } + + private PartitionCleanStat merge(PartitionCleanStat other) { + if (!this.partitionPath.equals(other.partitionPath)) { + throw new RuntimeException(String.format( + "partitionPath is not a match: (%s, %s)", + partitionPath, other.partitionPath)); + } + successDeleteFiles.addAll(other.successDeleteFiles); + deletePathPatterns.addAll(other.deletePathPatterns); + failedDeleteFiles.addAll(other.failedDeleteFiles); + return this; + } + } + + private List cleanPartitionPaths(List partitionsToClean, JavaSparkContext 
jsc) { + int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); + logger.info("Using cleanerParallelism: " + cleanerParallelism); + List> partitionCleanStats = jsc + .parallelize(partitionsToClean, cleanerParallelism) + .flatMapToPair(getFilesToDeleteFunc(this, config)) + .repartition(cleanerParallelism) // repartition to remove skews + .mapPartitionsToPair(deleteFilesFunc(this, config)) + .reduceByKey( // merge partition level clean stats below + (Function2) (e1, e2) -> e1 + .merge(e2)) + .collect(); + + Map partitionCleanStatsMap = partitionCleanStats + .stream().collect(Collectors.toMap(e -> e._1(), e -> e._2())); + + HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config); + // Return PartitionCleanStat for each partition passed. + return partitionsToClean.stream().map(partitionPath -> { + PartitionCleanStat partitionCleanStat = + (partitionCleanStatsMap.containsKey(partitionPath)) ? + partitionCleanStatsMap.get(partitionPath) + : new PartitionCleanStat(partitionPath); + return HoodieCleanStat.newBuilder() + .withPolicy(config.getCleanerPolicy()) + .withPartitionPath(partitionPath) + .withEarliestCommitRetained(cleaner.getEarliestCommitToRetain()) + .withDeletePathPattern(partitionCleanStat.deletePathPatterns) + .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles) + .withFailedDeletes(partitionCleanStat.failedDeleteFiles) + .build(); + }).collect(Collectors.toList()); + } + + private PairFlatMapFunction>, String, PartitionCleanStat> deleteFilesFunc( + HoodieTable table, HoodieWriteConfig config) { + return (PairFlatMapFunction>, String, PartitionCleanStat>) iter -> { + HoodieCleanHelper cleaner = new HoodieCleanHelper(table, config); + Map partitionCleanStatMap = new HashMap<>(); + + while (iter.hasNext()) { + Tuple2 partitionDelFileTuple = iter.next(); + String partitionPath = partitionDelFileTuple._1(); + String deletePathStr = partitionDelFileTuple._2(); + Boolean deletedFileResult = 
deleteFileAndGetResult(deletePathStr); + if (!partitionCleanStatMap.containsKey(partitionPath)) { + partitionCleanStatMap.put(partitionPath, + new PartitionCleanStat(partitionPath)); + } + PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath); + partitionCleanStat.addDeleteFilePatterns(deletePathStr); + partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult); + } + + return partitionCleanStatMap.entrySet().stream() + .map(e -> new Tuple2<>(e.getKey(), e.getValue())) + .collect(Collectors.toList()).iterator(); + }; + } + + private static PairFlatMapFunction getFilesToDeleteFunc( + HoodieTable table, HoodieWriteConfig config) { + return (PairFlatMapFunction) partitionPathToClean -> { + HoodieCleanHelper cleaner = new HoodieCleanHelper(table, config); + return cleaner.getDeletePaths(partitionPathToClean).stream() + .map(deleteFile -> new Tuple2<>(partitionPathToClean, deleteFile.toString())) + .iterator(); + }; + } + + private Boolean deleteFileAndGetResult(String deletePathStr) throws IOException { + Path deletePath = new Path(deletePathStr); + logger.debug("Working on delete path :" + deletePath); + boolean deleteResult = getFs().delete(deletePath, false); + if (deleteResult) { + logger.debug("Cleaned file at path :" + deletePath); + } + return deleteResult; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 2d1097757..6f0acbac3 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -17,8 +17,12 @@ package com.uber.hoodie.table; import com.google.common.collect.Sets; +import com.uber.hoodie.WriteStatus; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; +import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.model.HoodieCompactionMetadata; +import 
com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; @@ -27,23 +31,20 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieException; - import com.uber.hoodie.exception.HoodieSavepointException; -import java.util.Optional; -import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.Partitioner; - import java.io.IOException; import java.io.Serializable; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaSparkContext; /** @@ -52,6 +53,7 @@ import org.apache.spark.api.java.JavaSparkContext; public abstract class HoodieTable implements Serializable { protected final HoodieWriteConfig config; protected final HoodieTableMetaClient metaClient; + private static Logger logger = LogManager.getLogger(HoodieTable.class); protected HoodieTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { this.config = config; @@ -299,4 +301,10 @@ public abstract class HoodieTable implements Seri * Compaction arranges the data so that it is optimized for data access */ public abstract Optional compact(JavaSparkContext jsc); + + /** + * Clean partition paths according to cleaning 
policy and returns the number + * of files cleaned. + */ + public abstract List clean(JavaSparkContext jsc); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index 0f3be9901..d8e0d4cb7 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -18,6 +18,7 @@ package com.uber.hoodie; import com.google.common.collect.Iterables; +import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieCleaningPolicy; @@ -42,13 +43,17 @@ import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; +import java.util.Map; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerTaskEnd; import org.apache.spark.sql.SQLContext; +import org.apache.spark.util.AccumulatorV2; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -68,6 +73,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; import java.util.stream.Stream; +import scala.collection.Iterator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -78,8 +84,8 @@ public class TestHoodieClient implements Serializable { private transient JavaSparkContext jsc = null; private transient SQLContext sqlContext; private String basePath = null; - private transient HoodieTestDataGenerator - dataGen = null; + private transient HoodieTestDataGenerator dataGen 
= null; + private String[] partitionPaths = {"2016/01/01", "2016/02/02", "2016/06/02"}; @Before public void init() throws IOException { @@ -1070,8 +1076,249 @@ public class TestHoodieClient implements Serializable { assertEquals("Total number of records must add up", totalInserts, inserts1.size() + inserts2.size() + insert3.size()); } + @Test + public void testKeepLatestFileVersions() throws IOException { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withAssumeDatePartitioning(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) + .retainFileVersions(1).build()).build(); + // make 1 commit, with 1 file per partition + HoodieTestUtils.createCommitFiles(basePath, "000"); + String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000"); + String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "000"); + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + + List hoodieCleanStatsOne = table.clean(jsc); + assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsOne, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsOne, partitionPaths[1]).getSuccessDeleteFiles().size()); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); + + // make next commit, with 1 insert & 1 update per partition + HoodieTestUtils.createCommitFiles(basePath, "001"); + table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + + String file2P0C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "001"); // insert + String file2P1C1 = 
HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "001"); // insert + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0C0); // update + HoodieTestUtils.createDataFile(basePath, partitionPaths[1], "001", file1P1C0); // update + + List hoodieCleanStatsTwo = table.clean(jsc); + assertEquals("Must clean 1 file" , 1, getCleanStat(hoodieCleanStatsTwo, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertEquals("Must clean 1 file" , 1, getCleanStat(hoodieCleanStatsTwo, partitionPaths[1]).getSuccessDeleteFiles().size()); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "001", file2P1C1)); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); + + // make next commit, with 2 updates to existing files, and 1 insert + HoodieTestUtils.createCommitFiles(basePath, "002"); + table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file1P0C0); // update + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file2P0C1); // update + String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "002"); + + List hoodieCleanStatsThree = table.clean(jsc); + assertEquals("Must clean two files" , 2, getCleanStat(hoodieCleanStatsThree, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0)); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); + + // No cleaning on partially written file, with no 
commit. + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file3P0C2); // update + List hoodieCleanStatsFour = table.clean(jsc); + assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsFour, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); + } + + @Test + public void testKeepLatestCommits() throws IOException { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withAssumeDatePartitioning(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(2).build()).build(); + + // make 1 commit, with 1 file per partition + HoodieTestUtils.createCommitFiles(basePath, "000"); + + String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000"); + String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "000"); + + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + + List hoodieCleanStatsOne = table.clean(jsc); + assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsOne, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsOne, partitionPaths[1]).getSuccessDeleteFiles().size()); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); + + // make next commit, with 1 insert & 1 update per partition + HoodieTestUtils.createCommitFiles(basePath, "001"); + table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + + String file2P0C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "001"); // insert 
+ String file2P1C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "001"); // insert + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0C0); // update + HoodieTestUtils.createDataFile(basePath, partitionPaths[1], "001", file1P1C0); // update + + List hoodieCleanStatsTwo = table.clean(jsc); + assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsTwo, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsTwo, partitionPaths[1]).getSuccessDeleteFiles().size()); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "001", file2P1C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); + + // make next commit, with 2 updates to existing files, and 1 insert + HoodieTestUtils.createCommitFiles(basePath, "002"); + table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file1P0C0); // update + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file2P0C1); // update + String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "002"); + + List hoodieCleanStatsThree = table.clean(jsc); + assertEquals( + "Must not clean any file. 
We have to keep 1 version before the latest commit time to keep", + 0, getCleanStat(hoodieCleanStatsThree, partitionPaths[0]).getSuccessDeleteFiles().size()); + + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); + + // make next commit, with 2 updates to existing files, and 1 insert + HoodieTestUtils.createCommitFiles(basePath, "003"); + table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file1P0C0); // update + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file2P0C1); // update + String file4P0C3 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "003"); + + List hoodieCleanStatsFour = table.clean(jsc); + assertEquals( + "Must clean one old file", 1, getCleanStat(hoodieCleanStatsFour, partitionPaths[0]).getSuccessDeleteFiles().size()); + + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file2P0C1)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "003", file4P0C3)); + + // No cleaning on partially written file, with no commit. 
+ HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "004", file3P0C2); // update + List hoodieCleanStatsFive = table.clean(jsc); + assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsFive, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0)); + assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); + } + + @Test + public void testCleaningSkewedPartitions() throws IOException { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withAssumeDatePartitioning(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(2).build()).build(); + Map stageOneShuffleReadTaskRecordsCountMap = new HashMap<>(); + + // Since clean involves repartition in order to uniformly distribute data, + // we can inspect the number of records read by various tasks in stage 1. + // There should not be skew in the number of records read in the task. + + // SparkListener below listens to the stage end events and captures number of + // records read by various tasks in stage-1. 
+ jsc.sc().addSparkListener(new SparkListener() { + + @Override + public void onTaskEnd(SparkListenerTaskEnd taskEnd) { + + Iterator> iterator = taskEnd.taskMetrics().accumulators() + .iterator(); + while(iterator.hasNext()) { + AccumulatorV2 accumulator = iterator.next(); + if (taskEnd.stageId() == 1 && + accumulator.isRegistered() && + accumulator.name().isDefined() && + accumulator.name().get().equals("internal.metrics.shuffle.read.recordsRead")) { + stageOneShuffleReadTaskRecordsCountMap.put(taskEnd.taskInfo().taskId(), (Long) accumulator.value()); + } + } + } + }); + + // make 1 commit, with 100 files in one partition and 10 in other two + HoodieTestUtils.createCommitFiles(basePath, "000"); + List filesP0C0 = createFilesInPartition(partitionPaths[0], "000", 100); + List filesP1C0 = createFilesInPartition(partitionPaths[1], "000", 10); + List filesP2C0 = createFilesInPartition(partitionPaths[2], "000", 10); + + HoodieTestUtils.createCommitFiles(basePath, "001"); + updateAllFilesInPartition(filesP0C0, partitionPaths[0], "001"); + updateAllFilesInPartition(filesP1C0, partitionPaths[1], "001"); + updateAllFilesInPartition(filesP2C0, partitionPaths[2], "001"); + + HoodieTestUtils.createCommitFiles(basePath, "002"); + updateAllFilesInPartition(filesP0C0, partitionPaths[0], "002"); + updateAllFilesInPartition(filesP1C0, partitionPaths[1], "002"); + updateAllFilesInPartition(filesP2C0, partitionPaths[2], "002"); + + HoodieTestUtils.createCommitFiles(basePath, "003"); + updateAllFilesInPartition(filesP0C0, partitionPaths[0], "003"); + updateAllFilesInPartition(filesP1C0, partitionPaths[1], "003"); + updateAllFilesInPartition(filesP2C0, partitionPaths[2], "003"); + + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + List hoodieCleanStats = table.clean(jsc); + + assertEquals(100, getCleanStat(hoodieCleanStats, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertEquals(10, 
getCleanStat(hoodieCleanStats, partitionPaths[1]).getSuccessDeleteFiles().size()); + assertEquals(10, getCleanStat(hoodieCleanStats, partitionPaths[2]).getSuccessDeleteFiles().size()); + + // 3 tasks are expected since the number of partitions is 3 + assertEquals(3, stageOneShuffleReadTaskRecordsCountMap.keySet().size()); + // Sum of all records processed = total number of files to clean + assertEquals(120, stageOneShuffleReadTaskRecordsCountMap + .values().stream().reduce((a,b) -> a + b).get().intValue()); + assertTrue("The skew in handling files to clean is not removed. " + + "Each task should handle more records than the partitionPath with least files " + + "and less records than the partitionPath with most files.", + stageOneShuffleReadTaskRecordsCountMap.values().stream().filter(a -> a > 10 && a < 100).count() == 3); + } + + private HoodieCleanStat getCleanStat(List hoodieCleanStatsTwo, + String partitionPath) { + return hoodieCleanStatsTwo.stream() + .filter(e -> e.getPartitionPath().equals(partitionPath)) + .findFirst().get(); + } + + private void updateAllFilesInPartition(List files, String partitionPath, + String commitTime) throws IOException { + for (String fileId : files) { + HoodieTestUtils.createDataFile(basePath, partitionPath, commitTime, fileId); + } + } + + private List createFilesInPartition(String partitionPath, String commitTime, int numFiles) throws IOException { + List files = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + files.add(HoodieTestUtils.createNewDataFile(basePath, partitionPath, commitTime)); + } + return files; + } @After public void clean() { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCleaner.java deleted file mode 100644 index c2b37ee7e..000000000 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCleaner.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.io; - -import com.uber.hoodie.common.model.HoodieCleaningPolicy; -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.common.model.HoodieTestUtils; -import com.uber.hoodie.common.util.FSUtils; - -import com.uber.hoodie.config.HoodieCompactionConfig; -import com.uber.hoodie.table.HoodieTable; -import org.junit.Before; -import org.junit.Test; -import java.io.IOException; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.junit.Assert.*; - -/** - * Tests around Cleaning logic in Hoodie - */ -public class TestHoodieCleaner { - - private String basePath = null; - private String[] partitionPaths = {"2016/01/01", "2016/02/02"}; - private HoodieTableMetaClient metaClient; - - @Before - public void init() throws Exception { - this.metaClient = HoodieTestUtils.initOnTemp(); - this.basePath = metaClient.getBasePath(); - } - - @Test - public void testKeepLatestFileVersions() throws IOException { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) - .retainFileVersions(1).build()).build(); - - // make 1 
commit, with 1 file per partition - HoodieTestUtils.createCommitFiles(basePath, "000"); - - String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000"); - String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "000"); - - HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config); - - HoodieCleaner cleaner = new HoodieCleaner(table, config); - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1]).getSuccessDeleteFiles().size()); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); - - // make next commit, with 1 insert & 1 update per partition - HoodieTestUtils.createCommitFiles(basePath, "001"); - - String file2P0C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "001"); // insert - String file2P1C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "001"); // insert - HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0C0); // update - HoodieTestUtils.createDataFile(basePath, partitionPaths[1], "001", file1P1C0); // update - metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - table = HoodieTable.getHoodieTable(metadata, config); - - cleaner = new HoodieCleaner(table, config); - assertEquals("Must clean 1 file" , 1, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - assertEquals("Must clean 1 file" , 1, cleaner.clean(partitionPaths[1]).getSuccessDeleteFiles().size()); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "001", file2P1C1)); - 
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); - - // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createCommitFiles(basePath, "002"); - - HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file1P0C0); // update - HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file2P0C1); // update - String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "002"); - metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - table = HoodieTable.getHoodieTable(metadata, config); - - cleaner = new HoodieCleaner(table, config); - assertEquals("Must clean two files" , 2, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0)); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); - - // No cleaning on partially written file, with no commit. 
- HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file3P0C2); // update - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); - } - - - @Test - public void testKeepLatestCommits() throws IOException { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) - .retainCommits(2).build()).build(); - - - // make 1 commit, with 1 file per partition - HoodieTestUtils.createCommitFiles(basePath, "000"); - - String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000"); - String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "000"); - - HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config); - - HoodieCleaner cleaner = new HoodieCleaner(table, config); - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1]).getSuccessDeleteFiles().size()); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); - - // make next commit, with 1 insert & 1 update per partition - HoodieTestUtils.createCommitFiles(basePath, "001"); - - String file2P0C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "001"); // insert - String file2P1C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "001"); // insert - HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0C0); // update - 
HoodieTestUtils.createDataFile(basePath, partitionPaths[1], "001", file1P1C0); // update - metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - table = HoodieTable.getHoodieTable(metadata, config); - - cleaner = new HoodieCleaner(table, config); - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1]).getSuccessDeleteFiles().size()); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "001", file2P1C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); - - // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createCommitFiles(basePath, "002"); - - HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file1P0C0); // update - HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file2P0C1); // update - String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "002"); - metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - table = HoodieTable.getHoodieTable(metadata, config); - - cleaner = new HoodieCleaner(table, config); - assertEquals( - "Must not clean any file. 
We have to keep 1 version before the latest commit time to keep", - 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); - - // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createCommitFiles(basePath, "003"); - - HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file1P0C0); // update - HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file2P0C1); // update - String file4P0C3 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "003"); - metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); - table = HoodieTable.getHoodieTable(metadata, config); - - cleaner = new HoodieCleaner(table, config); - assertEquals( - "Must not clean one old file", 1, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "003", file4P0C3)); - - // No cleaning on partially written file, with no commit. 
- HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "004", file3P0C2); // update - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); - } -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieCleanStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieCleanStat.java index f0e2a9bdf..ee3177f1d 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieCleanStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieCleanStat.java @@ -18,13 +18,10 @@ package com.uber.hoodie.common; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import org.apache.hadoop.fs.FileStatus; import java.io.Serializable; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; /** * Collects stats about a single partition clean operation @@ -99,12 +96,13 @@ public class HoodieCleanStat implements Serializable { return this; } - public Builder withDeletedFileResults(Map deletedFiles) { - //noinspection Convert2MethodRef - successDeleteFiles = deletedFiles.entrySet().stream().filter(s -> s.getValue()) - .map(s -> s.getKey().getPath().toString()).collect(Collectors.toList()); - failedDeleteFiles = deletedFiles.entrySet().stream().filter(s -> !s.getValue()) - .map(s -> s.getKey().getPath().toString()).collect(Collectors.toList()); + public Builder withSuccessfulDeletes(List successDeleteFiles) { + this.successDeleteFiles = successDeleteFiles; + return this; + } + + public Builder withFailedDeletes(List failedDeleteFiles) { + this.failedDeleteFiles= failedDeleteFiles; return this; }