[HUDI-3847] Fix NPE due to null schema in HoodieMetadataTableValidator (#5284)
This commit is contained in:
@@ -68,6 +68,7 @@ import org.apache.hadoop.fs.Path;
|
|||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||||
|
import org.apache.parquet.schema.MessageType;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
|
||||||
@@ -808,9 +809,14 @@ public class HoodieMetadataTableValidator implements Serializable {
|
|||||||
for (String logFilePathStr : logFilePathSet) {
|
for (String logFilePathStr : logFilePathSet) {
|
||||||
HoodieLogFormat.Reader reader = null;
|
HoodieLogFormat.Reader reader = null;
|
||||||
try {
|
try {
|
||||||
Schema readerSchema =
|
MessageType messageType =
|
||||||
converter.convert(Objects.requireNonNull(
|
TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePathStr));
|
||||||
TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePathStr))));
|
if (messageType == null) {
|
||||||
|
LOG.warn(String.format("Cannot read schema from log file %s. "
|
||||||
|
+ "Skip the check as it's likely being written by an inflight instant.", logFilePathStr));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Schema readerSchema = converter.convert(messageType);
|
||||||
reader =
|
reader =
|
||||||
HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema);
|
HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema);
|
||||||
// read the avro blocks
|
// read the avro blocks
|
||||||
@@ -829,7 +835,9 @@ public class HoodieMetadataTableValidator implements Serializable {
|
|||||||
LOG.warn("There is no log block in " + logFilePathStr);
|
LOG.warn("There is no log block in " + logFilePathStr);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieValidationException("Validation failed due to IOException", e);
|
LOG.warn(String.format("Cannot read log file %s: %s. "
|
||||||
|
+ "Skip the check as it's likely being written by an inflight instant.",
|
||||||
|
logFilePathStr, e.getMessage()), e);
|
||||||
} finally {
|
} finally {
|
||||||
FileIOUtils.closeQuietly(reader);
|
FileIOUtils.closeQuietly(reader);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user