diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index fd5321a97..512518c10 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -479,6 +479,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     }, config.getFinalizeWriteParallelism());
   }
 
+  /**
+   * Returns the possibly invalid data file paths for the given marker files.
+   */
+  protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
+    return markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
+  }
+
   /**
    * Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
    * retries.
@@ -505,7 +512,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     }
 
     // we are not including log appends here, since they are already fail-safe.
-    Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
+    Set<String> invalidDataPaths = getInvalidDataPaths(markers);
     Set<String> validDataPaths = stats.stream()
         .map(HoodieWriteStat::getPath)
         .filter(p -> p.endsWith(this.getBaseFileExtension()))
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index 44d630ac8..c87f3dd79 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -128,8 +128,9 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
    */
   protected String newFileNameWithRollover(int rollNumber) {
     // make the intermediate file hidden
+    final String fileID = "." + this.fileId;
     return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber,
-        this.fileId, hoodieTable.getBaseFileExtension());
+        fileID, hoodieTable.getBaseFileExtension());
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index cb8303e5f..7fc3afc13 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -31,12 +31,10 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
@@ -56,6 +54,7 @@ import org.apache.hudi.table.action.commit.FlinkMergeHelper;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.util.FlinkClientUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +64,10 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
 /**
  * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -321,9 +324,13 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public void finalizeWrite(HoodieEngineContext context, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {
-    // do nothing because flink create and merge handles can clean the
-    // retry files by themselves.
+  protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
+    // keep only the intermediate files generated by FlinkMergeAndReplaceHandle.
+    return super.getInvalidDataPaths(markers).stream()
+        .filter(path -> {
+          final String fileName = FlinkClientUtil.parseFileName(path);
+          return fileName.startsWith(".") && fileName.endsWith(PARQUET.getFileExtension());
+        }).collect(Collectors.toSet());
   }
 
   // -------------------------------------------------------------------------
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
index c38c1f16d..4112e2b52 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -29,6 +29,14 @@ import java.io.File;
  */
 public class FlinkClientUtil {
 
+  /**
+   * Parses the file name from the given path.
+   */
+  public static String parseFileName(String path) {
+    int slash = path.lastIndexOf(Path.SEPARATOR);
+    return path.substring(slash + 1);
+  }
+
   /**
    * Returns the hadoop configuration with possible hadoop conf paths.
    * E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index d89ad8319..fab21d94c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -241,7 +241,8 @@ public class BucketAssigner {
           .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
 
       for (HoodieBaseFile file : allFiles) {
-        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+        // filter out the corrupted files.
+        if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
           String filename = file.getFileName();
           SmallFile sf = new SmallFile();
           sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
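
Below is a minimal, self-contained sketch (not part of the patch) of the filtering idea behind the HoodieFlinkCopyOnWriteTable#getInvalidDataPaths override above: of all candidate data paths derived from marker files, only the hidden ("."-prefixed) parquet files are treated as invalid, because those are the intermediate files written by FlinkMergeAndReplaceHandle#newFileNameWithRollover; regular base files are left to the validDataPaths reconciliation in HoodieTable#finalizeWrite. The class name, the local parseFileName helper, the ".parquet" constant, and the example paths are illustrative stand-ins, not Hudi APIs.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch only: mirrors the intermediate-file filter from the patch
// without depending on Hudi classes. parseFileName stands in for
// FlinkClientUtil.parseFileName; PARQUET_EXT stands in for PARQUET.getFileExtension().
public class IntermediateFileFilterSketch {

  private static final String PARQUET_EXT = ".parquet";

  // Take the substring after the last path separator, like FlinkClientUtil.parseFileName.
  static String parseFileName(String path) {
    return path.substring(path.lastIndexOf('/') + 1);
  }

  // Keep only hidden ("."-prefixed) parquet files: the intermediate files that
  // FlinkMergeAndReplaceHandle writes and that finalizeWrite should clean up.
  static Set<String> filterIntermediateFiles(Set<String> candidatePaths) {
    return candidatePaths.stream()
        .filter(path -> {
          String fileName = parseFileName(path);
          return fileName.startsWith(".") && fileName.endsWith(PARQUET_EXT);
        })
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    // Made-up paths, purely for illustration.
    Set<String> candidates = new HashSet<>(Arrays.asList(
        "2021/03/01/.file-id-1_1-0-1_20210301000000.parquet",   // intermediate -> kept
        "2021/03/01/file-id-1_1-0-1_20210301000000.parquet"));  // regular base file -> dropped
    System.out.println(filterIntermediateFiles(candidates));
  }
}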