1
0

[HUDI-80] Leverage Commit metadata to figure out partitions to be cleaned for Cleaning by commits mode (#1008)

This commit is contained in:
Balaji Varadarajan
2019-11-12 06:12:44 -08:00
committed by vinoth chandar
parent 0bb5999f79
commit 8ff06ddb0f
9 changed files with 167 additions and 24 deletions

View File

@@ -167,7 +167,7 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
// Create the metadata and save it
HoodieCleanMetadata metadata =
AvroUtils.convertCleanMetadata(cleanInstant.getTimestamp(), durationInMs, cleanStats);
logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files");
logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());
table.getActiveTimeline().transitionCleanInflightToComplete(

View File

@@ -43,6 +43,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max" + ".delta.commits";
public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions" + ".retained";
public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
public static final String CLEANER_INCREMENTAL_MODE = "hoodie.cleaner.incremental.mode";
public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
@@ -92,6 +93,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
private static final String DEFAULT_AUTO_CLEAN = "true";
private static final String DEFAULT_INLINE_COMPACT = "false";
private static final String DEFAULT_INCREMENTAL_CLEANER = "false";
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "1";
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
@@ -136,6 +138,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
return this;
}
public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
props.setProperty(CLEANER_INCREMENTAL_MODE, String.valueOf(incrementalCleaningMode));
return this;
}
public Builder withInlineCompaction(Boolean inlineCompaction) {
props.setProperty(INLINE_COMPACT_PROP, String.valueOf(inlineCompaction));
return this;
@@ -235,6 +242,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public HoodieCompactionConfig build() {
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
setDefaultOnCondition(props, !props.containsKey(CLEANER_INCREMENTAL_MODE), CLEANER_INCREMENTAL_MODE,
DEFAULT_INCREMENTAL_CLEANER);
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP), INLINE_COMPACT_PROP,
DEFAULT_INLINE_COMPACT);
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP),

View File

@@ -239,6 +239,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
}
public boolean incrementalCleanerModeEnabled() {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE));
}
public boolean isInlineCompaction() {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_PROP));
}

View File

@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -37,9 +38,12 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -76,6 +80,46 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
/**
* Returns list of partitions where clean operations needs to be performed
* @param newInstantToRetain New instant to be retained after this cleanup operation
* @return list of partitions to scan for cleaning
* @throws IOException when underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
&& (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
Option<HoodieInstant> lastClean =
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
HoodieCleanMetadata cleanMetadata = AvroUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
logger.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> {
return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER);
}).flatMap(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
return commitMetadata.getPartitionToWriteStats().keySet().stream();
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).distinct().collect(Collectors.toList());
}
}
}
// Otherwise go to brute force mode of scanning all partitions
return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
}
/**
* Selects the older versions of files for cleaning, such that it bounds the number of versions of each file. This
* policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a

View File

@@ -278,10 +278,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
*/
public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
try {
FileSystem fs = getMetaClient().getFs();
HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
List<String> partitionsToClean =
FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
if (partitionsToClean.isEmpty()) {
logger.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
@@ -290,8 +291,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
"Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
logger.info("Using cleanerParallelism: " + cleanerParallelism);
HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
.map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))