[HUDI-2497] Refactor clean and restore actions in hudi-client module (#3734)

Authored by Y Ethan Guo on 2021-09-30 15:20:25 -07:00, committed by GitHub
parent 73e8ba7620
commit 46808dcb1f
22 changed files with 273 additions and 705 deletions

View File

@@ -20,10 +20,13 @@ package org.apache.hudi.table.action.clean;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -31,29 +34,36 @@ import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

-public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {
+public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {

   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LogManager.getLogger(BaseCleanActionExecutor.class);
+  private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);

-  public BaseCleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+  public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
     super(context, config, table, instantTime);
   }

-  protected static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
+  static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
     Path deletePath = new Path(deletePathStr);
     LOG.debug("Working on delete path :" + deletePath);
     try {
@@ -68,13 +78,85 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I,
     }
   }

+  static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
+    Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
+    FileSystem fs = table.getMetaClient().getFs();
+
+    cleanFileInfo.forEachRemaining(partitionDelFileTuple -> {
+      String partitionPath = partitionDelFileTuple.getLeft();
+      Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
+      String deletePathStr = deletePath.toString();
+      Boolean deletedFileResult = null;
+      try {
+        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
+      } catch (IOException e) {
+        LOG.error("Delete file failed: " + deletePathStr);
+      }
+      final PartitionCleanStat partitionCleanStat =
+          partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new PartitionCleanStat(partitionPath));
+      boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
+
+      if (isBootstrapBasePathFile) {
+        // For Bootstrap Base file deletions, store the full file path.
+        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
+        partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
+      } else {
+        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
+        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
+      }
+    });
+    return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
+  }
+
   /**
    * Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles
    * skews in partitions to clean by making files to clean as the unit of task distribution.
    *
    * @throws IllegalArgumentException if unknown cleaning policy is provided
    */
-  abstract List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan);
+  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
+    int cleanerParallelism = Math.min(
+        (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+        config.getCleanerParallelism());
+    LOG.info("Using cleanerParallelism: " + cleanerParallelism);
+
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions");
+
+    Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
+        cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+            .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
+                new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
+
+    Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
+        context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition,
+            iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism);
+
+    Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    // Return PartitionCleanStat for each partition passed.
+    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
+      PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
+          ? partitionCleanStatsMap.get(partitionPath)
+          : new PartitionCleanStat(partitionPath);
+      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
+      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
+          .withEarliestCommitRetained(Option.ofNullable(
+              actionInstant != null
+                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
+                  actionInstant.getAction(), actionInstant.getTimestamp())
+                  : null))
+          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
+          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
+          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
+          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
+          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
+          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
+          .build();
+    }).collect(Collectors.toList());
+  }

   /**
    * Executes the Cleaner plan stored in the instant metadata.
@@ -143,7 +225,7 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I,
     }
     // return the last clean metadata for now
     // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
-    // This requires the BaseCleanActionExecutor to be refactored as BaseCommitActionExecutor
+    // This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor
     return cleanMetadataList.size() > 0 ? cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
   }
 }
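The refactored clean() above is what makes the executor engine-neutral: the cleaner plan is flattened into (partition, file) pairs so that individual files, not partitions, are the unit of task distribution, and the per-file results are then folded back into one PartitionCleanStat per partition. A JDK-only sketch of that shape, with simplified stand-in types rather than the commit's classes:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CleanFlowSketch {
  public static void main(String[] args) {
    // Hypothetical cleaner plan: partition path -> files to delete.
    Map<String, List<String>> plan = Map.of(
        "2021/09/30", Arrays.asList("f1.parquet", "f2.parquet"),
        "2021/10/01", Arrays.asList("f3.parquet"));

    // Flatten to (partition, file) pairs, pretend every delete succeeded,
    // then reduce per partition -- the role PartitionCleanStat::merge plays above.
    Map<String, Long> deletedPerPartition = plan.entrySet().stream()
        .flatMap(e -> e.getValue().stream().map(f -> Map.entry(e.getKey(), f)))
        .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));

    deletedPerPartition.forEach((partition, n) ->
        System.out.println(partition + " -> " + n + " file(s) cleaned"));
  }
}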

View File

@@ -43,13 +43,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;

-public abstract class BaseCleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
+public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {

   private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);

   private final Option<Map<String, String>> extraMetadata;

-  public BaseCleanPlanActionExecutor(HoodieEngineContext context,
+  public CleanPlanActionExecutor(HoodieEngineContext context,
                                  HoodieWriteConfig config,
                                  HoodieTable<T, I, K, O> table,
                                  String instantTime,
@@ -58,7 +58,9 @@ public abstract class BaseCleanPlanActionExecutor<T extends HoodieRecordPayload,
     this.extraMetadata = extraMetadata;
   }

-  protected abstract Option<HoodieCleanerPlan> createCleanerPlan();
+  protected Option<HoodieCleanerPlan> createCleanerPlan() {
+    return execute();
+  }

   /**
    * Generates List of files to be cleaned.
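With planning and execution now shared across engines, the table implementations later in this commit wire cleaning up in two steps. Roughly, following the Flink and Java copy-on-write table hunks below:

// Schedule a cleaner plan, then execute it under a clean instant.
Option<HoodieCleanerPlan> plan =
    new CleanPlanActionExecutor<>(context, config, table, instantTime, extraMetadata).execute();
HoodieCleanMetadata cleanMetadata =
    new CleanActionExecutor<>(context, config, table, cleanInstantTime).execute();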

View File

@@ -9,20 +9,18 @@
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

 package org.apache.hudi.table.action.restore;

 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieJavaEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -32,12 +30,9 @@ import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;

-import java.util.List;
-
-public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload> extends
-    BaseRestoreActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+public class CopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload, I, K, O>
+    extends BaseRestoreActionExecutor<T, I, K, O> {

-  public JavaCopyOnWriteRestoreActionExecutor(HoodieJavaEngineContext context,
+  public CopyOnWriteRestoreActionExecutor(HoodieEngineContext context,
                                           HoodieWriteConfig config,
                                           HoodieTable table,
                                           String instantTime,
@@ -47,20 +42,23 @@ public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload>
   @Override
   protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
+    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
+        && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
+    }
+
+    table.getMetaClient().reloadActiveTimeline();
+    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    table.scheduleRollback(context, newInstantTime, instantToRollback, false);
     table.getMetaClient().reloadActiveTimeline();
     CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
         context,
         config,
         table,
-        HoodieActiveTimeline.createNewInstantTime(),
+        newInstantTime,
         instantToRollback,
         true,
         true,
         false);
-    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
-        && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
-    }
     return rollbackActionExecutor.execute();
   }
 }
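Compared with the engine-specific executors it replaces, the unified rollbackInstant above also tightens the ordering: the unsupported-action check runs before any work, and the rollback is first scheduled via table.scheduleRollback at a freshly generated instant time, which the CopyOnWriteRollbackActionExecutor then reuses instead of creating another instant inline. Callers stay the same; the copy-on-write tables later in this commit simply do:

HoodieRestoreMetadata restoreMetadata =
    new CopyOnWriteRestoreActionExecutor(context, config, table, restoreInstantTime, instantToRestore).execute();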

View File

@@ -9,20 +9,18 @@
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

 package org.apache.hudi.table.action.restore;

 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -31,17 +29,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;

-import org.apache.spark.api.java.JavaRDD;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkMergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload> extends
-    BaseRestoreActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-
-  public SparkMergeOnReadRestoreActionExecutor(HoodieSparkEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable table,
-                                               String instantTime,
-                                               String restoreInstantTime) {
+public class MergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload, I, K, O>
+    extends BaseRestoreActionExecutor<T, I, K, O> {
+
+  public MergeOnReadRestoreActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table,
+                                          String instantTime, String restoreInstantTime) {
     super(context, config, table, instantTime, restoreInstantTime);
   }

View File

@@ -26,11 +26,13 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
 import org.apache.flink.api.common.functions.RuntimeContext;

+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -38,9 +40,11 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.util.FlinkClientUtil;

+import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -86,6 +90,17 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
         .collect(Collectors.toList());
   }

+  @Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+        .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+        .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
+            Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
+        .filter(Objects::nonNull);
+  }
+
   @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
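The new mapPartitionsToPairAndReduceByKey contract is what lets the generic CleanActionExecutor stay engine-agnostic: callers hand in a partition-level flat-map that emits (key, value) pairs plus a reduce function to merge values per key, and each engine context decides how to run it. An illustrative usage sketch against any HoodieEngineContext (the toy data and lambda are assumptions, not code from this commit):

// Count files per partition path; the shape mirrors deleteFilesFunc + PartitionCleanStat::merge.
Stream<ImmutablePair<String, Integer>> counts = context.mapPartitionsToPairAndReduceByKey(
    Stream.of("p1/f1", "p1/f2", "p2/f3"),
    iterator -> {
      Map<String, Integer> local = new HashMap<>();
      iterator.forEachRemaining(path -> local.merge(path.split("/")[0], 1, Integer::sum));
      return local.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
    },
    Integer::sum,
    2);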

View File

@@ -44,8 +44,8 @@ import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor;
-import org.apache.hudi.table.action.clean.FlinkScheduleCleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
@@ -297,7 +297,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
    */
   @Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new FlinkScheduleCleanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
   }

   @Override
@@ -308,7 +308,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
-    return new FlinkCleanActionExecutor(context, config, this, cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
   }

   @Override

View File

@@ -1,128 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.Tuple2;
public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(FlinkCleanActionExecutor.class);
public FlinkCleanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime) {
super(context, config, table, instantTime);
}
@Override
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
Stream<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
Stream<Tuple2<String, PartitionCleanStat>> partitionCleanStats =
deleteFilesFunc(filesToBeDeletedPerPartition, table)
.collect(Collectors.groupingBy(Pair::getLeft))
.entrySet().stream()
.map(x -> new Tuple2(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get()));
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
// Return PartitionCleanStat for each partition passed.
return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
.withEarliestCommitRetained(Option.ofNullable(
actionInstant != null
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
.build();
}).collect(Collectors.toList());
}
private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Stream<Tuple2<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
cleanFileInfo.parallel().forEach(partitionDelFileTuple -> {
String partitionPath = partitionDelFileTuple._1();
Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
String deletePathStr = deletePath.toString();
Boolean deletedFileResult = null;
try {
deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
} catch (IOException e) {
LOG.error("Delete file failed");
}
final PartitionCleanStat partitionCleanStat;
synchronized (partitionCleanStatMap) {
partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new PartitionCleanStat(partitionPath));
}
boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
if (isBootstrapBasePathFile) {
// For Bootstrap Base file deletions, store the full file path.
partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
} else {
partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
}
});
return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
}
}

View File

@@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.clean;
import java.util.List;
import java.util.Map;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
public class FlinkScheduleCleanActionExecutor<T extends HoodieRecordPayload> extends
BaseCleanPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(FlinkScheduleCleanActionExecutor.class);
public FlinkScheduleCleanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
@Override
protected Option<HoodieCleanerPlan> createCleanerPlan() {
return super.execute();
}
}

View File

@@ -19,6 +19,7 @@
 package org.apache.hudi.client.common;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -26,11 +27,14 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;

+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -38,6 +42,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -70,6 +75,16 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
         .collect(Collectors.toList());
   }

+  @Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+                                                                                 SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+        .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+        .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
+            Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
+        .filter(Objects::nonNull);
+  }
+
   @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {

View File

@@ -39,8 +39,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
-import org.apache.hudi.table.action.clean.JavaScheduleCleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
@@ -50,7 +50,7 @@ import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionE
 import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
-import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
@@ -187,13 +187,13 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
   @Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
   }

   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context,
                                    String cleanInstantTime) {
-    return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
   }

   @Override
@@ -218,7 +218,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
   public HoodieRestoreMetadata restore(HoodieEngineContext context,
                                        String restoreInstantTime,
                                        String instantToRestore) {
-    return new JavaCopyOnWriteRestoreActionExecutor((HoodieJavaEngineContext) context,
-        config, this, restoreInstantTime, instantToRestore).execute();
+    return new CopyOnWriteRestoreActionExecutor(
+        context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 }
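Because the shared executors now accept a plain HoodieEngineContext, the Java table drops the (HoodieJavaEngineContext) cast entirely, and the table-level entry points shown in this hunk compose the whole lifecycle. With the signatures above (the values are placeholders):

Option<HoodieCleanerPlan> cleanerPlan = table.scheduleCleaning(context, instantTime, Option.empty());
HoodieCleanMetadata cleanMetadata = table.clean(context, cleanInstantTime);
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantToRestore);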

View File

@@ -1,130 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JavaCleanActionExecutor<T extends HoodieRecordPayload> extends
BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(JavaCleanActionExecutor.class);
public JavaCleanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime) {
super(context, config, table, instantTime);
}
@Override
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
deleteFilesFunc(filesToBeDeletedPerPartition, table)
.collect(Collectors.groupingBy(Pair::getLeft))
.entrySet().stream()
.map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get()));
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
// Return PartitionCleanStat for each partition passed.
return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
.withEarliestCommitRetained(Option.ofNullable(
actionInstant != null
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
.build();
}).collect(Collectors.toList());
}
private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable table) {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
while (iter.hasNext()) {
Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
String partitionPath = partitionDelFileTuple.getLeft();
Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
String deletePathStr = deletePath.toString();
Boolean deletedFileResult = null;
try {
deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
} catch (IOException e) {
LOG.error("Delete file failed");
}
if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
}
boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
if (isBootstrapBasePathFile) {
// For Bootstrap Base file deletions, store the full file path.
partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
} else {
partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
}
}
return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
}
}

View File

@@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.clean;
import java.util.List;
import java.util.Map;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
public class JavaScheduleCleanActionExecutor<T extends HoodieRecordPayload> extends
BaseCleanPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(JavaScheduleCleanActionExecutor.class);
public JavaScheduleCleanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
@Override
protected Option<HoodieCleanerPlan> createCleanerPlan() {
return super.execute();
}
}

View File

@@ -25,18 +25,23 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.sql.SQLContext;
 import scala.Tuple2;

+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;

 /**
@@ -82,6 +87,20 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
     }).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
   }

+  @Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return javaSparkContext.parallelize(data.collect(Collectors.toList()), parallelism)
+        .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator ->
+            flatMapToPairFunc.call(iterator).collect(Collectors.toList()).stream()
+                .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator()
+        )
+        .reduceByKey(reduceFunc::apply)
+        .map(e -> new ImmutablePair<>(e._1, e._2))
+        .collect().stream();
+  }
+
   @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
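On Spark the same contract is met with a real shuffle: the pairs are parallelized, flat-mapped per partition, reduced by key, and collected back to the driver as a Stream. A standalone sketch of that pattern (local master and toy data are assumptions, not part of the commit):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkReduceByKeySketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("reduce-by-key-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      jsc.parallelize(Arrays.asList("p1/f1", "p1/f2", "p2/f3"), 2)
          // Per partition: build local (partitionPath -> count) pairs, the way deleteFilesFunc builds PartitionCleanStats.
          .mapPartitionsToPair(iter -> {
            Map<String, Integer> local = new HashMap<>();
            iter.forEachRemaining(path -> local.merge(path.split("/")[0], 1, Integer::sum));
            return local.entrySet().stream()
                .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator();
          })
          // Merge counts per key across partitions, the way PartitionCleanStat::merge does.
          .reduceByKey(Integer::sum)
          .collect()
          .forEach(t -> System.out.println(t._1 + " -> " + t._2 + " file(s)"));
    }
  }
}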

View File

@@ -50,8 +50,8 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
-import org.apache.hudi.table.action.clean.SparkCleanActionExecutor;
-import org.apache.hudi.table.action.clean.SparkCleanPlanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
@@ -65,7 +65,7 @@ import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecuto
 import org.apache.hudi.table.action.commit.SparkMergeHelper;
 import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
-import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
@@ -181,12 +181,12 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
-    new SparkCopyOnWriteRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+    new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }

   @Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new SparkCleanPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
   }

   @Override
@@ -251,7 +251,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
-    return new SparkCleanActionExecutor((HoodieSparkEngineContext)context, config, this, cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
   }

   @Override
@@ -266,7 +266,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new SparkCopyOnWriteRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, restoreInstantTime, instantToRestore).execute();
+    return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 }

View File

@@ -39,6 +39,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.SparkRunCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.SparkScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor;
@@ -48,8 +49,7 @@ import org.apache.hudi.table.action.deltacommit.SparkInsertDeltaCommitActionExec
 import org.apache.hudi.table.action.deltacommit.SparkInsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
-import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
-import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
@@ -141,7 +141,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
-    new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+    new MergeOnReadRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }

   @Override
@@ -161,7 +161,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, restoreInstantTime, instantToRestore).execute();
+    return new MergeOnReadRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
   }

   @Override


@@ -1,134 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.clean;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class SparkCleanActionExecutor<T extends HoodieRecordPayload> extends
BaseCleanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkCleanActionExecutor.class);
public SparkCleanActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String instantTime) {
super(context, config, table, instantTime);
}
private static PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>
deleteFilesFunc(HoodieTable table) {
return (PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>) iter -> {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
while (iter.hasNext()) {
Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
String partitionPath = partitionDelFileTuple._1();
Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
String deletePathStr = deletePath.toString();
Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
}
boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
if (isBootstrapBasePathFile) {
// For Bootstrap Base file deletions, store the full file path.
partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
} else {
partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
}
}
return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
.collect(Collectors.toList()).iterator();
};
}
@Override
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
int cleanerParallelism = Math.min(
(int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions");
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(),
new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))))
.collect(Collectors.toList()), cleanerParallelism)
.mapPartitionsToPair(deleteFilesFunc(table))
.reduceByKey(PartitionCleanStat::merge).collect();
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream()
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
// Return PartitionCleanStat for each partition passed.
return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
.withEarliestCommitRetained(Option.ofNullable(
actionInstant != null
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
.build();
}).collect(Collectors.toList());
}
}
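The Spark-specific cleaner deleted above parallelized file deletions with JavaSparkContext.parallelize, mapPartitionsToPair and reduceByKey. With this commit, that shape can be expressed against the engine-agnostic primitive added to HoodieEngineContext later in this diff. The sketch below is illustrative only, not the committed CleanActionExecutor: the class, the package, and the simple files-deleted-per-partition count stand in for the real PartitionCleanStat bookkeeping.
/*
 * Hedged sketch, not the committed CleanActionExecutor: it reproduces the delete-per-partition
 * and merge-stats shape of the removed Spark code on top of the engine-agnostic API that this
 * commit adds to HoodieEngineContext. Class, package, and the per-partition count are illustrative.
 */
package org.apache.hudi.examples;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
public class EngineAgnosticCleanSketch {
  /** Deletes (partitionPath, filePath) pairs and reports how many files were removed per partition. */
  public static List<ImmutablePair<String, Integer>> deletePerPartition(
      HoodieEngineContext context, Stream<Pair<String, String>> partitionToFilePath, int parallelism) {
    // Per work-partition function: delete every file handed to this iterator and emit (partition, 1 or 0).
    SerializablePairFlatMapFunction<Iterator<Pair<String, String>>, String, Integer> deleteFiles = iter -> {
      List<Pair<String, Integer>> results = new ArrayList<>();
      while (iter.hasNext()) {
        Pair<String, String> partitionAndFile = iter.next();
        boolean deleted = Files.deleteIfExists(Paths.get(partitionAndFile.getValue()));
        results.add(new ImmutablePair<>(partitionAndFile.getKey(), deleted ? 1 : 0));
      }
      return results.stream();
    };
    // Per-partition merge, analogous to PartitionCleanStat::merge in the removed executor.
    SerializableBiFunction<Integer, Integer, Integer> mergeCounts = Integer::sum;
    List<ImmutablePair<String, Integer>> stats = new ArrayList<>();
    context.mapPartitionsToPairAndReduceByKey(partitionToFilePath, deleteFiles, mergeCounts, parallelism)
        .forEach(stats::add);
    return stats;
  }
}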


@@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.Map;
@SuppressWarnings("checkstyle:LineLength")
public class SparkCleanPlanActionExecutor<T extends HoodieRecordPayload> extends
BaseCleanPlanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkCleanPlanActionExecutor.class);
public SparkCleanPlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
@Override
protected Option<HoodieCleanerPlan> createCleanerPlan() {
return super.execute();
}
}


@@ -1,70 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.spark.api.java.JavaRDD;
@SuppressWarnings("checkstyle:LineLength")
public class SparkCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload> extends
BaseRestoreActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
public SparkCopyOnWriteRestoreActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
String restoreInstantTime) {
super(context, config, table, instantTime, restoreInstantTime);
}
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
&& !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
}
table.getMetaClient().reloadActiveTimeline();
String instantTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, instantTime, instantToRollback, false);
table.getMetaClient().reloadActiveTimeline();
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
(HoodieSparkEngineContext) context,
config,
table,
instantTime,
instantToRollback,
true,
true,
false);
return rollbackActionExecutor.execute();
}
}
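With this Spark-specific restore executor removed, the HoodieSparkCopyOnWriteTable hunk earlier in this diff constructs the shared CopyOnWriteRestoreActionExecutor instead. Its rollback step presumably mirrors the deleted code minus the HoodieSparkEngineContext cast; the method-level sketch below is an assumption, since neither the enclosing shared executor nor a CopyOnWriteRollbackActionExecutor constructor taking the generic HoodieEngineContext is shown in this excerpt.
// Hedged sketch only; assumes it lives in the shared CopyOnWriteRestoreActionExecutor and that
// CopyOnWriteRollbackActionExecutor accepts the generic HoodieEngineContext. Neither is shown here.
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
  if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
      && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
    throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
  }
  // Schedule a rollback under a freshly generated instant time, then execute it inline.
  table.getMetaClient().reloadActiveTimeline();
  String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
  table.scheduleRollback(context, rollbackInstantTime, instantToRollback, false);
  table.getMetaClient().reloadActiveTimeline();
  return new CopyOnWriteRollbackActionExecutor(
      context, config, table, rollbackInstantTime, instantToRollback, true, true, false).execute();
}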


@@ -22,10 +22,13 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -61,6 +64,10 @@ public abstract class HoodieEngineContext {
public abstract <I, K, V> List<V> mapToPairAndReduceByKey( public abstract <I, K, V> List<V> mapToPairAndReduceByKey(
List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism); List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
public abstract <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
public abstract <I, K, V> List<V> reduceByKey( public abstract <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism); List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
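The mapPartitionsToPairAndReduceByKey declaration added in this hunk generalizes the mapPartitionsToPair-then-reduceByKey pattern that the deleted SparkCleanActionExecutor ran directly on a JavaSparkContext. A possible Spark-side override is sketched below purely for illustration; the actual HoodieSparkEngineContext implementation is not part of this excerpt, and the javaSparkContext field it uses is an assumption.
// Illustrative sketch only, not the committed HoodieSparkEngineContext code. Assumes an enclosing
// engine context with a JavaSparkContext field named javaSparkContext, plus imports for
// java.util.stream.Collectors, scala.Tuple2 and org.apache.spark.api.java.function.PairFlatMapFunction.
@Override
public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
    Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
    SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
  return javaSparkContext.parallelize(data.collect(Collectors.toList()), parallelism)
      // Hand each Spark partition's iterator to the user function, converting Pair to Tuple2.
      .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator ->
          flatMapToPairFunc.call(iterator)
              .map(pair -> new Tuple2<>(pair.getKey(), pair.getValue()))
              .iterator())
      .reduceByKey(reduceFunc::apply)
      .collect().stream()
      .map(tuple -> new ImmutablePair<>(tuple._1(), tuple._2()));
}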


@@ -19,15 +19,19 @@
package org.apache.hudi.common.engine; package org.apache.hudi.common.engine;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@@ -35,6 +39,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -68,6 +73,17 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@Override
public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
.map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
.filter(Objects::nonNull);
}
@Override @Override
public <I, K, V> List<V> reduceByKey( public <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
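The local implementation above hands a single iterator over the whole input to the flat-map function, then groups the emitted pairs by key and folds each group with the reduce function, all in memory. A small runnable illustration follows; it is not part of the commit, and the class name and sample data are made up.
/*
 * Hedged, runnable illustration of the local mapPartitionsToPairAndReduceByKey semantics.
 * Not part of the commit; class name and sample data are invented.
 */
package org.apache.hudi.examples;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
public class LocalEngineContextDemo {
  public static void main(String[] args) {
    HoodieLocalEngineContext context = new HoodieLocalEngineContext(new Configuration());
    // Emit (partition, 1) for every input path, then sum the ones per partition.
    SerializablePairFlatMapFunction<Iterator<String>, String, Integer> countByPartition = iter -> {
      List<Pair<String, Integer>> pairs = new ArrayList<>();
      while (iter.hasNext()) {
        String path = iter.next();
        String partition = path.substring(0, path.lastIndexOf('/'));
        pairs.add(new ImmutablePair<>(partition, 1));
      }
      return pairs.stream();
    };
    Stream<ImmutablePair<String, Integer>> filesPerPartition = context.mapPartitionsToPairAndReduceByKey(
        Stream.of("2021/09/01/file1.parquet", "2021/09/01/file2.parquet", "2021/09/02/file3.parquet"),
        countByPartition,
        Integer::sum,
        2);
    // Expected output (order may vary): 2021/09/01 -> 2 and 2021/09/02 -> 1.
    filesPerPartition.forEach(p -> System.out.println(p.getKey() + " -> " + p.getValue()));
  }
}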


@@ -72,6 +72,17 @@ public class FunctionWrapper {
}; };
} }
public static <I, K, V> Function<I, Stream<Pair<K, V>>> throwingFlatMapToPairWrapper(
SerializablePairFlatMapFunction<I, K, V> throwingPairFlatMapFunction) {
return v1 -> {
try {
return throwingPairFlatMapFunction.call(v1);
} catch (Exception e) {
throw new HoodieException("Error occurs when executing mapToPair", e);
}
};
}
public static <V> BinaryOperator<V> throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) { public static <V> BinaryOperator<V> throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) {
return (v1, v2) -> { return (v1, v2) -> {
try { try {


@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.function;
import org.apache.hudi.common.util.collection.Pair;
import java.io.Serializable;
import java.util.stream.Stream;
/**
* A function that returns a stream of key-value pairs (Pair&lt;K, V&gt;).
*/
@FunctionalInterface
public interface SerializablePairFlatMapFunction<I, K, V> extends Serializable {
Stream<Pair<K, V>> call(I t) throws Exception;
}
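A short, hedged usage sketch: the new functional interface is typically satisfied with a lambda, and FunctionWrapper.throwingFlatMapToPairWrapper, added earlier in this diff, adapts it to a plain java.util.function.Function so stream pipelines such as the one in HoodieLocalEngineContext need no try/catch. The class name and sample data below are illustrative.
/*
 * Hedged usage sketch, not part of the commit: a lambda implementing SerializablePairFlatMapFunction
 * and the wrapper that turns its checked exceptions into a HoodieException.
 */
package org.apache.hudi.examples;
import org.apache.hudi.common.function.FunctionWrapper;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
public class PairFlatMapFunctionDemo {
  public static void main(String[] args) {
    // The lambda may throw checked exceptions; the interface's call(...) declares throws Exception.
    SerializablePairFlatMapFunction<Iterator<String>, String, Integer> wordLengths = iter -> {
      List<Pair<String, Integer>> out = new ArrayList<>();
      while (iter.hasNext()) {
        String word = iter.next();
        out.add(new ImmutablePair<>(word, word.length()));
      }
      return out.stream();
    };
    // The wrapper converts the checked-exception lambda into an unchecked java.util.function.Function.
    Function<Iterator<String>, Stream<Pair<String, Integer>>> unchecked =
        FunctionWrapper.throwingFlatMapToPairWrapper(wordLengths);
    unchecked.apply(Arrays.asList("clean", "restore").iterator())
        .forEach(p -> System.out.println(p.getKey() + " -> " + p.getValue()));
  }
}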