1
0

[HUDI-3095] abstract partition filter logic to enable code reuse (#4454)

* [HUDI-3095] abstract partition filter logic to enable code reuse

* [HUDI-3095] address reviews
This commit is contained in:
Yuwei XIAO
2021-12-31 13:37:52 +08:00
committed by GitHub
parent e88b5fd450
commit 2444f40a4b
16 changed files with 368 additions and 272 deletions

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.util.TypeUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import javax.annotation.Nonnull;
import java.io.File;
@@ -54,6 +55,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
public static final String PLAN_PARTITION_FILTER_MODE =
"hoodie.clustering.plan.partition.filter.mode";
// Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix
public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";
@@ -64,6 +67,20 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Number of partitions to list to create ClusteringPlan");
public static final ConfigProperty<String> PARTITION_FILTER_BEGIN_PARTITION = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("Begin partition used to filter partition (inclusive), only effective when the filter mode '"
+ PLAN_PARTITION_FILTER_MODE + "' is " + ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
public static final ConfigProperty<String> PARTITION_FILTER_END_PARTITION = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("End partition used to filter partition (inclusive), only effective when the filter mode '"
+ PLAN_PARTITION_FILTER_MODE + "' is " + ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
public static final ConfigProperty<String> PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit")
.defaultValue(String.valueOf(600 * 1024 * 1024L))
@@ -110,6 +127,17 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan");
public static final ConfigProperty<ClusteringPlanPartitionFilterMode> PLAN_PARTITION_FILTER_MODE_NAME = ConfigProperty
.key(PLAN_PARTITION_FILTER_MODE)
.defaultValue(ClusteringPlanPartitionFilterMode.NONE)
.sinceVersion("0.11.0")
.withDocumentation("Partition filter mode used in the creation of clustering plan. Available values are - "
+ "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate."
+ "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
+ PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "."
+ "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
+ PARTITION_FILTER_END_PARTITION.key() + "'].");
public static final ConfigProperty<String> PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group")
.defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L))
@@ -381,6 +409,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
/**
 * Sets the partition filter mode used when building a clustering plan.
 *
 * @param mode filter mode (NONE / RECENT_DAYS / SELECTED_PARTITIONS)
 * @return this builder for chaining
 */
public Builder withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode mode) {
// Stored as the enum's string form under the filter-mode config key.
clusteringConfig.setValue(PLAN_PARTITION_FILTER_MODE_NAME.key(), mode.toString());
return this;
}
public Builder withClusteringExecutionStrategyClass(String runClusteringStrategyClass) {
clusteringConfig.setValue(EXECUTION_STRATEGY_CLASS_NAME, runClusteringStrategyClass);
return this;
@@ -396,6 +429,16 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
/**
 * Sets the inclusive begin partition for the SELECTED_PARTITIONS filter mode.
 *
 * @param begin begin partition path (inclusive)
 * @return this builder for chaining
 */
public Builder withClusteringPartitionFilterBeginPartition(String begin) {
clusteringConfig.setValue(PARTITION_FILTER_BEGIN_PARTITION, begin);
return this;
}
/**
 * Sets the inclusive end partition for the SELECTED_PARTITIONS filter mode.
 *
 * @param end end partition path (inclusive)
 * @return this builder for chaining
 */
public Builder withClusteringPartitionFilterEndPartition(String end) {
clusteringConfig.setValue(PARTITION_FILTER_END_PARTITION, end);
return this;
}
public Builder withClusteringPlanSmallFileLimit(long clusteringSmallFileLimit) {
clusteringConfig.setValue(PLAN_STRATEGY_SMALL_FILE_LIMIT, String.valueOf(clusteringSmallFileLimit));
return this;

View File

@@ -53,6 +53,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.RandomFileIdPrefixProvider;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -1215,6 +1216,19 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME);
}
/**
 * Resolves the configured clustering plan partition filter mode.
 *
 * @return the {@link ClusteringPlanPartitionFilterMode} stored in the write config
 */
public ClusteringPlanPartitionFilterMode getClusteringPlanPartitionFilterMode() {
  // The mode is persisted as a string; map it back to the enum constant.
  return ClusteringPlanPartitionFilterMode.valueOf(
      getString(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME));
}
/**
 * @return the inclusive begin partition used by the SELECTED_PARTITIONS clustering plan
 *         filter; may be unset when that mode is not in use.
 */
public String getBeginPartitionForClustering() {
return getString(HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION);
}
/**
 * @return the inclusive end partition used by the SELECTED_PARTITIONS clustering plan
 *         filter; may be unset when that mode is not in use.
 */
