[HUDI-850] Avoid unnecessary listings in incremental cleaning mode (#1576)
This commit is contained in:
committed by
GitHub
parent
c4b71622b9
commit
506447fd4f
@@ -39,7 +39,6 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
import org.apache.hudi.exception.HoodieSavepointException;
|
import org.apache.hudi.exception.HoodieSavepointException;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
@@ -48,6 +47,7 @@ import org.apache.log4j.Logger;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -111,36 +111,77 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
|
|||||||
* @throws IOException when underlying file-system throws this exception
|
* @throws IOException when underlying file-system throws this exception
|
||||||
*/
|
*/
|
||||||
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
|
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
|
||||||
if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
|
switch (config.getCleanerPolicy()) {
|
||||||
&& (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
|
case KEEP_LATEST_COMMITS:
|
||||||
Option<HoodieInstant> lastClean =
|
return getPartitionPathsForCleanByCommits(newInstantToRetain);
|
||||||
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
|
case KEEP_LATEST_FILE_VERSIONS:
|
||||||
|
return getPartitionPathsForFullCleaning();
|
||||||
|
default:
|
||||||
|
throw new IllegalStateException("Unknown Cleaner Policy");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return partition paths for cleaning by commits mode.
|
||||||
|
* @param instantToRetain Earliest Instant to retain
|
||||||
|
* @return list of partitions
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
|
||||||
|
if (!instantToRetain.isPresent()) {
|
||||||
|
LOG.info("No earliest commit to retain. No need to scan partitions !!");
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (config.incrementalCleanerModeEnabled()) {
|
||||||
|
Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
|
||||||
if (lastClean.isPresent()) {
|
if (lastClean.isPresent()) {
|
||||||
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
|
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
|
||||||
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
|
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
|
||||||
if ((cleanMetadata.getEarliestCommitToRetain() != null)
|
if ((cleanMetadata.getEarliestCommitToRetain() != null)
|
||||||
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
|
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
|
||||||
|
return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return getPartitionPathsForFullCleaning();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use Incremental Mode for finding partition paths.
|
||||||
|
* @param cleanMetadata
|
||||||
|
* @param newInstantToRetain
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
|
||||||
|
Option<HoodieInstant> newInstantToRetain) {
|
||||||
LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
|
LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
|
||||||
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
|
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
|
||||||
+ ". New Instant to retain : " + newInstantToRetain);
|
+ ". New Instant to retain : " + newInstantToRetain);
|
||||||
return hoodieTable.getCompletedCommitsTimeline().getInstants()
|
return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
|
||||||
.filter(instant ->
|
instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
|
||||||
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, cleanMetadata.getEarliestCommitToRetain())
|
cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
|
||||||
&& HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
|
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
|
||||||
).flatMap(instant -> {
|
|
||||||
try {
|
try {
|
||||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||||
|
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
|
||||||
|
HoodieCommitMetadata.class);
|
||||||
return commitMetadata.getPartitionToWriteStats().keySet().stream();
|
return commitMetadata.getPartitionToWriteStats().keySet().stream();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieIOException(e.getMessage(), e);
|
throw new HoodieIOException(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}).distinct().collect(Collectors.toList());
|
}).distinct().collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
/**
|
||||||
// Otherwise go to brute force mode of scanning all partitions
|
* Scan and list all paritions for cleaning.
|
||||||
return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
|
* @return all partitions paths for the dataset.
|
||||||
hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private List<String> getPartitionPathsForFullCleaning() throws IOException {
|
||||||
|
// Go to brute force mode of scanning all partitions
|
||||||
|
return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), hoodieTable.getMetaClient().getBasePath(),
|
||||||
|
config.shouldAssumeDatePartitioning());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -104,7 +104,8 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
*/
|
*/
|
||||||
private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client,
|
private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client,
|
||||||
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
|
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
|
||||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
|
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
|
||||||
|
HoodieCleaningPolicy cleaningPolicy) throws Exception {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
|
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
|
||||||
@@ -127,11 +128,15 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), jsc);
|
HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), jsc);
|
||||||
|
|
||||||
assertFalse(table.getCompletedCommitsTimeline().empty());
|
assertFalse(table.getCompletedCommitsTimeline().empty());
|
||||||
|
if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {
|
||||||
|
// We no longer write empty cleaner plans when there are not enough commits present
|
||||||
|
assertTrue(table.getCompletedCleanTimeline().empty());
|
||||||
|
} else {
|
||||||
String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
|
String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
|
||||||
assertFalse(table.getCompletedCleanTimeline().empty());
|
assertFalse(table.getCompletedCleanTimeline().empty());
|
||||||
assertEquals(instantTime,
|
assertEquals(instantTime, table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
|
||||||
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
|
|
||||||
"The clean instant should be the same as the commit instant");
|
"The clean instant should be the same as the commit instant");
|
||||||
|
}
|
||||||
|
|
||||||
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
|
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
|
||||||
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
|
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
|
||||||
@@ -201,7 +206,8 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
|
final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
|
||||||
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
|
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
|
||||||
|
|
||||||
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
|
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
|
||||||
|
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
|
||||||
|
|
||||||
Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
|
Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
|
||||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
@@ -349,7 +355,7 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
int maxCommits = 3; // keep upto 3 commits from the past
|
int maxCommits = 3; // keep upto 3 commits from the past
|
||||||
HoodieWriteConfig cfg = getConfigBuilder()
|
HoodieWriteConfig cfg = getConfigBuilder()
|
||||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
|
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
|
||||||
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1)
|
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1)
|
||||||
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
|
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
|
||||||
.build();
|
.build();
|
||||||
@@ -361,7 +367,8 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
|
final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
|
||||||
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
|
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
|
||||||
|
|
||||||
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
|
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
|
||||||
|
HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
|
||||||
|
|
||||||
// Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
|
// Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
|
||||||
HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).forEach(newCommitTime -> {
|
HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).forEach(newCommitTime -> {
|
||||||
@@ -376,7 +383,9 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc);
|
HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc);
|
||||||
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
|
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
|
||||||
Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
|
// NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
|
||||||
|
// commit
|
||||||
|
Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
|
||||||
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
|
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
|
||||||
if (earliestRetainedCommit.isPresent()) {
|
if (earliestRetainedCommit.isPresent()) {
|
||||||
acceptableCommits
|
acceptableCommits
|
||||||
@@ -751,12 +760,7 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
|
||||||
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
|
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
|
||||||
assertEquals(0,
|
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
|
||||||
getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
|
|
||||||
.size(), "Must not clean any files");
|
|
||||||
assertEquals(0,
|
|
||||||
getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
|
|
||||||
.size(), "Must not clean any files");
|
|
||||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
||||||
file1P0C0));
|
file1P0C0));
|
||||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
|
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
|
||||||
@@ -786,12 +790,7 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
|
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
|
||||||
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||||
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
|
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
|
||||||
assertEquals(0,
|
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
|
||||||
getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
|
|
||||||
.size(), "Must not clean any files");
|
|
||||||
assertEquals(0,
|
|
||||||
getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
|
|
||||||
.size(), "Must not clean any files");
|
|
||||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
|
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
|
||||||
file2P0C1));
|
file2P0C1));
|
||||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
|
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
|
||||||
|
|||||||
@@ -139,9 +139,6 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
|
|||||||
new HoodieSnapshotExporter().export(jsc, cfg);
|
new HoodieSnapshotExporter().export(jsc, cfg);
|
||||||
|
|
||||||
// Check results
|
// Check results
|
||||||
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean")));
|
|
||||||
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean.inflight")));
|
|
||||||
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean.requested")));
|
|
||||||
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit")));
|
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit")));
|
||||||
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested")));
|
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested")));
|
||||||
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight")));
|
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight")));
|
||||||
|
|||||||
Reference in New Issue
Block a user