From b513232449a4d85648dfb4675c0bdb1073efbcf2 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 25 Jul 2022 17:49:01 +0800 Subject: [PATCH] [HUDI-4458] Add a converter cache for flink ColumnStatsIndices (#6205) --- .../hudi/source/stats/ColumnStatsIndices.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java index d033c1d87..8ec0eafde 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java @@ -53,6 +53,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -161,6 +162,7 @@ public class ColumnStatsIndices { .filter(indexedColumns::contains) .collect(Collectors.toCollection(TreeSet::new)); + final Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters = new ConcurrentHashMap<>(); Map<StringData, List<RowData>> fileNameToRows = colStats.stream().parallel() .filter(row -> sortedTargetColumns.contains(row.getString(ORD_COL_NAME).toString())) .map(row -> { @@ -172,7 +174,7 @@ public class ColumnStatsIndices { } else { String colName = row.getString(ORD_COL_NAME).toString(); LogicalType colType = tableFieldTypeMap.get(colName); - return unpackMinMaxVal(row, colType); + return unpackMinMaxVal(row, colType, converters); } }).collect(Collectors.groupingBy(rowData -> rowData.getString(ORD_FILE_NAME))); @@ -222,7 +224,8 @@ public class ColumnStatsIndices { private static RowData unpackMinMaxVal( RowData row, - LogicalType colType) { + LogicalType colType, + Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) { RowData minValueStruct = row.getRow(ORD_MIN_VAL, 1); RowData maxValueStruct = row.getRow(ORD_MAX_VAL,
1); @@ -230,8 +233,8 @@ public class ColumnStatsIndices { checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null"); - Object minValue = tryUnpackNonNullVal(minValueStruct, colType); - Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType); + Object minValue = tryUnpackNonNullVal(minValueStruct, colType, converters); + Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType, converters); // the column schema: // |- file_name: string @@ -252,18 +255,24 @@ public class ColumnStatsIndices { return unpackedRow; } - private static Object tryUnpackNonNullVal(RowData rowData, LogicalType colType) { + private static Object tryUnpackNonNullVal( + RowData rowData, + LogicalType colType, + Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) { for (int i = 0; i < rowData.getArity(); i++) { // row data converted from avro is definitely generic. Object nested = ((GenericRowData) rowData).getField(i); if (nested != null) { - return doUnpack(nested, colType); + return doUnpack(nested, colType, converters); } } return null; } - private static Object doUnpack(Object rawVal, LogicalType logicalType) { + private static Object doUnpack( + Object rawVal, + LogicalType logicalType, + Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) { // fix time unit switch (logicalType.getTypeRoot()) { case TIME_WITHOUT_TIME_ZONE: @@ -287,7 +296,8 @@ public class ColumnStatsIndices { default: // no operation } - AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createConverter(logicalType); + AvroToRowDataConverters.AvroToRowDataConverter converter = + converters.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType)); return converter.convert(rawVal); }