From 9fe28e56b49c7bf68ae2d83bfe89755314aa793b Mon Sep 17 00:00:00 2001 From: YueZhang <69956021+zhangyue19921010@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:23:55 +0800 Subject: [PATCH] [HUDI-3045] New clustering regex match config to choose partitions when building clustering plan (#4346) Co-authored-by: yuezhang --- .../hudi/config/HoodieClusteringConfig.java | 11 +++ .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../PartitionAwareClusteringPlanStrategy.java | 14 +++ ...tPartitionAwareClusteringPlanStrategy.java | 96 +++++++++++++++++++ 4 files changed, 125 insertions(+) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 9486ad43d..057b4a6f6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -87,6 +87,12 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Files smaller than the size specified here are candidates for clustering"); + public static final ConfigProperty PARTITION_REGEX_PATTERN = ConfigProperty + .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "partition.regex.pattern") + .noDefaultValue() + .sinceVersion("0.11.0") + .withDocumentation("Filter clustering partitions that matched regex pattern"); + public static final ConfigProperty PLAN_STRATEGY_CLASS_NAME = ConfigProperty .key("hoodie.clustering.plan.strategy.class") .defaultValue(SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY) @@ -424,6 +430,11 @@ public class HoodieClusteringConfig extends HoodieConfig { return this; } + public Builder withClusteringPartitionRegexPattern(String pattern) { + clusteringConfig.setValue(PARTITION_REGEX_PATTERN, pattern); + return this; + } + public Builder withClusteringSkipPartitionsFromLatest(int clusteringSkipPartitionsFromLatest) { clusteringConfig.setValue(PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, String.valueOf(clusteringSkipPartitionsFromLatest)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 7b33d338f..4c392d5c9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1252,6 +1252,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getLong(HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT); } + public String getClusteringPartitionFilterRegexPattern() { + return getString(HoodieClusteringConfig.PARTITION_REGEX_PATTERN); + } + public int getClusteringMaxNumGroups() { return getInt(HoodieClusteringConfig.PLAN_STRATEGY_MAX_GROUPS); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java index ead615fe8..f4aaeee0c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter; @@ -35,6 +36,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.util.List; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -70,6 +72,8 @@ public abstract class PartitionAwareClusteringPlanStrategy partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()); + // get regex matched partitions if set + partitionPaths = getRegexPatternMatchedPartitions(config, partitionPaths); // filter the partition paths if needed to reduce list status partitionPaths = filterPartitionPaths(partitionPaths); @@ -108,4 +112,14 @@ public abstract class PartitionAwareClusteringPlanStrategy getRegexPatternMatchedPartitions(HoodieWriteConfig config, List partitionPaths) { + String pattern = config.getClusteringPartitionFilterRegexPattern(); + if (!StringUtils.isNullOrEmpty(pattern)) { + partitionPaths = partitionPaths.stream() + .filter(partition -> Pattern.matches(pattern, partition)) + .collect(Collectors.toList()); + } + return partitionPaths; + } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java new file mode 100644 index 000000000..a053a9611 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestPartitionAwareClusteringPlanStrategy { + + @Mock + HoodieTable table; + @Mock + HoodieEngineContext context; + HoodieWriteConfig hoodieWriteConfig; + + @BeforeEach + public void setUp() { + Properties props = new Properties(); + props.setProperty("hoodie.clustering.plan.strategy.partition.regex.pattern", "2021072.*"); + this.hoodieWriteConfig = HoodieWriteConfig + .newBuilder() + .withPath("dummy_Table_Path") + .withClusteringConfig(HoodieClusteringConfig + .newBuilder() + .fromProperties(props) + .build()) + .build(); + } + + @Test + public void testFilterPartitionPaths() { + PartitionAwareClusteringPlanStrategy strategyTestRegexPattern = new DummyPartitionAwareClusteringPlanStrategy(table, context, hoodieWriteConfig); + + ArrayList fakeTimeBasedPartitionsPath = new ArrayList<>(); + fakeTimeBasedPartitionsPath.add("20210718"); + fakeTimeBasedPartitionsPath.add("20210715"); + fakeTimeBasedPartitionsPath.add("20210723"); + fakeTimeBasedPartitionsPath.add("20210716"); + fakeTimeBasedPartitionsPath.add("20210719"); + fakeTimeBasedPartitionsPath.add("20210721"); + + List list = strategyTestRegexPattern.getRegexPatternMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath); + assertEquals(2, list.size()); + assertTrue(list.contains("20210721")); + assertTrue(list.contains("20210723")); + } + + class DummyPartitionAwareClusteringPlanStrategy extends PartitionAwareClusteringPlanStrategy { + + public DummyPartitionAwareClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected Stream buildClusteringGroupsForPartition(String partitionPath, List list) { + return null; + } + + @Override + protected Map getStrategyParams() { + return null; + } + } +}