public String getEndPartitionForClustering() {
return getString(HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION);
}
public String getClusteringExecutionStrategyClass() {
return getString(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME);
}

View File

@@ -79,7 +79,8 @@ public abstract class BaseClusteringPlanActionExecutor<T extends HoodieRecordPay
LOG.info("Generating clustering plan for table " + config.getBasePath());
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config);
ReflectionUtils.loadClass(ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config), table, context, config);
return strategy.generateClusteringPlan();
}

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.cluster;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* Partition filter utilities. Currently, we support three mode:
* NONE: skip filter
* RECENT DAYS: output recent partition given skip num and days lookback config
* SELECTED_PARTITIONS: output partition falls in the [start, end] condition
*/
public class ClusteringPlanPartitionFilter {

  /**
   * Applies the configured partition filter mode to the candidate partitions.
   *
   * @param partitions candidate partition paths
   * @param config     write config carrying the filter mode and its parameters
   * @return the filtered partition list
   * @throws HoodieClusteringException if the configured mode is not recognized
   */
  public static List<String> filter(List<String> partitions, HoodieWriteConfig config) {
    ClusteringPlanPartitionFilterMode mode = config.getClusteringPlanPartitionFilterMode();
    switch (mode) {
      case NONE:
        return partitions;
      case RECENT_DAYS:
        return recentDaysFilter(partitions, config);
      case SELECTED_PARTITIONS:
        return selectedPartitionsFilter(partitions, config);
      default:
        throw new HoodieClusteringException("Unknown partition filter, filter mode: " + mode);
    }
  }

  // Keep the newest partitions: sort descending, drop the configured number of
  // most-recent partitions, then cap the result at the configured target count
  // (a non-positive target means "no cap").
  private static List<String> recentDaysFilter(List<String> partitions, HoodieWriteConfig config) {
    int targetCount = config.getTargetPartitionsForClustering();
    int skipFromLatest = config.getSkipPartitionsFromLatestForClustering();
    long limit = targetCount > 0 ? targetCount : partitions.size();
    return partitions.stream()
        .sorted(Comparator.reverseOrder())
        .skip(Math.max(skipFromLatest, 0))
        .limit(limit)
        .collect(Collectors.toList());
  }

  // Keep partitions whose path falls in the user-configured inclusive [begin, end] range,
  // compared lexicographically.
  private static List<String> selectedPartitionsFilter(List<String> partitions, HoodieWriteConfig config) {
    String begin = config.getBeginPartitionForClustering();
    String end = config.getEndPartitionForClustering();
    return partitions.stream()
        .filter(p -> p.compareTo(begin) >= 0 && p.compareTo(end) <= 0)
        .collect(Collectors.toList());
  }
}

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.cluster;
/**
* Clustering partition filter mode
*/
public enum ClusteringPlanPartitionFilterMode {
// Do not filter; every candidate partition is considered for the clustering plan.
NONE,
// Keep only a window of the most recent partitions, driven by the day-based
// lookback and skip-from-latest configs.
RECENT_DAYS,
// Keep only partitions inside a user-specified inclusive [begin, end] range.
SELECTED_PARTITIONS
}

View File

@@ -30,8 +30,11 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -56,6 +59,37 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
private final transient HoodieEngineContext engineContext;
private final HoodieWriteConfig writeConfig;
/**
* Check if the given class is deprecated.
* If it is, then try to convert it to suitable one and update the write config accordingly.
* @param config write config
* @return class name of clustering plan strategy
*/
/**
 * Check if the configured clustering plan strategy class is one of the deprecated
 * engine-specific strategies. If so, convert it to the corresponding size-based
 * strategy, set the matching partition filter mode on the write config, and warn
 * the user about the replacement configuration.
 *
 * @param config write config; mutated to carry the equivalent filter mode when a
 *               deprecated class is detected
 * @return class name of the clustering plan strategy to actually use
 */
public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config) {
  String className = config.getClusteringPlanStrategyClass();
  String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
  String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
  String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
  // NOTE: this is the *recent days* Java strategy; the old local name
  // "javaSelectedPartitionClassName" was misleading.
  String javaRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
  String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
  String logStr = "The clustering plan '%s' is deprecated. Please set the plan as '%s' and set '%s' as '%s' to achieve the same behaviour";
  if (sparkRecentDaysClassName.equals(className)) {
    config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
    LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
    return sparkSizeBasedClassName;
  } else if (sparkSelectedPartitionsClassName.equals(className)) {
    config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
    LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
    return sparkSizeBasedClassName;
  } else if (javaRecentDaysClassName.equals(className)) {
    config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
    // BUG FIX: the config is set to RECENT_DAYS, but the old warning told users to
    // configure SELECTED_PARTITIONS — the advertised mode must match the one applied.
    LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
    return javaSizeBasedClassName;
  }
  // Not a deprecated strategy: use the configured class unchanged.
  return className;
}
public ClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
this.hoodieTable = table;

