From 2444f40a4be5bbf0bf210dee5690267a9a1e35c8 Mon Sep 17 00:00:00 2001 From: Yuwei XIAO Date: Fri, 31 Dec 2021 13:37:52 +0800 Subject: [PATCH] [HUDI-3095] abstract partition filter logic to enable code reuse (#4454) * [HUDI-3095] abstract partition filter logic to enable code reuse * [HUDI-3095] address reviews --- .../hudi/config/HoodieClusteringConfig.java | 43 +++++++ .../apache/hudi/config/HoodieWriteConfig.java | 14 +++ .../BaseClusteringPlanActionExecutor.java | 3 +- .../ClusteringPlanPartitionFilter.java | 68 +++++++++++ .../ClusteringPlanPartitionFilterMode.java | 28 +++++ .../strategy/ClusteringPlanStrategy.java | 34 ++++++ .../PartitionAwareClusteringPlanStrategy.java | 6 +- ...teringPlanStrategyConfigCompatibility.java | 66 +++++++++++ .../JavaRecentDaysClusteringPlanStrategy.java | 65 ----------- .../JavaSizeBasedClusteringPlanStrategy.java | 5 - ...SparkRecentDaysClusteringPlanStrategy.java | 63 ----------- ...ectedPartitionsClusteringPlanStrategy.java | 66 ----------- .../SparkSingleFileSortPlanStrategy.java | 1 + .../SparkSizeBasedClusteringPlanStrategy.java | 5 - ...SparkRecentDaysClusteringPlanStrategy.java | 66 ----------- ...estSparkClusteringPlanPartitionFilter.java | 107 ++++++++++++++++++ 16 files changed, 368 insertions(+), 272 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java delete mode 100644 hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java delete mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 4f80b6608..9486ad43d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.util.TypeUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import javax.annotation.Nonnull; import java.io.File; @@ -54,6 +55,8 @@ public class HoodieClusteringConfig extends HoodieConfig { "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy"; public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY = "org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy"; + public static final String PLAN_PARTITION_FILTER_MODE = + "hoodie.clustering.plan.partition.filter.mode"; // Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize."; @@ -64,6 +67,20 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Number of partitions to list to create ClusteringPlan"); + public static final ConfigProperty PARTITION_FILTER_BEGIN_PARTITION = ConfigProperty + .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition") + .noDefaultValue() + .sinceVersion("0.11.0") + .withDocumentation("Begin partition used to filter partition (inclusive), only effective when the filter mode '" + + PLAN_PARTITION_FILTER_MODE + "' is " + ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()); + + public static final ConfigProperty PARTITION_FILTER_END_PARTITION = ConfigProperty + .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition") + .noDefaultValue() + .sinceVersion("0.11.0") + .withDocumentation("End partition used to filter partition (inclusive), only effective when the filter mode '" + + PLAN_PARTITION_FILTER_MODE + "' is " + ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()); + public static final ConfigProperty PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigProperty .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit") .defaultValue(String.valueOf(600 * 1024 * 1024L)) @@ -110,6 +127,17 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.9.0") .withDocumentation("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan"); + public static final ConfigProperty PLAN_PARTITION_FILTER_MODE_NAME = ConfigProperty + .key(PLAN_PARTITION_FILTER_MODE) + .defaultValue(ClusteringPlanPartitionFilterMode.NONE) + .sinceVersion("0.11.0") + .withDocumentation("Partition filter mode used in the creation of clustering plan. Available values are - " + + "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate." + + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '" + + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "." + + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '" + + PARTITION_FILTER_END_PARTITION.key() + "']."); + public static final ConfigProperty PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group") .defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L)) @@ -381,6 +409,11 @@ public class HoodieClusteringConfig extends HoodieConfig { return this; } + public Builder withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode mode) { + clusteringConfig.setValue(PLAN_PARTITION_FILTER_MODE_NAME.key(), mode.toString()); + return this; + } + public Builder withClusteringExecutionStrategyClass(String runClusteringStrategyClass) { clusteringConfig.setValue(EXECUTION_STRATEGY_CLASS_NAME, runClusteringStrategyClass); return this; @@ -396,6 +429,16 @@ public class HoodieClusteringConfig extends HoodieConfig { return this; } + public Builder withClusteringPartitionFilterBeginPartition(String begin) { + clusteringConfig.setValue(PARTITION_FILTER_BEGIN_PARTITION, begin); + return this; + } + + public Builder withClusteringPartitionFilterEndPartition(String end) { + clusteringConfig.setValue(PARTITION_FILTER_END_PARTITION, end); + return this; + } + public Builder withClusteringPlanSmallFileLimit(long clusteringSmallFileLimit) { clusteringConfig.setValue(PLAN_STRATEGY_SMALL_FILE_LIMIT, String.valueOf(clusteringSmallFileLimit)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 74a17bffe..d8e4bd3b2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -53,6 +53,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; import org.apache.hudi.table.RandomFileIdPrefixProvider; +import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; @@ -1215,6 +1216,19 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME); } + public ClusteringPlanPartitionFilterMode getClusteringPlanPartitionFilterMode() { + String mode = getString(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME); + return ClusteringPlanPartitionFilterMode.valueOf(mode); + } + + public String getBeginPartitionForClustering() { + return getString(HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION); + } + + public String getEndPartitionForClustering() { + return getString(HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION); + } + public String getClusteringExecutionStrategyClass() { return getString(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java index 8071bfb83..a1820ed93 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java @@ -79,7 +79,8 @@ public abstract class BaseClusteringPlanActionExecutor filter(List partitions, HoodieWriteConfig config) { + ClusteringPlanPartitionFilterMode mode = config.getClusteringPlanPartitionFilterMode(); + switch (mode) { + case NONE: + return partitions; + case RECENT_DAYS: + return recentDaysFilter(partitions, config); + case SELECTED_PARTITIONS: + return selectedPartitionsFilter(partitions, config); + default: + throw new HoodieClusteringException("Unknown partition filter, filter mode: " + mode); + } + } + + private static List recentDaysFilter(List partitions, HoodieWriteConfig config) { + int targetPartitionsForClustering = config.getTargetPartitionsForClustering(); + int skipPartitionsFromLatestForClustering = config.getSkipPartitionsFromLatestForClustering(); + return partitions.stream() + .sorted(Comparator.reverseOrder()) + .skip(Math.max(skipPartitionsFromLatestForClustering, 0)) + .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitions.size()) + .collect(Collectors.toList()); + } + + private static List selectedPartitionsFilter(List partitions, HoodieWriteConfig config) { + String beginPartition = config.getBeginPartitionForClustering(); + String endPartition = config.getEndPartitionForClustering(); + List filteredPartitions = partitions.stream() + .filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0) + .collect(Collectors.toList()); + return filteredPartitions; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java new file mode 100644 index 000000000..fbaf79797 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.cluster; + +/** + * Clustering partition filter mode + */ +public enum ClusteringPlanPartitionFilterMode { + NONE, + RECENT_DAYS, + SELECTED_PARTITIONS +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java index 273ebce76..479f63932 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java @@ -30,8 +30,11 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -56,6 +59,37 @@ public abstract class ClusteringPlanStrategy filterPartitionPaths(List partitionPaths) { - return partitionPaths; + List filteredPartitions = ClusteringPlanPartitionFilter.filter(partitionPaths, getWriteConfig()); + LOG.debug("Filtered to the following partitions: " + filteredPartitions); + return filteredPartitions; } @Override diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java new file mode 100644 index 000000000..34626a897 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.cluster.strategy; + +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; +import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +public class TestClusteringPlanStrategyConfigCompatibility { + + private static Stream configParams() { + /** + * (user specified class, converted class, filter mode) + */ + Object[][] data = new Object[][] { + {"org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy", + "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy", + ClusteringPlanPartitionFilterMode.RECENT_DAYS}, + {"org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy", + "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy", + ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS}, + {"org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy", + "org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy", + ClusteringPlanPartitionFilterMode.RECENT_DAYS} + }; + return Stream.of(data).map(Arguments::of); + } + + @ParameterizedTest() + @MethodSource("configParams") + public void testCheckAndGetClusteringPlanStrategy(String oldClass, String newClass, ClusteringPlanPartitionFilterMode mode) { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath("") + .withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withClusteringPlanStrategyClass(oldClass) + .build()) + .build(); + + Assertions.assertEquals(newClass, ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config)); + Assertions.assertEquals(mode, config.getClusteringPlanPartitionFilterMode()); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java deleted file mode 100644 index 6d9b2eedc..000000000 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.client.clustering.plan.strategy; - -import org.apache.hudi.client.common.HoodieJavaEngineContext; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieJavaCopyOnWriteTable; -import org.apache.hudi.table.HoodieJavaMergeOnReadTable; - -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; - -/** - * Clustering Strategy that only looks at latest 'daybased.lookback.partitions' partitions - * for Java engine. - */ -public class JavaRecentDaysClusteringPlanStrategy> - extends JavaSizeBasedClusteringPlanStrategy { - private static final Logger LOG = LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class); - - public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable table, - HoodieJavaEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable table, - HoodieJavaEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - @Override - protected List filterPartitionPaths(List partitionPaths) { - int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); - int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering(); - return partitionPaths.stream() - .sorted(Comparator.reverseOrder()) - .skip(Math.max(skipPartitionsFromLatestForClustering, 0)) - .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size()) - .collect(Collectors.toList()); - } -} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java index 9052f030f..ec7202f4d 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java @@ -113,11 +113,6 @@ public class JavaSizeBasedClusteringPlanStrategy filterPartitionPaths(List partitionPaths) { - return partitionPaths; - } - @Override protected Stream getFileSlicesEligibleForClustering(final String partition) { return super.getFileSlicesEligibleForClustering(partition) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java deleted file mode 100644 index ad19824e1..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.client.clustering.plan.strategy; - -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; -import org.apache.hudi.table.HoodieSparkMergeOnReadTable; - -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; - -/** - * Clustering Strategy that only looks at latest 'daybased.lookback.partitions' partitions. - */ -public class SparkRecentDaysClusteringPlanStrategy> - extends SparkSizeBasedClusteringPlanStrategy { - private static final Logger LOG = LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class); - - public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable table, - HoodieSparkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable table, - HoodieSparkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - @Override - protected List filterPartitionPaths(List partitionPaths) { - int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); - int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering(); - return partitionPaths.stream() - .sorted(Comparator.reverseOrder()) - .skip(Math.max(skipPartitionsFromLatestForClustering, 0)) - .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size()) - .collect(Collectors.toList()); - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java deleted file mode 100644 index 549935d2f..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.client.clustering.plan.strategy; - -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; -import org.apache.hudi.table.HoodieSparkMergeOnReadTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX; - -/** - * Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive. - */ -public class SparkSelectedPartitionsClusteringPlanStrategy> - extends SparkSizeBasedClusteringPlanStrategy { - private static final Logger LOG = LogManager.getLogger(SparkSelectedPartitionsClusteringPlanStrategy.class); - - public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition"; - public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition"; - - public SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkCopyOnWriteTable table, - HoodieSparkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - public SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkMergeOnReadTable table, - HoodieSparkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - @Override - protected List filterPartitionPaths(List partitionPaths) { - String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION); - String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION); - List filteredPartitions = partitionPaths.stream() - .filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0) - .collect(Collectors.toList()); - LOG.info("Filtered to the following partitions: " + filteredPartitions); - return filteredPartitions; - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java index b98dbac39..88c3057f2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java @@ -48,6 +48,7 @@ public class SparkSingleFileSortPlanStrategy> super(table, engineContext, writeConfig); } + @Override protected Stream buildClusteringGroupsForPartition(String partitionPath, List fileSlices) { List, Integer>> fileSliceGroups = fileSlices.stream() .map(fileSlice -> Pair.of(Collections.singletonList(fileSlice), 1)).collect(Collectors.toList()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java index 7295118ca..b38931c2d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java @@ -115,11 +115,6 @@ public class SparkSizeBasedClusteringPlanStrategy filterPartitionPaths(List partitionPaths) { - return partitionPaths; - } - @Override protected Stream getFileSlicesEligibleForClustering(final String partition) { return super.getFileSlicesEligibleForClustering(partition) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java deleted file mode 100644 index a9c7b7154..000000000 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.client.clustering.plan.strategy; - -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertSame; - -public class TestSparkRecentDaysClusteringPlanStrategy { - @Mock - HoodieSparkCopyOnWriteTable table; - @Mock - HoodieSparkEngineContext context; - HoodieWriteConfig hoodieWriteConfig; - - @BeforeEach - public void setUp() { - this.hoodieWriteConfig = HoodieWriteConfig - .newBuilder() - .withPath("Fake_Table_Path") - .withClusteringConfig(HoodieClusteringConfig - .newBuilder() - .withClusteringSkipPartitionsFromLatest(1) - .withClusteringTargetPartitions(1) - .build()) - .build(); - } - - @Test - public void testFilterPartitionPaths() { - SparkRecentDaysClusteringPlanStrategy sg = new SparkRecentDaysClusteringPlanStrategy(table, context, hoodieWriteConfig); - ArrayList fakeTimeBasedPartitionsPath = new ArrayList<>(); - fakeTimeBasedPartitionsPath.add("20210718"); - fakeTimeBasedPartitionsPath.add("20210716"); - fakeTimeBasedPartitionsPath.add("20210719"); - List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); - assertEquals(1, list.size()); - assertSame("20210718", list.get(0)); - } -} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java new file mode 100644 index 000000000..a68a9e336 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; + +public class TestSparkClusteringPlanPartitionFilter { + @Mock + HoodieSparkCopyOnWriteTable table; + @Mock + HoodieSparkEngineContext context; + HoodieWriteConfig.Builder hoodieWriteConfigBuilder; + + @BeforeEach + public void setUp() { + this.hoodieWriteConfigBuilder = HoodieWriteConfig + .newBuilder() + .withPath("Fake_Table_Path"); + } + + @Test + public void testFilterPartitionNoFilter() { + HoodieWriteConfig config = hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE) + .build()) + .build(); + + PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config); + ArrayList fakeTimeBasedPartitionsPath = new ArrayList<>(); + fakeTimeBasedPartitionsPath.add("20210718"); + fakeTimeBasedPartitionsPath.add("20210716"); + fakeTimeBasedPartitionsPath.add("20210719"); + List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + assertEquals(3, list.size()); + } + + @Test + public void testFilterPartitionRecentDays() { + HoodieWriteConfig config = hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withClusteringSkipPartitionsFromLatest(1) + .withClusteringTargetPartitions(1) + .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.RECENT_DAYS) + .build()) + .build(); + + PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config); + ArrayList fakeTimeBasedPartitionsPath = new ArrayList<>(); + fakeTimeBasedPartitionsPath.add("20210718"); + fakeTimeBasedPartitionsPath.add("20210716"); + fakeTimeBasedPartitionsPath.add("20210719"); + List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + assertEquals(1, list.size()); + assertSame("20210718", list.get(0)); + } + + @Test + public void testFilterPartitionSelectedPartitions() { + HoodieWriteConfig config = hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withClusteringPartitionFilterBeginPartition("20211222") + .withClusteringPartitionFilterEndPartition("20211223") + .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS) + .build()) + .build(); + + PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config); + ArrayList fakeTimeBasedPartitionsPath = new ArrayList<>(); + fakeTimeBasedPartitionsPath.add("20211220"); + fakeTimeBasedPartitionsPath.add("20211221"); + fakeTimeBasedPartitionsPath.add("20211222"); + fakeTimeBasedPartitionsPath.add("20211224"); + List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + assertEquals(1, list.size()); + assertSame("20211222", list.get(0)); + } +}