[HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible (#2422)
* [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible * Use filesystemview and json format from metadata. Add tests Co-authored-by: Satish Kotha <satishkotha@uber.com>
This commit is contained in:
@@ -290,10 +290,10 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
LOG.info("Wrapper schema " + wrapperSchema.toString());
|
||||
List<IndexedRecord> records = new ArrayList<>();
|
||||
for (HoodieInstant hoodieInstant : instants) {
|
||||
// TODO HUDI-1518 Cleaner now takes care of removing replaced file groups. This call to deleteReplacedFileGroups can be removed.
|
||||
boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant);
|
||||
if (!deleteSuccess) {
|
||||
// throw error and stop archival if deleting replaced file groups failed.
|
||||
throw new HoodieCommitException("Unable to delete file(s) for " + hoodieInstant.getFileName());
|
||||
LOG.warn("Unable to delete file(s) for " + hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner");
|
||||
}
|
||||
try {
|
||||
deleteAnyLeftOverMarkerFiles(context, hoodieInstant);
|
||||
|
||||
@@ -21,9 +21,9 @@ package org.apache.hudi.table.action.clean;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.avro.model.HoodieActionInstant;
|
||||
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
|
||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
|
||||
import org.apache.hudi.common.HoodieCleanStat;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||
@@ -72,7 +72,7 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I,
|
||||
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
|
||||
|
||||
if (partitionsToClean.isEmpty()) {
|
||||
LOG.info("Nothing to clean here. It is already clean");
|
||||
LOG.info("Nothing to clean here.");
|
||||
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
|
||||
}
|
||||
LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
|
||||
|
||||
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
@@ -111,14 +112,14 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
/**
|
||||
* Returns list of partitions where clean operations needs to be performed.
|
||||
*
|
||||
* @param newInstantToRetain New instant to be retained after this cleanup operation
|
||||
* @param earliestRetainedInstant New instant to be retained after this cleanup operation
|
||||
* @return list of partitions to scan for cleaning
|
||||
* @throws IOException when underlying file-system throws this exception
|
||||
*/
|
||||
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
|
||||
public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
|
||||
switch (config.getCleanerPolicy()) {
|
||||
case KEEP_LATEST_COMMITS:
|
||||
return getPartitionPathsForCleanByCommits(newInstantToRetain);
|
||||
return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
|
||||
case KEEP_LATEST_FILE_VERSIONS:
|
||||
return getPartitionPathsForFullCleaning();
|
||||
default:
|
||||
@@ -168,10 +169,16 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
|
||||
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
|
||||
try {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
|
||||
HoodieCommitMetadata.class);
|
||||
return commitMetadata.getPartitionToWriteStats().keySet().stream();
|
||||
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
|
||||
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
|
||||
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
|
||||
return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
|
||||
} else {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
|
||||
HoodieCommitMetadata.class);
|
||||
return commitMetadata.getPartitionToWriteStats().keySet().stream();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException(e.getMessage(), e);
|
||||
}
|
||||
@@ -196,13 +203,17 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
|
||||
LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
|
||||
+ " file versions. ");
|
||||
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
|
||||
List<CleanFileInfo> deletePaths = new ArrayList<>();
|
||||
// Collect all the datafiles savepointed by all the savepoints
|
||||
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
|
||||
.flatMap(this::getSavepointedDataFiles)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
|
||||
// In other words, the file versions only apply to the active file groups.
|
||||
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
|
||||
|
||||
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
|
||||
for (HoodieFileGroup fileGroup : fileGroups) {
|
||||
int keepVersions = config.getCleanerFileVersionsRetained();
|
||||
// do not cleanup slice required for pending compaction
|
||||
@@ -226,18 +237,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
// Delete the remaining files
|
||||
while (fileSliceIterator.hasNext()) {
|
||||
FileSlice nextSlice = fileSliceIterator.next();
|
||||
if (nextSlice.getBaseFile().isPresent()) {
|
||||
HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
|
||||
deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
|
||||
if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
|
||||
deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
|
||||
}
|
||||
}
|
||||
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
|
||||
// If merge on read, then clean the log files for the commits as well
|
||||
deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
|
||||
}
|
||||
}
|
||||
return deletePaths;
|
||||
@@ -269,7 +269,11 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
|
||||
// determine if we have enough commits, to start cleaning.
|
||||
if (commitTimeline.countInstants() > commitsRetained) {
|
||||
HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
|
||||
Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
|
||||
HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
|
||||
// all replaced file groups before earliestCommitToRetain are eligible to clean
|
||||
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
|
||||
// add active files
|
||||
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
|
||||
for (HoodieFileGroup fileGroup : fileGroups) {
|
||||
List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
|
||||
@@ -322,6 +326,20 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
}
|
||||
return deletePaths;
|
||||
}
|
||||
|
||||
private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
|
||||
final Stream<HoodieFileGroup> replacedGroups;
|
||||
if (earliestCommitToRetain.isPresent()) {
|
||||
replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
|
||||
} else {
|
||||
replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
|
||||
}
|
||||
return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
|
||||
// do not delete savepointed files (archival will make sure corresponding replacecommit file is not deleted)
|
||||
.filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
|
||||
.flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the latest version < instantTime. This version file could still be used by queries.
|
||||
@@ -339,6 +357,23 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
|
||||
List<CleanFileInfo> cleanPaths = new ArrayList<>();
|
||||
if (nextSlice.getBaseFile().isPresent()) {
|
||||
HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
|
||||
cleanPaths.add(new CleanFileInfo(dataFile.getPath(), false));
|
||||
if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
|
||||
cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
|
||||
}
|
||||
}
|
||||
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
|
||||
// If merge on read, then clean the log files for the commits as well
|
||||
cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
return cleanPaths;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns files to be cleaned for the given partitionPath based on cleaning policy.
|
||||
*/
|
||||
|
||||
@@ -122,6 +122,7 @@ public class RollbackUtils {
|
||||
List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
|
||||
switch (instantToRollback.getAction()) {
|
||||
case HoodieTimeline.COMMIT_ACTION:
|
||||
case HoodieTimeline.REPLACE_COMMIT_ACTION:
|
||||
LOG.info("Rolling back commit action.");
|
||||
partitionRollbackRequests.add(
|
||||
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.metadata;
|
||||
|
||||
import org.apache.hudi.client.HoodieWriteResult;
|
||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
@@ -498,6 +499,15 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
||||
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
|
||||
assertNoWriteErrors(writeStatuses);
|
||||
assertFalse(metadata(client).isInSync());
|
||||
|
||||
// insert overwrite to test replacecommit
|
||||
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
|
||||
records = dataGen.generateInserts(newCommitTime, 5);
|
||||
HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
|
||||
writeStatuses = replaceResult.getWriteStatuses().collect();
|
||||
assertNoWriteErrors(writeStatuses);
|
||||
assertFalse(metadata(client).isInSync());
|
||||
}
|
||||
|
||||
// Enable metadata table and ensure it is synced
|
||||
@@ -800,6 +810,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
||||
|
||||
// FileSystemView should expose the same data
|
||||
List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
|
||||
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
|
||||
|
||||
fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
|
||||
fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.avro.model.HoodieActionInstant;
|
||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
|
||||
@@ -38,9 +40,11 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
@@ -57,6 +61,7 @@ import org.apache.hudi.common.util.CleanerUtils;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.common.util.CompactionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
@@ -65,9 +70,6 @@ import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.index.SparkHoodieIndex;
|
||||
import org.apache.hudi.table.action.clean.CleanPlanner;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
@@ -76,6 +78,7 @@ import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import scala.Tuple3;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
@@ -96,8 +99,6 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import scala.Tuple3;
|
||||
|
||||
import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
|
||||
@@ -687,6 +688,107 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
assertTrue(testTable.baseFileExists(p0, "002", file1P0));
|
||||
assertTrue(testTable.logFileExists(p0, "002", file1P0, 4));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanWithReplaceCommits() throws Exception {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
|
||||
.build();
|
||||
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||
String p0 = "2020/01/01";
|
||||
String p1 = "2020/01/02";
|
||||
|
||||
// make 1 commit, with 1 file per partition
|
||||
String file1P0C0 = UUID.randomUUID().toString();
|
||||
String file1P1C0 = UUID.randomUUID().toString();
|
||||
testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
|
||||
|
||||
HoodieCommitMetadata commitMetadata = generateCommitMetadata(
|
||||
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
|
||||
{
|
||||
put(p0, CollectionUtils.createImmutableList(file1P0C0));
|
||||
put(p1, CollectionUtils.createImmutableList(file1P1C0));
|
||||
}
|
||||
})
|
||||
);
|
||||
metaClient.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
|
||||
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
|
||||
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1
|
||||
Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
|
||||
String file2P0C1 = partitionAndFileId002.get(p0);
|
||||
testTable.addReplaceCommit("00000000000002", generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1));
|
||||
|
||||
// run cleaner
|
||||
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
|
||||
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
|
||||
Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
|
||||
String file3P1C2 = partitionAndFileId003.get(p1);
|
||||
testTable.addReplaceCommit("00000000000003", generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2));
|
||||
|
||||
// run cleaner
|
||||
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
|
||||
assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any partitions and clean any files");
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next replacecommit, with 1 clustering operation. Replace data in p0 again
|
||||
Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
|
||||
String file4P0C3 = partitionAndFileId004.get(p0);
|
||||
testTable.addReplaceCommit("00000000000004", generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3));
|
||||
|
||||
// run cleaner
|
||||
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
|
||||
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
|
||||
//file1P1C0 still stays because its not replaced until 3 and its the only version available
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created
|
||||
Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
|
||||
String file4P1C4 = partitionAndFileId005.get(p1);
|
||||
testTable.addReplaceCommit("00000000000005", generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4));
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
|
||||
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
|
||||
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
|
||||
assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||
}
|
||||
|
||||
private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String partition, String replacedFileId, String newFileId) {
|
||||
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
|
||||
replaceMetadata.addReplaceFileId(partition, replacedFileId);
|
||||
replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
|
||||
if (!StringUtils.isNullOrEmpty(newFileId)) {
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
writeStat.setPartitionPath(partition);
|
||||
writeStat.setPath(newFileId);
|
||||
writeStat.setFileId(newFileId);
|
||||
replaceMetadata.addWriteStat(partition, writeStat);
|
||||
}
|
||||
return replaceMetadata;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanMetadataUpgradeDowngrade() {
|
||||
|
||||
@@ -33,6 +33,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.avro.model.HoodieInstantInfo;
|
||||
import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
@@ -158,10 +159,14 @@ public class TimelineMetadataUtils {
|
||||
return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
|
||||
}
|
||||
|
||||
public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadta(byte[] bytes) throws IOException {
|
||||
public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
|
||||
return deserializeAvroMetadata(bytes, HoodieRequestedReplaceMetadata.class);
|
||||
}
|
||||
|
||||
public static HoodieReplaceCommitMetadata deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException {
|
||||
return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
|
||||
}
|
||||
|
||||
public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
|
||||
throws IOException {
|
||||
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
|
||||
|
||||
@@ -62,6 +62,7 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
|
||||
|
||||
@@ -690,6 +691,16 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
|
||||
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
|
||||
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplaced(fg.getFileGroupId()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() {
|
||||
try {
|
||||
@@ -1041,6 +1052,15 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
return isFileGroupReplacedBeforeOrOn(fileGroupId, instants.stream().max(Comparator.naturalOrder()).get());
|
||||
}
|
||||
|
||||
private boolean isFileGroupReplacedBefore(HoodieFileGroupId fileGroupId, String instant) {
|
||||
Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId);
|
||||
if (!hoodieInstantOption.isPresent()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return HoodieTimeline.compareTimestamps(instant, GREATER_THAN, hoodieInstantOption.get().getTimestamp());
|
||||
}
|
||||
|
||||
private boolean isFileGroupReplacedBeforeOrOn(HoodieFileGroupId fileGroupId, String instant) {
|
||||
Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId);
|
||||
if (!hoodieInstantOption.isPresent()) {
|
||||
|
||||
@@ -199,6 +199,16 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
|
||||
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
|
||||
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBefore, secondaryView::getReplacedFileGroupsBefore);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
|
||||
return execute(partitionPath, preferredView::getAllReplacedFileGroups, secondaryView::getAllReplacedFileGroups);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations() {
|
||||
return execute(preferredView::getPendingCompactionOperations, secondaryView::getPendingCompactionOperations);
|
||||
|
||||
@@ -91,6 +91,12 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
|
||||
public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
|
||||
String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
|
||||
|
||||
public static final String ALL_REPLACED_FILEGROUPS_BEFORE =
|
||||
String.format("%s/%s", BASE_URL, "filegroups/replaced/before/");
|
||||
|
||||
public static final String ALL_REPLACED_FILEGROUPS_PARTITION =
|
||||
String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/");
|
||||
|
||||
public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", BASE_URL, "clustering/pending/");
|
||||
|
||||
|
||||
@@ -380,6 +386,30 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
|
||||
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
|
||||
try {
|
||||
List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
|
||||
new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
|
||||
return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
|
||||
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
|
||||
try {
|
||||
List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap,
|
||||
new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
|
||||
return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean refresh() {
|
||||
Map<String, String> paramsMap = getParams();
|
||||
try {
|
||||
|
||||
@@ -167,10 +167,20 @@ public interface TableFileSystemView {
|
||||
HoodieTimeline getTimeline();
|
||||
|
||||
/**
|
||||
* Stream all the replaced file groups before maxCommitTime.
|
||||
* Stream all the replaced file groups before or on maxCommitTime for given partition.
|
||||
*/
|
||||
Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath);
|
||||
|
||||
/**
|
||||
* Stream all the replaced file groups before maxCommitTime for given partition.
|
||||
*/
|
||||
Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath);
|
||||
|
||||
/**
|
||||
* Stream all the replaced file groups for given partition.
|
||||
*/
|
||||
Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath);
|
||||
|
||||
/**
|
||||
* Filegroups that are in pending clustering.
|
||||
*/
|
||||
|
||||
@@ -86,7 +86,7 @@ public class ClusteringUtils {
|
||||
LOG.warn("No content found in requested file for instant " + pendingReplaceInstant);
|
||||
return Option.empty();
|
||||
}
|
||||
HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
|
||||
HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadata(content.get());
|
||||
if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) {
|
||||
return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
@@ -92,6 +93,12 @@ public class HoodieTableMetadataUtil {
|
||||
case HoodieTimeline.SAVEPOINT_ACTION:
|
||||
// Nothing to be done here
|
||||
break;
|
||||
case HoodieTimeline.REPLACE_COMMIT_ACTION:
|
||||
HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
|
||||
timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
|
||||
// Note: we only add new files created here. Replaced files are removed from metadata later by cleaner.
|
||||
records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
|
||||
break;
|
||||
default:
|
||||
throw new HoodieException("Unknown type of action " + instant.getAction());
|
||||
}
|
||||
|
||||
@@ -1356,6 +1356,13 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
List<HoodieFileGroup> allReplaced = fsView.getReplacedFileGroupsBeforeOrOn("2", partitionPath1).collect(Collectors.toList());
|
||||
assertEquals(1, allReplaced.size());
|
||||
assertEquals(fileId1, allReplaced.get(0).getFileGroupId().getFileId());
|
||||
|
||||
allReplaced = fsView.getReplacedFileGroupsBefore("2", partitionPath1).collect(Collectors.toList());
|
||||
assertEquals(0, allReplaced.size());
|
||||
|
||||
allReplaced = fsView.getAllReplacedFileGroups(partitionPath1).collect(Collectors.toList());
|
||||
assertEquals(1, allReplaced.size());
|
||||
assertEquals(fileId1, allReplaced.get(0).getFileGroupId().getFileId());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -228,6 +228,11 @@ public class HoodieTestTable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable forReplaceCommit(String instantTime) {
|
||||
currentInstantTime = instantTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable forCompaction(String instantTime) {
|
||||
currentInstantTime = instantTime;
|
||||
return this;
|
||||
|
||||
@@ -299,6 +299,21 @@ public class FileSystemViewHandler {
|
||||
writeValueAsString(ctx, dtos);
|
||||
}, true));
|
||||
|
||||
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> {
|
||||
List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore(
|
||||
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
|
||||
ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
|
||||
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
|
||||
writeValueAsString(ctx, dtos);
|
||||
}, true));
|
||||
|
||||
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> {
|
||||
List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups(
|
||||
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
|
||||
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
|
||||
writeValueAsString(ctx, dtos);
|
||||
}, true));
|
||||
|
||||
app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> {
|
||||
List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering(
|
||||
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
|
||||
|
||||
@@ -94,6 +94,16 @@ public class FileSliceHandler extends Handler {
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<FileGroupDTO> getReplacedFileGroupsBefore(String basePath, String maxCommitTime, String partitionPath) {
|
||||
return viewManager.getFileSystemView(basePath).getReplacedFileGroupsBefore(maxCommitTime, partitionPath).map(FileGroupDTO::fromFileGroup)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<FileGroupDTO> getAllReplacedFileGroups(String basePath, String partitionPath) {
|
||||
return viewManager.getFileSystemView(basePath).getAllReplacedFileGroups(partitionPath).map(FileGroupDTO::fromFileGroup)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<ClusteringOpDTO> getFileGroupsInPendingClustering(String basePath) {
|
||||
return viewManager.getFileSystemView(basePath).getFileGroupsInPendingClustering()
|
||||
.map(fgInstant -> ClusteringOpDTO.fromClusteringOp(fgInstant.getLeft(), fgInstant.getRight()))
|
||||
|
||||
Reference in New Issue
Block a user