1
0

[HUDI-3684] Fixing NPE in ParquetUtils (#5102)

* Make sure nulls are properly handled in `HoodieColumnRangeMetadata`
This commit is contained in:
Alexey Kudinkin
2022-03-24 05:07:38 -07:00
committed by GitHub
parent fe2c3989e3
commit ccc3728002
4 changed files with 75 additions and 25 deletions

View File

@@ -18,19 +18,27 @@
package org.apache.hudi.common.model; package org.apache.hudi.common.model;
import javax.annotation.Nullable;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.Objects; import java.util.Objects;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.stream.Stream;
/** /**
* Hoodie Range metadata. * Hoodie metadata for the column range of data stored in columnar format (like Parquet)
*
* NOTE: {@link Comparable} is used as raw-type so that we can handle polymorphism, where
* caller apriori is not aware of the type {@link HoodieColumnRangeMetadata} is
* associated with
*/ */
public class HoodieColumnRangeMetadata<T> implements Serializable { @SuppressWarnings("rawtype")
public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializable {
private final String filePath; private final String filePath;
private final String columnName; private final String columnName;
@Nullable
private final T minValue; private final T minValue;
@Nullable
private final T maxValue; private final T maxValue;
private final long nullCount; private final long nullCount;
private final long valueCount; private final long valueCount;
@@ -38,21 +46,30 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
private final long totalUncompressedSize; private final long totalUncompressedSize;
public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION = public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
(oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<>( (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<Comparable>(
newColumnRange.getFilePath(), newColumnRange.getFilePath(),
newColumnRange.getColumnName(), newColumnRange.getColumnName(),
(Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
.stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null), .filter(Objects::nonNull)
(Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) .min(Comparator.naturalOrder())
.stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null), .orElse(null),
(Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
.filter(Objects::nonNull)
.max(Comparator.naturalOrder()).orElse(null),
oldColumnRange.getNullCount() + newColumnRange.getNullCount(), oldColumnRange.getNullCount() + newColumnRange.getNullCount(),
oldColumnRange.getValueCount() + newColumnRange.getValueCount(), oldColumnRange.getValueCount() + newColumnRange.getValueCount(),
oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(), oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(),
oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize() oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize()
); );
public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, private HoodieColumnRangeMetadata(String filePath,
final long nullCount, long valueCount, long totalSize, long totalUncompressedSize) { String columnName,
@Nullable T minValue,
@Nullable T maxValue,
long nullCount,
long valueCount,
long totalSize,
long totalUncompressedSize) {
this.filePath = filePath; this.filePath = filePath;
this.columnName = columnName; this.columnName = columnName;
this.minValue = minValue; this.minValue = minValue;
@@ -71,10 +88,12 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
return this.columnName; return this.columnName;
} }
@Nullable
public T getMinValue() { public T getMinValue() {
return this.minValue; return this.minValue;
} }
@Nullable
public T getMaxValue() { public T getMaxValue() {
return this.maxValue; return this.maxValue;
} }
@@ -133,6 +152,23 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
+ '}'; + '}';
} }
public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> create(String filePath,
String columnName,
@Nullable T minValue,
@Nullable T maxValue,
long nullCount,
long valueCount,
long totalSize,
long totalUncompressedSize) {
return new HoodieColumnRangeMetadata<>(filePath, columnName, minValue, maxValue, nullCount, valueCount, totalSize, totalUncompressedSize);
}
@SuppressWarnings("rawtype")
public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
String columnName) {
return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
}
/** /**
* Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index. * Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index.
*/ */
@@ -144,6 +180,6 @@ public class HoodieColumnRangeMetadata<T> implements Serializable {
public static final String TOTAL_SIZE = "total_size"; public static final String TOTAL_SIZE = "total_size";
public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size"; public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size";
private Stats() { } private Stats() {}
} }
} }

View File

