From 98ab33bb6e1637d18c8fab7a9ddd50daeaf56962 Mon Sep 17 00:00:00 2001 From: leesf <490081539@qq.com> Date: Wed, 4 Dec 2019 02:11:03 +0800 Subject: [PATCH] [HUDI-294] Delete Paths written in Cleaner plan needs to be relative to partition-path (#1062) [HUDI-294] Delete Paths written in Cleaner plan needs to be relative to partition-path --- .../org/apache/hudi/HoodieCleanClient.java | 5 +- .../java/org/apache/hudi/TestCleaner.java | 102 ++++++++++++++++++ .../hudi/io/TestHoodieCommitArchiveLog.java | 12 +-- ...IncrementalTimelineSyncFileSystemView.java | 9 +- .../apache/hudi/common/util/AvroUtils.java | 22 ---- .../apache/hudi/common/util/CleanerUtils.java | 60 +++++++++++ .../clean/CleanMetadataMigrator.java | 34 ++++++ .../clean/CleanV1MigrationHandler.java | 102 ++++++++++++++++++ .../clean/CleanV2MigrationHandler.java | 98 +++++++++++++++++ .../hudi/common/model/HoodieTestUtils.java | 12 ++- .../table/view/TestIncrementalFSViewSync.java | 4 +- 11 files changed, 425 insertions(+), 35 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanMetadataMigrator.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanV1MigrationHandler.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanV2MigrationHandler.java diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java index 6fae5ac7f..e08ecfb61 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java @@ -23,11 +23,13 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.AvroUtils; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; @@ -166,9 +168,10 @@ public class HoodieCleanClient extends AbstractHo logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); } + HoodieTableMetaClient metaClient = createMetaClient(true); // Create the metadata and save it HoodieCleanMetadata metadata = - AvroUtils.convertCleanMetadata(cleanInstant.getTimestamp(), durationInMs, cleanStats); + CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats); logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain()); metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java index 8dfcd2413..370021a2c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java @@ -40,11 +40,13 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.AvroUtils; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.ConsistencyGuardConfig; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.versioning.clean.CleanMetadataMigrator; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; @@ -65,6 +67,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -75,7 +78,9 @@ import java.util.TreeSet; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import scala.Tuple3; +import static org.apache.hudi.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -597,6 +602,103 @@ public class TestCleaner extends TestHoodieClientBase { file2P0L0, Option.of(2))); } + @Test + public void testUpgradeDowngrade() { + String commitTime = "000"; + + String partition1 = DEFAULT_PARTITION_PATHS[0]; + String partition2 = DEFAULT_PARTITION_PATHS[1]; + + String fileName1 = "data1_1_000.parquet"; + String fileName2 = "data2_1_000.parquet"; + + String filePath1 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName1; + String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2; + + List deletePathPatterns1 = Arrays.asList(filePath1, filePath2); + List successDeleteFiles1 = Arrays.asList(filePath1); + List failedDeleteFiles1 = Arrays.asList(filePath2); + + // create partition1 clean stat. + HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, + partition1, deletePathPatterns1, successDeleteFiles1, + failedDeleteFiles1, commitTime); + + List deletePathPatterns2 = new ArrayList<>(); + List successDeleteFiles2 = new ArrayList<>(); + List failedDeleteFiles2 = new ArrayList<>(); + + // create partition2 empty clean stat. + HoodieCleanStat cleanStat2 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, + partition2, deletePathPatterns2, successDeleteFiles2, + failedDeleteFiles2, commitTime); + + // map with absolute file path. + Map oldExpected = new HashMap<>(); + oldExpected.put(partition1, new Tuple3<>(deletePathPatterns1, successDeleteFiles1, failedDeleteFiles1)); + oldExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2)); + + // map with relative path. + Map newExpected = new HashMap<>(); + newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Arrays.asList(fileName1), Arrays.asList(fileName2))); + newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2)); + + HoodieCleanMetadata metadata = + CleanerUtils.convertCleanMetadata(metaClient, commitTime, Option.of(0L), Arrays.asList(cleanStat1, cleanStat2)); + + Assert.assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, metadata.getVersion()); + testCleanMetadataPathEquality(metadata, newExpected); + + CleanMetadataMigrator migrator = new CleanMetadataMigrator(metaClient); + HoodieCleanMetadata oldMetadata = + migrator.migrateToVersion(metadata, metadata.getVersion(), CleanerUtils.CLEAN_METADATA_VERSION_1); + Assert.assertEquals(CleanerUtils.CLEAN_METADATA_VERSION_1, oldMetadata.getVersion()); + testCleanMetadataEquality(metadata, oldMetadata); + testCleanMetadataPathEquality(oldMetadata, oldExpected); + + HoodieCleanMetadata newMetadata = migrator.upgradeToLatest(oldMetadata, oldMetadata.getVersion()); + Assert.assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, newMetadata.getVersion()); + testCleanMetadataEquality(oldMetadata, newMetadata); + testCleanMetadataPathEquality(newMetadata, newExpected); + testCleanMetadataPathEquality(oldMetadata, oldExpected); + } + + public void testCleanMetadataEquality(HoodieCleanMetadata input1, HoodieCleanMetadata input2) { + Assert.assertEquals(input1.getEarliestCommitToRetain(), input2.getEarliestCommitToRetain()); + Assert.assertEquals(input1.getStartCleanTime(), input2.getStartCleanTime()); + Assert.assertEquals(input1.getTimeTakenInMillis(), input2.getTimeTakenInMillis()); + Assert.assertEquals(input1.getTotalFilesDeleted(), input2.getTotalFilesDeleted()); + + Map map1 = input1.getPartitionMetadata(); + Map map2 = input2.getPartitionMetadata(); + + Assert.assertEquals(map1.keySet(), map2.keySet()); + + List partitions1 = map1.values().stream().map(m -> m.getPartitionPath()).collect( + Collectors.toList()); + List partitions2 = map2.values().stream().map(m -> m.getPartitionPath()).collect( + Collectors.toList()); + Assert.assertEquals(partitions1, partitions2); + + List policies1 = map1.values().stream().map(m -> m.getPolicy()).collect(Collectors.toList()); + List policies2 = map2.values().stream().map(m -> m.getPolicy()).collect(Collectors.toList()); + Assert.assertEquals(policies1, policies2); + } + + private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map expected) { + + Map partitionMetadataMap = metadata.getPartitionMetadata(); + + for (Map.Entry entry : partitionMetadataMap.entrySet()) { + String partitionPath = entry.getKey(); + HoodieCleanPartitionMetadata partitionMetadata = entry.getValue(); + + Assert.assertEquals(expected.get(partitionPath)._1(), partitionMetadata.getDeletePathPatterns()); + Assert.assertEquals(expected.get(partitionPath)._2(), partitionMetadata.getSuccessDeleteFiles()); + Assert.assertEquals(expected.get(partitionPath)._3(), partitionMetadata.getFailedDeleteFiles()); + } + } + /** * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files */ diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index a672b91a8..1f7443d14 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -141,13 +141,13 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); - HoodieTestUtils.createCleanFiles(basePath, "100", dfs.getConf()); + HoodieTestUtils.createCleanFiles(metaClient, basePath, "100", dfs.getConf()); HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "101"); - HoodieTestUtils.createCleanFiles(basePath, "101", dfs.getConf()); - HoodieTestUtils.createCleanFiles(basePath, "102", dfs.getConf()); - HoodieTestUtils.createCleanFiles(basePath, "103", dfs.getConf()); - HoodieTestUtils.createCleanFiles(basePath, "104", dfs.getConf()); - HoodieTestUtils.createCleanFiles(basePath, "105", dfs.getConf()); + HoodieTestUtils.createCleanFiles(metaClient, basePath, "101", dfs.getConf()); + HoodieTestUtils.createCleanFiles(metaClient, basePath, "102", dfs.getConf()); + HoodieTestUtils.createCleanFiles(metaClient, basePath, "103", dfs.getConf()); + HoodieTestUtils.createCleanFiles(metaClient, basePath, "104", dfs.getConf()); + HoodieTestUtils.createCleanFiles(metaClient, basePath, "105", dfs.getConf()); HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "106", "107"); // reload the timeline and get all the commmits before archive diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java index 6dccb4b45..42e1e2407 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.AvroUtils; import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TimelineDiffHelper; import org.apache.hudi.common.util.TimelineDiffHelper.TimelineDiffResult; @@ -261,7 +262,13 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl HoodieCleanMetadata cleanMetadata = AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(instant).get()); cleanMetadata.getPartitionMetadata().entrySet().stream().forEach(entry -> { - removeFileSlicesForPartition(timeline, instant, entry.getKey(), entry.getValue().getSuccessDeleteFiles()); + final String basePath = metaClient.getBasePath(); + final String partitionPath = entry.getValue().getPartitionPath(); + List fullPathList = entry.getValue().getSuccessDeleteFiles() + .stream().map(fileName -> new Path(FSUtils + .getPartitionPath(basePath, partitionPath), fileName).toString()) + .collect(Collectors.toList()); + removeFileSlicesForPartition(timeline, instant, entry.getKey(), fullPathList); }); log.info("Done Syncing cleaner instant (" + instant + ")"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java index 1b5a7b796..7a90274d5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java @@ -19,7 +19,6 @@ package org.apache.hudi.common.util; import org.apache.hudi.avro.model.HoodieCleanMetadata; -import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -27,7 +26,6 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata; -import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieRollbackStat; import com.google.common.base.Preconditions; @@ -52,26 +50,6 @@ public class AvroUtils { private static final Integer DEFAULT_VERSION = 1; - public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, Option durationInMs, - List cleanStats) { - ImmutableMap.Builder partitionMetadataBuilder = ImmutableMap.builder(); - int totalDeleted = 0; - String earliestCommitToRetain = null; - for (HoodieCleanStat stat : cleanStats) { - HoodieCleanPartitionMetadata metadata = - new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(), - stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getDeletePathPatterns()); - partitionMetadataBuilder.put(stat.getPartitionPath(), metadata); - totalDeleted += stat.getSuccessDeleteFiles().size(); - if (earliestCommitToRetain == null) { - // This will be the same for all partitions - earliestCommitToRetain = stat.getEarliestCommitToRetain(); - } - } - return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted, - earliestCommitToRetain, partitionMetadataBuilder.build(), DEFAULT_VERSION); - } - public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, Option durationInMs, List commits, Map> commitToStats) { ImmutableMap.Builder> commitToStatBuilder = ImmutableMap.builder(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java new file mode 100644 index 000000000..4d4ccb93c --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import com.google.common.collect.ImmutableMap; +import java.util.List; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; +import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.versioning.clean.CleanMetadataMigrator; +import org.apache.hudi.common.versioning.clean.CleanV1MigrationHandler; +import org.apache.hudi.common.versioning.clean.CleanV2MigrationHandler; + +public class CleanerUtils { + public static final Integer CLEAN_METADATA_VERSION_1 = CleanV1MigrationHandler.VERSION; + public static final Integer CLEAN_METADATA_VERSION_2 = CleanV2MigrationHandler.VERSION; + public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2; + + public static HoodieCleanMetadata convertCleanMetadata(HoodieTableMetaClient metaClient, + String startCleanTime, Option durationInMs, List cleanStats) { + ImmutableMap.Builder partitionMetadataBuilder = ImmutableMap.builder(); + int totalDeleted = 0; + String earliestCommitToRetain = null; + for (HoodieCleanStat stat : cleanStats) { + HoodieCleanPartitionMetadata metadata = + new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(), + stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles()); + partitionMetadataBuilder.put(stat.getPartitionPath(), metadata); + totalDeleted += stat.getSuccessDeleteFiles().size(); + if (earliestCommitToRetain == null) { + // This will be the same for all partitions + earliestCommitToRetain = stat.getEarliestCommitToRetain(); + } + } + + HoodieCleanMetadata metadata = new HoodieCleanMetadata(startCleanTime, + durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain, + partitionMetadataBuilder.build(), CLEAN_METADATA_VERSION_1); + + CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient); + return metadataMigrator.upgradeToLatest(metadata, metadata.getVersion()); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanMetadataMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanMetadataMigrator.java new file mode 100644 index 000000000..219572d68 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanMetadataMigrator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.versioning.clean; + +import java.util.Arrays; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.versioning.MetadataMigrator; + +public class CleanMetadataMigrator extends MetadataMigrator { + + public CleanMetadataMigrator(HoodieTableMetaClient metaClient) { + super(metaClient, + Arrays + .asList(new CleanV1MigrationHandler(metaClient), + new CleanV2MigrationHandler(metaClient))); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanV1MigrationHandler.java new file mode 100644 index 000000000..d9dbead16 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanV1MigrationHandler.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.versioning.clean; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.FSUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.versioning.AbstractMigratorBase; + +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; + +public class CleanV1MigrationHandler extends AbstractMigratorBase { + + public static final Integer VERSION = 1; + + public CleanV1MigrationHandler(HoodieTableMetaClient metaClient) { + super(metaClient); + } + + @Override + public Integer getManagedVersion() { + return VERSION; + } + + @Override + public HoodieCleanMetadata upgradeFrom(HoodieCleanMetadata input) { + throw new IllegalArgumentException( + "This is the lowest version. Input cannot be any lower version"); + } + + @Override + public HoodieCleanMetadata downgradeFrom(HoodieCleanMetadata input) { + Preconditions.checkArgument(input.getVersion() == 2, + "Input version is " + input.getVersion() + ". Must be 2"); + final Path basePath = new Path(metaClient.getBasePath()); + + final Map partitionMetadataMap = input + .getPartitionMetadata() + .entrySet().stream().map(entry -> { + final String partitionPath = entry.getKey(); + final HoodieCleanPartitionMetadata partitionMetadata = entry.getValue(); + + HoodieCleanPartitionMetadata cleanPartitionMetadata = HoodieCleanPartitionMetadata + .newBuilder() + .setDeletePathPatterns(partitionMetadata.getDeletePathPatterns().stream() + .map( + path -> convertToV1Path(basePath, partitionMetadata.getPartitionPath(), path)) + .collect(Collectors.toList())) + .setSuccessDeleteFiles(partitionMetadata.getSuccessDeleteFiles().stream() + .map( + path -> convertToV1Path(basePath, partitionMetadata.getPartitionPath(), path)) + .collect(Collectors.toList())).setPartitionPath(partitionPath) + .setFailedDeleteFiles(partitionMetadata.getFailedDeleteFiles().stream() + .map( + path -> convertToV1Path(basePath, partitionMetadata.getPartitionPath(), path)) + .collect(Collectors.toList())) + .setPolicy(partitionMetadata.getPolicy()).setPartitionPath(partitionPath) + .build(); + return Pair.of(partitionPath, cleanPartitionMetadata); + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + HoodieCleanMetadata metadata = HoodieCleanMetadata.newBuilder() + .setEarliestCommitToRetain(input.getEarliestCommitToRetain()) + .setStartCleanTime(input.getStartCleanTime()) + .setTimeTakenInMillis(input.getTimeTakenInMillis()) + .setTotalFilesDeleted(input.getTotalFilesDeleted()) + .setPartitionMetadata(partitionMetadataMap) + .setVersion(getManagedVersion()).build(); + + return metadata; + + } + + private static String convertToV1Path(Path basePath, String partitionPath, String fileName) { + if ((fileName == null) || (fileName.isEmpty())) { + return fileName; + } + + return new Path(FSUtils.getPartitionPath(basePath, partitionPath), fileName).toString(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanV2MigrationHandler.java new file mode 100644 index 000000000..260967204 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/clean/CleanV2MigrationHandler.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.versioning.clean; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.versioning.AbstractMigratorBase; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; + +public class CleanV2MigrationHandler extends AbstractMigratorBase { + + public static final Integer VERSION = 2; + + public CleanV2MigrationHandler(HoodieTableMetaClient metaClient) { + super(metaClient); + } + + @Override + public Integer getManagedVersion() { + return VERSION; + } + + @Override + public HoodieCleanMetadata upgradeFrom(HoodieCleanMetadata input) { + Preconditions.checkArgument(input.getVersion() == 1, + "Input version is " + input.getVersion() + ". Must be 1"); + HoodieCleanMetadata metadata = new HoodieCleanMetadata(); + metadata.setEarliestCommitToRetain(input.getEarliestCommitToRetain()); + metadata.setTimeTakenInMillis(input.getTimeTakenInMillis()); + metadata.setStartCleanTime(input.getStartCleanTime()); + metadata.setTotalFilesDeleted(input.getTotalFilesDeleted()); + metadata.setVersion(getManagedVersion()); + + Map partitionMetadataMap = input.getPartitionMetadata() + .entrySet() + .stream().map(entry -> { + final String partitionPath = entry.getKey(); + final HoodieCleanPartitionMetadata partitionMetadata = entry.getValue(); + + final List deletePathPatterns = convertToV2Path( + partitionMetadata.getDeletePathPatterns()); + final List successDeleteFiles = convertToV2Path( + partitionMetadata.getSuccessDeleteFiles()); + final List failedDeleteFiles = convertToV2Path( + partitionMetadata.getFailedDeleteFiles()); + + final HoodieCleanPartitionMetadata cleanPartitionMetadata = HoodieCleanPartitionMetadata + .newBuilder().setPolicy(partitionMetadata.getPolicy()) + .setPartitionPath(partitionMetadata.getPartitionPath()) + .setDeletePathPatterns(deletePathPatterns) + .setSuccessDeleteFiles(successDeleteFiles) + .setFailedDeleteFiles(failedDeleteFiles).build(); + + return Pair.of(partitionPath, cleanPartitionMetadata); + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + return HoodieCleanMetadata.newBuilder() + .setEarliestCommitToRetain(input.getEarliestCommitToRetain()) + .setStartCleanTime(input.getStartCleanTime()) + .setTimeTakenInMillis(input.getTimeTakenInMillis()) + .setTotalFilesDeleted(input.getTotalFilesDeleted()) + .setPartitionMetadata(partitionMetadataMap).setVersion(getManagedVersion()).build(); + } + + @Override + public HoodieCleanMetadata downgradeFrom(HoodieCleanMetadata input) { + throw new IllegalArgumentException( + "This is the current highest version. Input cannot be any higher version"); + } + + private List convertToV2Path(List paths) { + return paths.stream().map(path -> new Path(path).getName()) + .collect(Collectors.toList()); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java index 405ba51b8..80f11e062 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.AvroUtils; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.HoodieAvroUtils; @@ -269,7 +270,8 @@ public class HoodieTestUtils { .exists(); } - public static void createCleanFiles(String basePath, String commitTime, Configuration configuration) + public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath, + String commitTime, Configuration configuration) throws IOException { Path commitFile = new Path( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime)); @@ -280,8 +282,9 @@ public class HoodieTestUtils { DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)], new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), commitTime); // Create the clean metadata + HoodieCleanMetadata cleanMetadata = - AvroUtils.convertCleanMetadata(commitTime, Option.of(0L), Arrays.asList(cleanStats)); + CleanerUtils.convertCleanMetadata(metaClient, commitTime, Option.of(0L), Arrays.asList(cleanStats)); // Write empty clean metadata os.write(AvroUtils.serializeCleanMetadata(cleanMetadata).get()); } finally { @@ -289,8 +292,9 @@ public class HoodieTestUtils { } } - public static void createCleanFiles(String basePath, String commitTime) throws IOException { - createCleanFiles(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf()); + public static void createCleanFiles(HoodieTableMetaClient metaClient, + String basePath, String commitTime) throws IOException { + createCleanFiles(metaClient, basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf()); } public static String makeTestFileName(String instant) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index adc860dfb..e47b834a9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.table.SyncableFileSystemView; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.AvroUtils; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; @@ -417,7 +418,8 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { new ArrayList<>(), Integer.toString(Integer.parseInt(instant) + 1)); }).collect(Collectors.toList()); - HoodieCleanMetadata cleanMetadata = AvroUtils.convertCleanMetadata(cleanInstant, Option.empty(), cleanStats); + HoodieCleanMetadata cleanMetadata = CleanerUtils + .convertCleanMetadata(metaClient, cleanInstant, Option.empty(), cleanStats); metaClient.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant), AvroUtils.serializeCleanMetadata(cleanMetadata)); }