From 8869b3b4184bbec4502e2e3f6fde0ea9260cf0b0 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Fri, 14 May 2021 15:43:37 +0800
Subject: [PATCH] [HUDI-1902] Clean the corrupted files generated by
 FlinkMergeAndReplaceHandle (#2949)

Make the intermediate files of FlinkMergeAndReplaceHandle hidden. When
committing the instant, clean these files up in case corrupted files were
left behind (in the normal case, the intermediate files are cleaned by the
FlinkMergeAndReplaceHandle itself).
---
 .../java/org/apache/hudi/table/HoodieTable.java |  9 ++++++++-
 .../hudi/io/FlinkMergeAndReplaceHandle.java     |  3 ++-
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java | 17 ++++++++++++-----
 .../org/apache/hudi/util/FlinkClientUtil.java   |  8 ++++++++
 .../hudi/sink/partitioner/BucketAssigner.java   |  3 ++-
 5 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index fd5321a97..512518c10 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -479,6 +479,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     }, config.getFinalizeWriteParallelism());
   }
 
+  /**
+   * Returns the possibly invalid data file names for the given marker files.
+   */
+  protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
+    return markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
+  }
+
   /**
    * Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
    * retries.
@@ -505,7 +512,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     }
 
     // we are not including log appends here, since they are already fail-safe.
-    Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
+    Set<String> invalidDataPaths = getInvalidDataPaths(markers);
     Set<String> validDataPaths = stats.stream()
        .map(HoodieWriteStat::getPath)
        .filter(p -> p.endsWith(this.getBaseFileExtension()))
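For context, the reconciliation in finalizeWrite() that consumes getInvalidDataPaths() boils down to a set difference: the candidate paths derived from marker files, minus the paths recorded in the write stats, are treated as leftovers and deleted. A minimal sketch of that idea follows; the class and method names are illustrative, not the actual Hudi code.

    import java.util.HashSet;
    import java.util.Set;

    // Minimal sketch of the marker-file reconciliation: anything the markers
    // point at that no write stat claims is considered invalid and deletable.
    final class ReconcileSketch {
      static Set<String> resolveInvalidPaths(Set<String> markerDataPaths, Set<String> statDataPaths) {
        Set<String> invalid = new HashSet<>(markerDataPaths); // all candidates from markers
        invalid.removeAll(statDataPaths);                     // drop files a write stat vouches for
        return invalid;                                       // leftovers from failed attempts
      }
    }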
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index 44d630ac8..c87f3dd79 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -128,8 +128,9 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
    */
   protected String newFileNameWithRollover(int rollNumber) {
     // make the intermediate file hidden
+    final String fileID = "." + this.fileId;
     return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber,
-        this.fileId, hoodieTable.getBaseFileExtension());
+        fileID, hoodieTable.getBaseFileExtension());
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index cb8303e5f..7fc3afc13 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -31,12 +31,10 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
@@ -56,6 +54,7 @@ import org.apache.hudi.table.action.commit.FlinkMergeHelper;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.util.FlinkClientUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +64,10 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
 /**
  * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -321,9 +324,13 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public void finalizeWrite(HoodieEngineContext context, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {
-    // do nothing because flink create and merge handles can clean the
-    // retry files by themselves.
+  protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
+    // keep only the intermediate files generated by FlinkMergeAndReplaceHandle.
+    return super.getInvalidDataPaths(markers).stream()
+        .filter(path -> {
+          final String fileName = FlinkClientUtil.parseFileName(path);
+          return fileName.startsWith(".") && fileName.endsWith(PARQUET.getFileExtension());
+        }).collect(Collectors.toSet());
   }
 
   // -------------------------------------------------------------------------
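The override works together with the renaming in FlinkMergeAndReplaceHandle: because intermediate files now carry a leading dot, they can be recognized by name alone, and everything else the markers point at is left untouched. A rough sketch of the predicate follows, with made-up file names that assume Hudi's usual <fileId>_<writeToken>_<instantTime>.parquet base-file naming.

    // Sketch of the name-based filter above; the sample file names are made up.
    final class HiddenFileFilterSketch {
      static boolean isIntermediateFile(String fileName) {
        // hidden (dot-prefixed) parquet files are FlinkMergeAndReplaceHandle leftovers
        return fileName.startsWith(".") && fileName.endsWith(".parquet");
      }

      public static void main(String[] args) {
        // intermediate file left by a rolled-over merge: eligible for cleaning
        System.out.println(isIntermediateFile(".f1d2_1-0-2_20210514154337.parquet")); // true
        // regular committed base file: must never be cleaned by this path
        System.out.println(isIntermediateFile("f1d2_1-0-1_20210514154337.parquet"));  // false
      }
    }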
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
index c38c1f16d..4112e2b52 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -29,6 +29,14 @@ import java.io.File;
  */
 public class FlinkClientUtil {
 
+  /**
+   * Parses the file name from the given path.
+   */
+  public static String parseFileName(String path) {
+    int slash = path.lastIndexOf(Path.SEPARATOR);
+    return path.substring(slash + 1);
+  }
+
   /**
    * Returns the hadoop configuration with possible hadoop conf paths.
    * E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index d89ad8319..fab21d94c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -241,7 +241,8 @@ public class BucketAssigner {
         .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
 
     for (HoodieBaseFile file : allFiles) {
-      if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+      // filter out the corrupted files.
+      if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
         String filename = file.getFileName();
         SmallFile sf = new SmallFile();
         sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
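The BucketAssigner change applies the same defensive idea on the read side: a zero-length base file cannot be valid parquet, so it is skipped when selecting small files to pack new records into. A quick sketch of the tightened predicate follows; the limit value is only an example.

    // Sketch of the small-file check above: under the limit, but non-empty.
    final class SmallFileCheckSketch {
      static boolean isPackableSmallFile(long fileSize, long smallFileLimit) {
        // a zero-length base file is most likely a corrupted leftover; never pack into it
        return fileSize < smallFileLimit && fileSize > 0;
      }

      public static void main(String[] args) {
        long limit = 100L * 1024 * 1024; // e.g. a 100 MB parquet small-file limit
        System.out.println(isPackableSmallFile(0L, limit));         // false: corrupted/empty
        System.out.println(isPackableSmallFile(5_000_000L, limit)); // true: genuine small file
      }
    }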