[HUDI-294] Delete Paths written in Cleaner plan needs to be relative to partition-path (#1062)
[HUDI-294] Delete Paths written in Cleaner plan needs to be relative to partition-path
This commit is contained in:
committed by
Balaji Varadarajan
parent
845e261658
commit
98ab33bb6e
@@ -23,11 +23,13 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
|
||||
import org.apache.hudi.common.HoodieCleanStat;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.util.AvroUtils;
|
||||
import org.apache.hudi.common.util.CleanerUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
@@ -166,9 +168,10 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
|
||||
logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
|
||||
}
|
||||
|
||||
HoodieTableMetaClient metaClient = createMetaClient(true);
|
||||
// Create the metadata and save it
|
||||
HoodieCleanMetadata metadata =
|
||||
AvroUtils.convertCleanMetadata(cleanInstant.getTimestamp(), durationInMs, cleanStats);
|
||||
CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats);
|
||||
logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
|
||||
metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());
|
||||
|
||||
|
||||
@@ -40,11 +40,13 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.util.AvroUtils;
|
||||
import org.apache.hudi.common.util.CleanerUtils;
|
||||
import org.apache.hudi.common.util.CompactionUtils;
|
||||
import org.apache.hudi.common.util.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.common.versioning.clean.CleanMetadataMigrator;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
@@ -65,6 +67,7 @@ import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@@ -75,7 +78,9 @@ import java.util.TreeSet;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import scala.Tuple3;
|
||||
|
||||
import static org.apache.hudi.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@@ -597,6 +602,103 @@ public class TestCleaner extends TestHoodieClientBase {
|
||||
file2P0L0, Option.of(2)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpgradeDowngrade() {
|
||||
String commitTime = "000";
|
||||
|
||||
String partition1 = DEFAULT_PARTITION_PATHS[0];
|
||||
String partition2 = DEFAULT_PARTITION_PATHS[1];
|
||||
|
||||
String fileName1 = "data1_1_000.parquet";
|
||||
String fileName2 = "data2_1_000.parquet";
|
||||
|
||||
String filePath1 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName1;
|
||||
String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2;
|
||||
|
||||
List<String> deletePathPatterns1 = Arrays.asList(filePath1, filePath2);
|
||||
List<String> successDeleteFiles1 = Arrays.asList(filePath1);
|
||||
List<String> failedDeleteFiles1 = Arrays.asList(filePath2);
|
||||
|
||||
// create partition1 clean stat.
|
||||
HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
|
||||
partition1, deletePathPatterns1, successDeleteFiles1,
|
||||
failedDeleteFiles1, commitTime);
|
||||
|
||||
List<String> deletePathPatterns2 = new ArrayList<>();
|
||||
List<String> successDeleteFiles2 = new ArrayList<>();
|
||||
List<String> failedDeleteFiles2 = new ArrayList<>();
|
||||
|
||||
// create partition2 empty clean stat.
|
||||
HoodieCleanStat cleanStat2 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
|
||||
partition2, deletePathPatterns2, successDeleteFiles2,
|
||||
failedDeleteFiles2, commitTime);
|
||||
|
||||
// map with absolute file path.
|
||||
Map<String, Tuple3> oldExpected = new HashMap<>();
|
||||
oldExpected.put(partition1, new Tuple3<>(deletePathPatterns1, successDeleteFiles1, failedDeleteFiles1));
|
||||
oldExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2));
|
||||
|
||||
// map with relative path.
|
||||
Map<String, Tuple3> newExpected = new HashMap<>();
|
||||
newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Arrays.asList(fileName1), Arrays.asList(fileName2)));
|
||||
newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2));
|
||||
|
||||
HoodieCleanMetadata metadata =
|
||||
CleanerUtils.convertCleanMetadata(metaClient, commitTime, Option.of(0L), Arrays.asList(cleanStat1, cleanStat2));
|
||||
|
||||
Assert.assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, metadata.getVersion());
|
||||
testCleanMetadataPathEquality(metadata, newExpected);
|
||||
|
||||
CleanMetadataMigrator migrator = new CleanMetadataMigrator(metaClient);
|
||||
HoodieCleanMetadata oldMetadata =
|
||||
migrator.migrateToVersion(metadata, metadata.getVersion(), CleanerUtils.CLEAN_METADATA_VERSION_1);
|
||||
Assert.assertEquals(CleanerUtils.CLEAN_METADATA_VERSION_1, oldMetadata.getVersion());
|
||||
testCleanMetadataEquality(metadata, oldMetadata);
|
||||
testCleanMetadataPathEquality(oldMetadata, oldExpected);
|
||||
|
||||
HoodieCleanMetadata newMetadata = migrator.upgradeToLatest(oldMetadata, oldMetadata.getVersion());
|
||||
Assert.assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, newMetadata.getVersion());
|
||||
testCleanMetadataEquality(oldMetadata, newMetadata);
|
||||
testCleanMetadataPathEquality(newMetadata, newExpected);
|
||||
testCleanMetadataPathEquality(oldMetadata, oldExpected);
|
||||
}
|
||||
|
||||
public void testCleanMetadataEquality(HoodieCleanMetadata input1, HoodieCleanMetadata input2) {
|
||||
Assert.assertEquals(input1.getEarliestCommitToRetain(), input2.getEarliestCommitToRetain());
|
||||
Assert.assertEquals(input1.getStartCleanTime(), input2.getStartCleanTime());
|
||||
Assert.assertEquals(input1.getTimeTakenInMillis(), input2.getTimeTakenInMillis());
|
||||
Assert.assertEquals(input1.getTotalFilesDeleted(), input2.getTotalFilesDeleted());
|
||||
|
||||
Map<String, HoodieCleanPartitionMetadata> map1 = input1.getPartitionMetadata();
|
||||
Map<String, HoodieCleanPartitionMetadata> map2 = input2.getPartitionMetadata();
|
||||
|
||||
Assert.assertEquals(map1.keySet(), map2.keySet());
|
||||
|
||||
List<String> partitions1 = map1.values().stream().map(m -> m.getPartitionPath()).collect(
|
||||
Collectors.toList());
|
||||
List<String> partitions2 = map2.values().stream().map(m -> m.getPartitionPath()).collect(
|
||||
Collectors.toList());
|
||||
Assert.assertEquals(partitions1, partitions2);
|
||||
|
||||
List<String> policies1 = map1.values().stream().map(m -> m.getPolicy()).collect(Collectors.toList());
|
||||
List<String> policies2 = map2.values().stream().map(m -> m.getPolicy()).collect(Collectors.toList());
|
||||
Assert.assertEquals(policies1, policies2);
|
||||
}
|
||||
|
||||
private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map<String, Tuple3> expected) {
|
||||
|
||||
Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata();
|
||||
|
||||
for (Map.Entry<String, HoodieCleanPartitionMetadata> entry : partitionMetadataMap.entrySet()) {
|
||||
String partitionPath = entry.getKey();
|
||||
HoodieCleanPartitionMetadata partitionMetadata = entry.getValue();
|
||||
|
||||
Assert.assertEquals(expected.get(partitionPath)._1(), partitionMetadata.getDeletePathPatterns());
|
||||
Assert.assertEquals(expected.get(partitionPath)._2(), partitionMetadata.getSuccessDeleteFiles());
|
||||
Assert.assertEquals(expected.get(partitionPath)._3(), partitionMetadata.getFailedDeleteFiles());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files
|
||||
*/
|
||||
|
||||
@@ -141,13 +141,13 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
|
||||
|
||||
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
|
||||
|
||||
HoodieTestUtils.createCleanFiles(basePath, "100", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(metaClient, basePath, "100", dfs.getConf());
|
||||
HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "101");
|
||||
HoodieTestUtils.createCleanFiles(basePath, "101", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(basePath, "102", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(basePath, "103", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(basePath, "104", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(basePath, "105", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(metaClient, basePath, "101", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(metaClient, basePath, "102", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(metaClient, basePath, "103", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(metaClient, basePath, "104", dfs.getConf());
|
||||
HoodieTestUtils.createCleanFiles(metaClient, basePath, "105", dfs.getConf());
|
||||
HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "106", "107");
|
||||
|
||||
// reload the timeline and get all the commmits before archive
|
||||
|
||||
Reference in New Issue
Block a user