[HUDI-2247] Filter file where length less than parquet MAGIC length (#3363)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
@@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.hudi.util.StreamerUtil.isValidFile;
|
||||
|
||||
/**
|
||||
* The function to load index from existing hoodieTable.
|
||||
@@ -179,7 +180,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
|
||||
// load parquet records
|
||||
fileSlice.getBaseFile().ifPresent(baseFile -> {
|
||||
// filter out corrupted files
|
||||
if (baseFile.getFileSize() <= 0) {
|
||||
if (!isValidFile(baseFile.getFileStatus())) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -199,7 +200,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
|
||||
// load avro log records
|
||||
List<String> logPaths = fileSlice.getLogFiles()
|
||||
// filter out corrupted files
|
||||
.filter(logFile -> logFile.getFileSize() > 0)
|
||||
.filter(logFile -> isValidFile(logFile.getFileStatus()))
|
||||
.map(logFile -> logFile.getPath().toString())
|
||||
.collect(toList());
|
||||
HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.core.fs.Path;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@@ -131,7 +132,7 @@ public class WriteProfiles {
|
||||
})
|
||||
// filter out corrupted files
|
||||
.filter(Objects::nonNull)
|
||||
.filter(fileStatus -> fileStatus.getLen() > 0)
|
||||
.filter(StreamerUtil::isValidFile)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
@@ -51,9 +52,12 @@ import org.apache.flink.api.common.functions.RuntimeContext;
|
||||
import org.apache.flink.configuration.ConfigOption;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.util.Preconditions;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.orc.OrcFile;
|
||||
import org.apache.parquet.hadoop.ParquetFileWriter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -68,6 +72,9 @@ import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
|
||||
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
|
||||
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
|
||||
|
||||
/**
|
||||
@@ -348,6 +355,23 @@ public class StreamerUtil {
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isValidFile(FileStatus fileStatus) {
|
||||
final String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
|
||||
if (PARQUET.getFileExtension().equals(extension)) {
|
||||
return fileStatus.getLen() > ParquetFileWriter.MAGIC.length;
|
||||
}
|
||||
|
||||
if (ORC.getFileExtension().equals(extension)) {
|
||||
return fileStatus.getLen() > OrcFile.MAGIC.length();
|
||||
}
|
||||
|
||||
if (HOODIE_LOG.getFileExtension().equals(extension)) {
|
||||
return fileStatus.getLen() > HoodieLogFormat.MAGIC.length;
|
||||
}
|
||||
|
||||
return fileStatus.getLen() > 0;
|
||||
}
|
||||
|
||||
public static boolean allowDuplicateInserts(Configuration conf) {
|
||||
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
|
||||
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
|
||||
|
||||
Reference in New Issue
Block a user