diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java
new file mode 100644
index 000000000..655c11983
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.update.strategy;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Spark base update strategy, write records to the file groups which are in clustering
+ * need to check. Spark relate implementations should extend this base class.
+ */
+public abstract class BaseSparkUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+
+  public BaseSparkUpdateStrategy(HoodieSparkEngineContext engineContext,
+                                 Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+    super(engineContext, fileGroupsInPendingClustering);
+  }
+
+  /**
+   * Get records matched file group ids.
+   * @param inputRecords the records to write, tagged with target file id
+   * @return the records matched file group ids
+   */
+  protected List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
+    return inputRecords
+        .filter(record -> record.getCurrentLocation() != null)
+        .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
index 403a0c2e1..92a5fb69a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 
 import org.apache.spark.api.java.JavaRDD;
 
@@ -35,20 +34,13 @@ import java.util.stream.Collectors;
 /**
  * Allow ingestion commits during clustering job.
  */
-public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends BaseSparkUpdateStrategy<T> {
 
-  public SparkAllowUpdateStrategy(
-      HoodieSparkEngineContext engineContext, HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
+  public SparkAllowUpdateStrategy(HoodieSparkEngineContext engineContext,
+                                  HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
     super(engineContext, fileGroupsInPendingClustering);
   }
 
-  private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
-    List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
-        .filter(record -> record.getCurrentLocation() != null)
-        .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
-    return fileGroupIdsWithUpdates;
-  }
-
   @Override
   public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
index ad60d9c88..ac058a4d8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieClusteringUpdateException;
-import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -39,20 +38,14 @@ import java.util.Set;
  * Update strategy based on following.
  * if some file groups have update record, throw exception
  */
-public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends BaseSparkUpdateStrategy<T> {
   private static final Logger LOG = LogManager.getLogger(SparkRejectUpdateStrategy.class);
 
-  public SparkRejectUpdateStrategy(HoodieSparkEngineContext engineContext, HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
+  public SparkRejectUpdateStrategy(HoodieSparkEngineContext engineContext,
+                                   HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
     super(engineContext, fileGroupsInPendingClustering);
   }
 
-  private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
-    List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
-        .filter(record -> record.getCurrentLocation() != null)
-        .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
-    return fileGroupIdsWithUpdates;
-  }
-
   @Override
   public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);