@@ -58,6 +58,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -288,18 +289,27 @@ public class ParquetUtils extends BaseFileUtils {
/** /**
* Parse min/max statistics stored in parquet footers for all columns. * Parse min/max statistics stored in parquet footers for all columns.
*/ */
@SuppressWarnings("rawtype")
public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata( public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
@Nonnull Configuration conf, @Nonnull Configuration conf,
@Nonnull Path parquetFilePath, @Nonnull Path parquetFilePath,
@Nonnull List<String> cols @Nonnull List<String> cols
) { ) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath); ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
// NOTE: This collector has to have fully specialized generic type params since
// Java 1.8 struggles to infer them
Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);
// Collect stats from all individual Parquet blocks // Collect stats from all individual Parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().sequential() Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
.flatMap(blockMetaData -> blockMetaData.getColumns().stream() (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential()
.filter(f -> cols.contains(f.getPath().toDotString())) .flatMap(blockMetaData ->
blockMetaData.getColumns().stream()
.filter(f -> cols.contains(f.getPath().toDotString()))
.map(columnChunkMetaData -> .map(columnChunkMetaData ->
new HoodieColumnRangeMetadata<Comparable>( HoodieColumnRangeMetadata.<Comparable>create(
parquetFilePath.getName(), parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(), columnChunkMetaData.getPath().toDotString(),
convertToNativeJavaType( convertToNativeJavaType(
@@ -312,7 +322,8 @@ public class ParquetUtils extends BaseFileUtils {
columnChunkMetaData.getValueCount(), columnChunkMetaData.getValueCount(),
columnChunkMetaData.getTotalSize(), columnChunkMetaData.getTotalSize(),
columnChunkMetaData.getTotalUncompressedSize())) columnChunkMetaData.getTotalUncompressedSize()))
).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName)); )
.collect(groupingByCollector);
// Combine those into file-level statistics // Combine those into file-level statistics
// NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
@@ -360,7 +371,7 @@ public class ParquetUtils extends BaseFileUtils {
maxValue = one.getMaxValue(); maxValue = one.getMaxValue();
} }
return new HoodieColumnRangeMetadata<T>( return HoodieColumnRangeMetadata.create(
one.getFilePath(), one.getFilePath(),
one.getColumnName(), minValue, maxValue, one.getColumnName(), minValue, maxValue,
one.getNullCount() + another.getNullCount(), one.getNullCount() + another.getNullCount(),
@@ -369,7 +380,11 @@ public class ParquetUtils extends BaseFileUtils {
one.getTotalUncompressedSize() + another.getTotalUncompressedSize()); one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
} }
private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) { private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable<?> val) {
if (val == null) {
return null;
}
if (primitiveType.getOriginalType() == OriginalType.DECIMAL) { if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
return extractDecimal(val, primitiveType.getDecimalMetadata()); return extractDecimal(val, primitiveType.getDecimalMetadata());
} else if (primitiveType.getOriginalType() == OriginalType.DATE) { } else if (primitiveType.getOriginalType() == OriginalType.DATE) {

View File

@@ -960,8 +960,7 @@ public class HoodieTableMetadataUtil {
} else { } else {
// TODO we should delete records instead of stubbing them // TODO we should delete records instead of stubbing them
columnRangeMetadataList = columnRangeMetadataList =
columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata<Comparable>(fileName, columnsToIndex.stream().map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
entry, null, null, 0, 0, 0, 0))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted); return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
@@ -1012,11 +1011,11 @@ public class HoodieTableMetadataUtil {
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap, Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
Map<String, Map<String, Object>> columnToStats) { Map<String, Map<String, Object>> columnToStats) {
Map<String, Object> columnStats = columnToStats.get(field.name()); Map<String, Object> columnStats = columnToStats.get(field.name());
HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = new HoodieColumnRangeMetadata<>( HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = HoodieColumnRangeMetadata.create(
filePath, filePath,
field.name(), field.name(),
String.valueOf(columnStats.get(MIN)), (Comparable) String.valueOf(columnStats.get(MIN)),
String.valueOf(columnStats.get(MAX)), (Comparable) String.valueOf(columnStats.get(MAX)),
Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()), Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()), Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()), Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),

View File

@@ -780,7 +780,7 @@ public class HoodieMetadataTableValidator implements Serializable {
return allColumnNameList.stream() return allColumnNameList.stream()
.flatMap(columnName -> .flatMap(columnName ->
tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream() tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
.map(stats -> new HoodieColumnRangeMetadata<>( .map(stats -> HoodieColumnRangeMetadata.create(
stats.getFileName(), stats.getFileName(),
columnName, columnName,
stats.getMinValue(), stats.getMinValue(),
@@ -799,7 +799,7 @@ public class HoodieMetadataTableValidator implements Serializable {
metaClient.getHadoopConf(), metaClient.getHadoopConf(),
new Path(new Path(metaClient.getBasePath(), partitionPath), filename), new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
allColumnNameList).stream()) allColumnNameList).stream())
.map(rangeMetadata -> new HoodieColumnRangeMetadata<String>( .map(rangeMetadata -> HoodieColumnRangeMetadata.create(
rangeMetadata.getFilePath(), rangeMetadata.getFilePath(),
rangeMetadata.getColumnName(), rangeMetadata.getColumnName(),
// Note: here we ignore the type in the validation, // Note: here we ignore the type in the validation,