diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 7713c1e63..0bd0d432c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -79,7 +79,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
 
   @Override
   public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
-    return data.stream().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+    return data.stream().parallel().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index 3ff579fed..88af0f806 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
@@ -99,6 +100,12 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
     }
   }
 
+  @Override
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
+  }
+
   @Override
   public Path makeNewPath(String partitionPath) {
     Path path = super.makeNewPath(partitionPath);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index f414450ef..972e7ca85 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -102,7 +103,8 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
 
   @Override
   protected void createMarkerFile(String partitionPath, String dataFileName) {
-    // no need to create any marker file for intermediate file.
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index b896bfebb..faea1b957 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -114,9 +115,8 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
 
   @Override
   protected void createMarkerFile(String partitionPath, String dataFileName) {
-    // no need to create marker file for flink merge handle,
-    // the flink write handle does not rely on MARKER files for
-    // corrupt files cleaning, see HoodieFlinkCopyOnWriteTable#getInvalidDataPaths for details.
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 7fc3afc13..d8bdb9fba 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -54,7 +54,6 @@ import org.apache.hudi.table.action.commit.FlinkMergeHelper;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
-import org.apache.hudi.util.FlinkClientUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,10 +63,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
 /**
  * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -323,16 +318,6 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
     throw new HoodieNotSupportedException("Savepoint and restore is not supported yet");
   }
 
-  @Override
-  protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
-    // keep only the intermediate file generated by FlinkMergeAndReplaceHandle.
-    return super.getInvalidDataPaths(markers).stream()
-        .filter(path -> {
-          final String fileName = FlinkClientUtil.parseFileName(path);
-          return fileName.startsWith(".") && fileName.endsWith(PARQUET.getFileExtension());
-        }).collect(Collectors.toSet());
-  }
-
   // -------------------------------------------------------------------------
   //  Used for compaction
   // -------------------------------------------------------------------------
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
index 6ed38a952..9378cb230 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
@@ -40,7 +40,6 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -62,9 +61,8 @@ public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
 
   @Override
   List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
-
-    Iterator<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
-        .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
+    Stream<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+        .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
 
     Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
         deleteFilesFunc(filesToBeDeletedPerPartition, table)
@@ -97,12 +95,11 @@ public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
         }).collect(Collectors.toList());
   }
 
-  private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Tuple2<String, CleanFileInfo>> iter, HoodieTable table) {
+  private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Stream<Tuple2<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
     Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
     FileSystem fs = table.getMetaClient().getFs();
 
-    while (iter.hasNext()) {
-      Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
+    cleanFileInfo.parallel().forEach(partitionDelFileTuple -> {
       String partitionPath = partitionDelFileTuple._1();
       Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
       String deletePathStr = deletePath.toString();
@@ -112,11 +109,11 @@ public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
       } catch (IOException e) {
        LOG.error("Delete file failed");
       }
-      if (!partitionCleanStatMap.containsKey(partitionPath)) {
-        partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
+      final PartitionCleanStat partitionCleanStat;
+      synchronized (partitionCleanStatMap) {
+        partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new PartitionCleanStat(partitionPath));
       }
       boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
-      PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
       if (isBootstrapBasePathFile) {
         // For Bootstrap Base file deletions, store the full file path.
         partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
@@ -125,7 +122,7 @@ public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
         partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
         partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
       }
-    }
+    });
     return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
  }
 }
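
Note on the concurrency pattern in deleteFilesFunc above: the cleaner now consumes the file list as a parallel stream, so the shared partitionCleanStatMap (a plain HashMap) is only ever touched inside a synchronized block, while the per-partition PartitionCleanStat escapes that block and is mutated by whichever threads happen to delete files of the same partition. The standalone sketch below reproduces the pattern with plain JDK types; all names in it (ParallelStatAggregation, FileStat, recordDelete) are hypothetical stand-ins, not part of this patch or of Hudi.

import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

// Standalone sketch of the parallel-stream aggregation pattern used in
// deleteFilesFunc: worker threads record per-key results into a shared
// HashMap whose structure is guarded by a synchronized block.
public class ParallelStatAggregation {

  // Hypothetical stand-in for PartitionCleanStat: counts deletions per key.
  static class FileStat {
    private int deleted;

    // Synchronized: several stream threads may share the same key's stat.
    synchronized void recordDelete() {
      deleted++;
    }

    synchronized int deleted() {
      return deleted;
    }
  }

  public static void main(String[] args) {
    Map<String, FileStat> statMap = new HashMap<>();

    // Simulate cleaning 1000 files spread over 4 partitions in parallel.
    IntStream.range(0, 1000).parallel().forEach(i -> {
      String partition = "partition-" + (i % 4);
      // As in the patch: only the map lookup/insert is synchronized; the
      // stat object escapes the lock and is mutated concurrently.
      final FileStat stat;
      synchronized (statMap) {
        stat = statMap.computeIfAbsent(partition, k -> new FileStat());
      }
      stat.recordDelete();
    });

    // Safe to read single-threaded here: the parallel forEach has completed.
    statMap.forEach((partition, stat) ->
        System.out.println(partition + " deleted=" + stat.deleted()));
  }
}

The sketch also makes explicit a property this pattern depends on: the synchronized block guards only the map itself, so the per-key stat object must tolerate concurrent mutation (the sketch synchronizes its mutator). A ConcurrentHashMap.computeIfAbsent call would be the lock-free alternative to the explicit synchronized block; either way, reading the map after the parallel forEach has returned is safe, because the terminal operation completes before the return statement runs.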