1
0

[HUDI-4458] Add a converter cache for flink ColumnStatsIndices (#6205)

This commit is contained in:
Danny Chan
2022-07-25 17:49:01 +08:00
committed by GitHub
parent f6e7227ed5
commit b513232449

View File

@@ -53,6 +53,7 @@ import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -161,6 +162,7 @@ public class ColumnStatsIndices {
.filter(indexedColumns::contains) .filter(indexedColumns::contains)
.collect(Collectors.toCollection(TreeSet::new)); .collect(Collectors.toCollection(TreeSet::new));
final Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters = new ConcurrentHashMap<>();
Map<StringData, List<RowData>> fileNameToRows = colStats.stream().parallel() Map<StringData, List<RowData>> fileNameToRows = colStats.stream().parallel()
.filter(row -> sortedTargetColumns.contains(row.getString(ORD_COL_NAME).toString())) .filter(row -> sortedTargetColumns.contains(row.getString(ORD_COL_NAME).toString()))
.map(row -> { .map(row -> {
@@ -172,7 +174,7 @@ public class ColumnStatsIndices {
} else { } else {
String colName = row.getString(ORD_COL_NAME).toString(); String colName = row.getString(ORD_COL_NAME).toString();
LogicalType colType = tableFieldTypeMap.get(colName); LogicalType colType = tableFieldTypeMap.get(colName);
return unpackMinMaxVal(row, colType); return unpackMinMaxVal(row, colType, converters);
} }
}).collect(Collectors.groupingBy(rowData -> rowData.getString(ORD_FILE_NAME))); }).collect(Collectors.groupingBy(rowData -> rowData.getString(ORD_FILE_NAME)));
@@ -222,7 +224,8 @@ public class ColumnStatsIndices {
private static RowData unpackMinMaxVal( private static RowData unpackMinMaxVal(
RowData row, RowData row,
LogicalType colType) { LogicalType colType,
Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) {
RowData minValueStruct = row.getRow(ORD_MIN_VAL, 1); RowData minValueStruct = row.getRow(ORD_MIN_VAL, 1);
RowData maxValueStruct = row.getRow(ORD_MAX_VAL, 1); RowData maxValueStruct = row.getRow(ORD_MAX_VAL, 1);
@@ -230,8 +233,8 @@ public class ColumnStatsIndices {
checkState(minValueStruct != null && maxValueStruct != null, checkState(minValueStruct != null && maxValueStruct != null,
"Invalid Column Stats record: either both min/max have to be null, or both have to be non-null"); "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null");
Object minValue = tryUnpackNonNullVal(minValueStruct, colType); Object minValue = tryUnpackNonNullVal(minValueStruct, colType, converters);
Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType); Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType, converters);
// the column schema: // the column schema:
// |- file_name: string // |- file_name: string
@@ -252,18 +255,24 @@ public class ColumnStatsIndices {
return unpackedRow; return unpackedRow;
} }
private static Object tryUnpackNonNullVal(RowData rowData, LogicalType colType) { private static Object tryUnpackNonNullVal(
RowData rowData,
LogicalType colType,
Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) {
for (int i = 0; i < rowData.getArity(); i++) { for (int i = 0; i < rowData.getArity(); i++) {
// row data converted from avro is definitely generic. // row data converted from avro is definitely generic.
Object nested = ((GenericRowData) rowData).getField(i); Object nested = ((GenericRowData) rowData).getField(i);
if (nested != null) { if (nested != null) {
return doUnpack(nested, colType); return doUnpack(nested, colType, converters);
} }
} }
return null; return null;
} }
private static Object doUnpack(Object rawVal, LogicalType logicalType) { private static Object doUnpack(
Object rawVal,
LogicalType logicalType,
Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) {
// fix time unit // fix time unit
switch (logicalType.getTypeRoot()) { switch (logicalType.getTypeRoot()) {
case TIME_WITHOUT_TIME_ZONE: case TIME_WITHOUT_TIME_ZONE:
@@ -287,7 +296,8 @@ public class ColumnStatsIndices {
default: default:
// no operation // no operation
} }
AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createConverter(logicalType); AvroToRowDataConverters.AvroToRowDataConverter converter =
converters.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType));
return converter.convert(rawVal); return converter.convert(rawVal);
} }