View File

@@ -29,6 +29,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -56,7 +58,9 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
* Return list of partition paths to be considered for clustering.
*/
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
return partitionPaths;
List<String> filteredPartitions = ClusteringPlanPartitionFilter.filter(partitionPaths, getWriteConfig());
LOG.debug("Filtered to the following partitions: " + filteredPartitions);
return filteredPartitions;
}
@Override

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.cluster.strategy;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
// Verifies backward compatibility: deprecated engine-specific plan strategy classes
// must be converted to the size-based strategy plus the equivalent filter mode.
public class TestClusteringPlanStrategyConfigCompatibility {
// Each row: (user specified deprecated class, expected converted class, expected filter mode).
private static Stream<Arguments> configParams() {
/**
 * (user specified class, converted class, filter mode)
 */
Object[][] data = new Object[][] {
{"org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy",
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
ClusteringPlanPartitionFilterMode.RECENT_DAYS},
{"org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy",
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS},
{"org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy",
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy",
ClusteringPlanPartitionFilterMode.RECENT_DAYS}
};
return Stream.of(data).map(Arguments::of);
}
// Builds a config with the deprecated class, then asserts both the converted class
// name and the filter mode that checkAndGetClusteringPlanStrategy sets on the config.
@ParameterizedTest()
@MethodSource("configParams")
public void testCheckAndGetClusteringPlanStrategy(String oldClass, String newClass, ClusteringPlanPartitionFilterMode mode) {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath("")
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringPlanStrategyClass(oldClass)
.build())
.build();
Assertions.assertEquals(newClass, ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config));
Assertions.assertEquals(mode, config.getClusteringPlanPartitionFilterMode());
}
}

View File

@@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* Clustering Strategy that only looks at latest 'daybased.lookback.partitions' partitions
* for Java engine.
*/
// NOTE(review): this class is removed by this change; its recent-days filtering is
// superseded by ClusteringPlanPartitionFilter with mode RECENT_DAYS.
public class JavaRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends JavaSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class);
// Constructor for copy-on-write tables.
public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T> table,
HoodieJavaEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
// Constructor for merge-on-read tables.
public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T> table,
HoodieJavaEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
// Sort partitions newest-first, skip the configured number of latest partitions,
// and cap at the configured target count (non-positive target means no cap).
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();
return partitionPaths.stream()
.sorted(Comparator.reverseOrder())
.skip(Math.max(skipPartitionsFromLatestForClustering, 0))
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
.collect(Collectors.toList());
}
}

View File

@@ -113,11 +113,6 @@ public class JavaSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<T
return params;
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
return partitionPaths;
}
@Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
return super.getFileSlicesEligibleForClustering(partition)

View File

@@ -1,63 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* Clustering Strategy that only looks at latest 'daybased.lookback.partitions' partitions.
*/
// NOTE(review): this class is removed by this change; its recent-days filtering is
// superseded by ClusteringPlanPartitionFilter with mode RECENT_DAYS.
public class SparkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends SparkSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class);
// Constructor for copy-on-write tables.
public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
// Constructor for merge-on-read tables.
public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
// Sort partitions newest-first, skip the configured number of latest partitions,
// and cap at the configured target count (non-positive target means no cap).
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();
return partitionPaths.stream()
.sorted(Comparator.reverseOrder())
.skip(Math.max(skipPartitionsFromLatestForClustering, 0))
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
.collect(Collectors.toList());
}
}

View File

@@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;
/**
* Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive.
*/
// NOTE(review): this class is removed by this change; its range filtering is
// superseded by ClusteringPlanPartitionFilter with mode SELECTED_PARTITIONS.
public class SparkSelectedPartitionsClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends SparkSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(SparkSelectedPartitionsClusteringPlanStrategy.class);
// Raw property keys for the inclusive begin/end partition bounds.
public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";
// Constructor for copy-on-write tables.
public SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
// Constructor for merge-on-read tables.
public SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
// Keep partitions in the inclusive [begin, end] range, compared lexicographically.
// NOTE(review): presumably both bounds are expected to be configured when this
// strategy is selected; unset bounds would NPE in compareTo — verify against callers.
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
List<String> filteredPartitions = partitionPaths.stream()
.filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0)
.collect(Collectors.toList());
LOG.info("Filtered to the following partitions: " + filteredPartitions);
return filteredPartitions;
}
}

View File

