1
0

[HUDI-2683] Parallelize deleting archived hoodie commits (#3920)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
zhangyue19921010
2021-11-15 22:36:54 +08:00
committed by GitHub
parent 53d2d6ae24
commit 38b6934352
4 changed files with 61 additions and 19 deletions

View File

@@ -119,6 +119,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ " keep the metadata overhead constant, even as the table size grows." + " keep the metadata overhead constant, even as the table size grows."
+ "This config controls the maximum number of instants to retain in the active timeline. "); + "This config controls the maximum number of instants to retain in the active timeline. ");
public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
.key("hoodie.archive.delete.parallelism")
.defaultValue(100)
.withDocumentation("Parallelism for deleting archived hoodie commits.");
public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
.key("hoodie.keep.min.commits") .key("hoodie.keep.min.commits")
.defaultValue("20") .defaultValue("20")
@@ -568,6 +573,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this; return this;
} }
public Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
compactionConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
return this;
}
public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) { public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) {
compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS, String.valueOf(maxDeltaSecondsBeforeCompaction)); compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS, String.valueOf(maxDeltaSecondsBeforeCompaction));
return this; return this;

View File

@@ -1135,6 +1135,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE); return getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE);
} }
public int getArchiveDeleteParallelism() {
return getInt(HoodieCompactionConfig.DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE);
}
public boolean inlineClusteringEnabled() { public boolean inlineClusteringEnabled() {
return getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING); return getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
} }

View File

@@ -18,9 +18,11 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieArchivedLogFile; import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -127,7 +129,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
LOG.info("Archiving instants " + instantsToArchive); LOG.info("Archiving instants " + instantsToArchive);
archive(context, instantsToArchive); archive(context, instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive); LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive); success = deleteArchivedInstants(instantsToArchive, context);
} else { } else {
LOG.info("No Instants to archive"); LOG.info("No Instants to archive");
} }
@@ -224,19 +226,34 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream()); HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
} }
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException { private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
LOG.info("Deleting instants " + archivedInstants); LOG.info("Deleting instants " + archivedInstants);
boolean success = true; boolean success = true;
for (HoodieInstant archivedInstant : archivedInstants) { List<String> instantFiles = archivedInstants.stream().map(archivedInstant -> {
Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName()); return new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
try { }).map(Path::toString).collect(Collectors.toList());
if (metaClient.getFs().exists(commitFile)) {
success &= metaClient.getFs().delete(commitFile, false); context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
LOG.info("Archived and deleted instant file " + commitFile); Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
} metaClient.getFs(),
} catch (IOException e) { config.getArchiveDeleteParallelism(),
throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e); pairOfSubPathAndConf -> {
} Path commitFile = new Path(pairOfSubPathAndConf.getKey());
try {
FileSystem fs = commitFile.getFileSystem(pairOfSubPathAndConf.getValue().get());
if (fs.exists(commitFile)) {
return fs.delete(commitFile, false);
}
return true;
} catch (IOException e) {
throw new HoodieIOException("Failed to delete archived instant " + commitFile, e);
}
},
instantFiles);
for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
success &= result.getValue();
} }
// Remove older meta-data from auxiliary path too // Remove older meta-data from auxiliary path too

View File

@@ -670,19 +670,30 @@ public class FSUtils {
.filter(subPathPredicate) .filter(subPathPredicate)
.map(fileStatus -> fileStatus.getPath().toString()) .map(fileStatus -> fileStatus.getPath().toString())
.collect(Collectors.toList()); .collect(Collectors.toList());
if (subPaths.size() > 0) { result = parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, pairFunction, subPaths);
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
int actualParallelism = Math.min(subPaths.size(), parallelism);
result = hoodieEngineContext.mapToPair(subPaths,
subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
actualParallelism);
}
} catch (IOException ioe) { } catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe); throw new HoodieIOException(ioe.getMessage(), ioe);
} }
return result; return result;
} }
public static <T> Map<String, T> parallelizeFilesProcess(
HoodieEngineContext hoodieEngineContext,
FileSystem fs,
int parallelism,
SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction,
List<String> subPaths) {
Map<String, T> result = new HashMap<>();
if (subPaths.size() > 0) {
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
int actualParallelism = Math.min(subPaths.size(), parallelism);
result = hoodieEngineContext.mapToPair(subPaths,
subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
actualParallelism);
}
return result;
}
/** /**
* Deletes a sub-path. * Deletes a sub-path.
* *