diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index 0a4ef3000..bcf6c3a00 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.util.StreamerUtil.isValidFile;
 
 /**
  * The function to load index from existing hoodieTable.
@@ -179,7 +180,7 @@ public class BootstrapFunction
       // load parquet records
       fileSlice.getBaseFile().ifPresent(baseFile -> {
         // filter out crushed files
-        if (baseFile.getFileSize() <= 0) {
+        if (!isValidFile(baseFile.getFileStatus())) {
           return;
         }
 
@@ -199,7 +200,7 @@ public class BootstrapFunction
       // load avro log records
       List<String> logPaths = fileSlice.getLogFiles()
           // filter out crushed files
-          .filter(logFile -> logFile.getFileSize() > 0)
+          .filter(logFile -> isValidFile(logFile.getFileStatus()))
           .map(logFile -> logFile.getPath().toString())
           .collect(toList());
       HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 1277b190d..1fcf3f11c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.hadoop.conf.Configuration;
@@ -131,7 +132,7 @@ public class WriteProfiles {
         })
         // filter out crushed files
         .filter(Objects::nonNull)
-        .filter(fileStatus -> fileStatus.getLen() > 0)
+        .filter(StreamerUtil::isValidFile)
         .collect(Collectors.toList());
   }
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 3d53a07b4..86de90e5b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -51,9 +52,12 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.orc.OrcFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +72,9 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
 
 /**
@@ -348,6 +355,23 @@ public class StreamerUtil {
     }
   }
 
+  public static boolean isValidFile(FileStatus fileStatus) {
+    final String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
+    if (PARQUET.getFileExtension().equals(extension)) {
+      return fileStatus.getLen() > ParquetFileWriter.MAGIC.length;
+    }
+
+    if (ORC.getFileExtension().equals(extension)) {
+      return fileStatus.getLen() > OrcFile.MAGIC.length();
+    }
+
+    if (HOODIE_LOG.getFileExtension().equals(extension)) {
+      return fileStatus.getLen() > HoodieLogFormat.MAGIC.length;
+    }
+
+    return fileStatus.getLen() > 0;
+  }
+
   public static boolean allowDuplicateInserts(Configuration conf) {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
     return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
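
Note on the change: the new StreamerUtil.isValidFile only accepts a file that is strictly longer than its format's magic header, so a write that crashed after flushing just the header bytes is filtered out along with zero-length files, which the old getLen() > 0 check could not catch. Below is a minimal self-contained sketch of that size gate, assuming the usual magic lengths (4 bytes for Parquet's "PAR1", 3 for ORC's "ORC", 6 for Hudi's "#HUDI#" log-block magic); the class name, the hardcoded lengths, and the naive extension parsing are illustrative only -- the patch itself reads ParquetFileWriter.MAGIC, OrcFile.MAGIC, HoodieLogFormat.MAGIC, and FSUtils.getFileExtension.

import java.io.File;

// Illustrative sketch of the validity check added above (hypothetical
// class; constants stand in for the real MAGIC fields).
public class ValidFileCheckSketch {

  static boolean isValidFile(String path, long len) {
    int dot = path.lastIndexOf('.');
    String extension = dot < 0 ? "" : path.substring(dot);
    switch (extension) {
      case ".parquet": return len > 4; // "PAR1" magic
      case ".orc":     return len > 3; // "ORC" magic
      case ".log":     return len > 6; // "#HUDI#" log-block magic
      default:         return len > 0; // fall back to the old non-empty check
    }
  }

  public static void main(String[] args) {
    File f = new File(args[0]);
    System.out.println(f + " valid: " + isValidFile(f.getPath(), f.length()));
  }
}

Compiled standalone, "java ValidFileCheckSketch some-file.parquet" reports whether the file would pass the same gate: a 4-byte Parquet file (header only, no footer) is rejected, matching the intent of the "filter out crushed files" comments in the patch.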