@@ -48,6 +48,7 @@ public class SparkSingleFileSortPlanStrategy<T extends HoodieRecordPayload<T>>
super(table, engineContext, writeConfig);
}
@Override
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = fileSlices.stream()
.map(fileSlice -> Pair.of(Collections.singletonList(fileSlice), 1)).collect(Collectors.toList());

View File

@@ -115,11 +115,6 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
return params;
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
return partitionPaths;
}
@Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
return super.getFileSlicesEligibleForClustering(partition)

View File

@@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
// NOTE(review): this test is removed alongside SparkRecentDaysClusteringPlanStrategy;
// equivalent coverage lives in the filter-mode-based tests.
public class TestSparkRecentDaysClusteringPlanStrategy {
@Mock
HoodieSparkCopyOnWriteTable table;
@Mock
HoodieSparkEngineContext context;
HoodieWriteConfig hoodieWriteConfig;
@BeforeEach
public void setUp() {
// Skip the single latest partition and keep at most one partition.
this.hoodieWriteConfig = HoodieWriteConfig
.newBuilder()
.withPath("Fake_Table_Path")
.withClusteringConfig(HoodieClusteringConfig
.newBuilder()
.withClusteringSkipPartitionsFromLatest(1)
.withClusteringTargetPartitions(1)
.build())
.build();
}
@Test
public void testFilterPartitionPaths() {
SparkRecentDaysClusteringPlanStrategy sg = new SparkRecentDaysClusteringPlanStrategy(table, context, hoodieWriteConfig);
ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
fakeTimeBasedPartitionsPath.add("20210718");
fakeTimeBasedPartitionsPath.add("20210716");
fakeTimeBasedPartitionsPath.add("20210719");
// Latest (20210719) is skipped; limit 1 keeps only 20210718.
List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
assertEquals(1, list.size());
// NOTE(review): assertSame checks identity, not equality; it only passes here
// because the same String object flows through the filter unchanged.
assertSame("20210718", list.get(0));
}
}

View File

@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.cluster.strategy;
import org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
/**
 * Tests for {@link PartitionAwareClusteringPlanStrategy#filterPartitionPaths} under
 * each {@link ClusteringPlanPartitionFilterMode}.
 */
public class TestSparkClusteringPlanPartitionFilter {
  @Mock
  HoodieSparkCopyOnWriteTable table;
  @Mock
  HoodieSparkEngineContext context;
  HoodieWriteConfig.Builder hoodieWriteConfigBuilder;

  @BeforeEach
  public void setUp() {
    // Each test layers its own clustering config on top of this shared builder.
    this.hoodieWriteConfigBuilder = HoodieWriteConfig
        .newBuilder()
        .withPath("Fake_Table_Path");
  }

  @Test
  public void testFilterPartitionNoFilter() {
    HoodieWriteConfig config = hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
        .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
        .build())
        .build();
    PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config);
    ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
    fakeTimeBasedPartitionsPath.add("20210718");
    fakeTimeBasedPartitionsPath.add("20210716");
    fakeTimeBasedPartitionsPath.add("20210719");
    // NONE mode must pass every partition through untouched.
    List<String> list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
    assertEquals(3, list.size());
  }

  @Test
  public void testFilterPartitionRecentDays() {
    HoodieWriteConfig config = hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
        .withClusteringSkipPartitionsFromLatest(1)
        .withClusteringTargetPartitions(1)
        .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.RECENT_DAYS)
        .build())
        .build();
    PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config);
    ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
    fakeTimeBasedPartitionsPath.add("20210718");
    fakeTimeBasedPartitionsPath.add("20210716");
    fakeTimeBasedPartitionsPath.add("20210719");
    // Latest partition (20210719) is skipped; limit 1 keeps only 20210718.
    List<String> list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
    assertEquals(1, list.size());
    // FIX: use assertEquals, not assertSame — value equality is the contract;
    // reference identity of Strings depends on interning and is fragile.
    assertEquals("20210718", list.get(0));
  }

  @Test
  public void testFilterPartitionSelectedPartitions() {
    HoodieWriteConfig config = hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
        .withClusteringPartitionFilterBeginPartition("20211222")
        .withClusteringPartitionFilterEndPartition("20211223")
        .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS)
        .build())
        .build();
    PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config);
    ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
    fakeTimeBasedPartitionsPath.add("20211220");
    fakeTimeBasedPartitionsPath.add("20211221");
    fakeTimeBasedPartitionsPath.add("20211222");
    fakeTimeBasedPartitionsPath.add("20211224");
    // Inclusive range [20211222, 20211223] matches exactly one candidate.
    List<String> list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
    assertEquals(1, list.size());
    assertEquals("20211222", list.get(0));
  }
}