
[HUDI-4353] Column stats data skipping for flink (#6026)

This commit is contained in:
Danny Chan
2022-07-03 08:29:31 +08:00
committed by GitHub
parent bdf73b2650
commit 47792a3186
30 changed files with 1930 additions and 81 deletions

View File

@@ -236,6 +236,13 @@ public class FlinkOptions extends HoodieConfig {
.noDefaultValue()
.withDescription("End commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'");
public static final ConfigOption<Boolean> READ_DATA_SKIPPING_ENABLED = ConfigOptions
.key("read.data.skipping.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by "
+ "skipping over files");
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
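For context (not part of this change set): FileIndex below reads this flag via conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED). A minimal reader-side sketch that turns it on, assuming the pre-existing FlinkOptions.METADATA_ENABLED option for the metadata table that data skipping depends on:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

Configuration conf = new Configuration();
// Data skipping is only effective when the metadata table (and its column stats) is available.
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
conf.setBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);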

View File

@@ -18,11 +18,11 @@
package org.apache.hudi.configuration;
import org.apache.hudi.util.FlinkClientUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.util.FlinkClientUtil;
import java.util.Map;
/**

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.sink.meta;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
@@ -26,6 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.sink.utils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
@@ -29,6 +28,7 @@ import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;

View File

@@ -155,6 +155,10 @@ public class NonThrownExecutor implements AutoCloseable {
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* The exception hook.
*/
public interface ExceptionHook {
void apply(String errMsg, Throwable t);
}

View File

@@ -21,14 +21,24 @@ package org.apache.hudi.source;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.stats.ColumnStatsIndices;
import org.apache.hudi.source.stats.ExpressionEvaluator;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -40,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
/**
* A file index which supports listing files efficiently through metadata table.
@@ -47,19 +58,26 @@ import java.util.Set;
* <p>It caches the partition paths to avoid redundant look up.
*/
public class FileIndex {
private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class);
private final Path path;
private final RowType rowType;
private final HoodieMetadataConfig metadataConfig;
private List<String> partitionPaths; // cache of partition paths
private final boolean dataSkippingEnabled;
private List<String> partitionPaths; // cache of partition paths
private List<ResolvedExpression> filters; // push down filters
private final boolean tableExists;
private FileIndex(Path path, Configuration conf) {
private FileIndex(Path path, Configuration conf, RowType rowType) {
this.path = path;
this.rowType = rowType;
this.metadataConfig = metadataConfig(conf);
this.dataSkippingEnabled = conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED);
this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
}
public static FileIndex instance(Path path, Configuration conf) {
return new FileIndex(path, conf);
public static FileIndex instance(Path path, Configuration conf, RowType rowType) {
return new FileIndex(path, conf, rowType);
}
/**
@@ -119,9 +137,17 @@ public class FileIndex {
return new FileStatus[0];
}
String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
FileStatus[] allFileStatus = FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
partitions, "/tmp/")
.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
Set<String> candidateFiles = candidateFilesInMetadataTable(allFileStatus);
if (candidateFiles == null) {
// no filtering by column stats: either it is disabled, there are no filters, or an error occurred.
return allFileStatus;
}
return Arrays.stream(allFileStatus).parallel()
.filter(fileStatus -> candidateFiles.contains(fileStatus.getPath().getName()))
.toArray(FileStatus[]::new);
}
/**
@@ -159,10 +185,96 @@ public class FileIndex {
}
}
/**
* Sets up pushed down filters.
*/
public void setFilters(List<ResolvedExpression> filters) {
if (filters.size() > 0) {
this.filters = new ArrayList<>(filters);
}
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
/**
* Computes the pruned list of candidate base-file names based on the provided list of data filter
* conditions, by leveraging the Metadata Table's Column Statistics index (hereon referred to as ColStats for brevity)
* bearing "min", "max", "num_nulls" statistics for all columns.
*
* <p>NOTE: This method has to return complete set of candidate files, since only provided candidates will
* ultimately be scanned as part of query execution. Hence, this method has to maintain the
* invariant of conservatively including every base-file that is NOT referenced in the index.
*
* <p>The {@code filters} must all be simple.
*
* @return list of pruned (data-skipped) candidate base-files' names
*/
@Nullable
private Set<String> candidateFilesInMetadataTable(FileStatus[] allFileStatus) {
// NOTE: Data Skipping is only effective when it references columns that are indexed w/in
// the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
// - Expressions on a top-level column's nested fields (i.e., filters like "struct.field > 0", since
// CSI only contains stats for top-level columns, in this case for "struct")
// - Any expression not directly referencing a top-level column (for example, sub-queries, since there's
// nothing in CSI that could be applied to them)
if (!metadataConfig.enabled() || !dataSkippingEnabled) {
validateConfig();
return null;
}
if (this.filters == null || this.filters.size() == 0) {
return null;
}
String[] referencedCols = ExpressionUtils.referencedColumns(filters);
if (referencedCols.length == 0) {
return null;
}
try {
final List<RowData> colStats = ColumnStatsIndices.readColumnStatsIndex(path.toString(), metadataConfig, referencedCols);
final Pair<List<RowData>, String[]> colStatsTable = ColumnStatsIndices.transposeColumnStatsIndex(colStats, referencedCols, rowType);
List<RowData> transposedColStats = colStatsTable.getLeft();
String[] queryCols = colStatsTable.getRight();
if (queryCols.length == 0) {
// the indexed columns have no intersection with the referenced columns, returns early
return null;
}
RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(rowType, queryCols);
Set<String> allIndexedFileNames = transposedColStats.stream().parallel()
.map(row -> row.getString(0).toString())
.collect(Collectors.toSet());
Set<String> candidateFileNames = transposedColStats.stream().parallel()
.filter(row -> ExpressionEvaluator.filterExprs(filters, row, queryFields))
.map(row -> row.getString(0).toString())
.collect(Collectors.toSet());
// NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
// base-file: since it's bound to clustering, which could occur asynchronously
// at arbitrary point in time, and is not likely to be touching all the base files.
//
// To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
// files and all outstanding base-files, and make sure that all base files not
// represented w/in the index are included in the output of this method
Set<String> nonIndexedFileNames = Arrays.stream(allFileStatus)
.map(fileStatus -> fileStatus.getPath().getName()).collect(Collectors.toSet());
nonIndexedFileNames.removeAll(allIndexedFileNames);
candidateFileNames.addAll(nonIndexedFileNames);
return candidateFileNames;
} catch (Throwable throwable) {
LOG.warn("Read column stats for data skipping error", throwable);
return null;
}
}
private void validateConfig() {
if (dataSkippingEnabled && !metadataConfig.enabled()) {
LOG.warn("Data skipping requires Metadata Table to be enabled! "
+ "isMetadataTableEnabled = {}", metadataConfig.enabled());
}
}
/**
* Returns all the relative partition paths.
*
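A rough usage sketch of the extended FileIndex API introduced here, assuming the table base path, the reader Configuration, the physical RowType, and the planner's pushed-down filters are already in scope (variable names below are hypothetical):

FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(basePath), conf, rowType);
fileIndex.setPartitionPaths(requiredPartitionPaths);   // optional partition pruning (Set<String>)
fileIndex.setFilters(pushedDownFilters);               // List<ResolvedExpression> used for data skipping
FileStatus[] candidateFiles = fileIndex.getFilesInPartitions(); // pruned via ColStats when data skipping is enabled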

View File

@@ -35,6 +35,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,7 @@ public class IncrementalInputSplits implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
private final Configuration conf;
private final Path path;
private final RowType rowType;
private final long maxCompactionMemoryInBytes;
// for partition pruning
private final Set<String> requiredPartitions;
@@ -86,11 +88,13 @@ public class IncrementalInputSplits implements Serializable {
private IncrementalInputSplits(
Configuration conf,
Path path,
RowType rowType,
long maxCompactionMemoryInBytes,
@Nullable Set<String> requiredPartitions,
boolean skipCompaction) {
this.conf = conf;
this.path = path;
this.rowType = rowType;
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.requiredPartitions = requiredPartitions;
this.skipCompaction = skipCompaction;
@@ -167,7 +171,7 @@ public class IncrementalInputSplits implements Serializable {
if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
if (this.requiredPartitions != null) {
// apply partition push down
fileIndex.setPartitionPaths(this.requiredPartitions);
@@ -349,6 +353,7 @@ public class IncrementalInputSplits implements Serializable {
public static class Builder {
private Configuration conf;
private Path path;
private RowType rowType;
private long maxCompactionMemoryInBytes;
// for partition pruning
private Set<String> requiredPartitions;
@@ -368,6 +373,11 @@ public class IncrementalInputSplits implements Serializable {
return this;
}
public Builder rowType(RowType rowType) {
this.rowType = rowType;
return this;
}
public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
return this;
@@ -384,7 +394,8 @@ public class IncrementalInputSplits implements Serializable {
}
public IncrementalInputSplits build() {
return new IncrementalInputSplits(Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path),
return new IncrementalInputSplits(
Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction);
}
}

View File

@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,6 +100,7 @@ public class StreamReadMonitoringFunction
public StreamReadMonitoringFunction(
Configuration conf,
Path path,
RowType rowType,
long maxCompactionMemoryInBytes,
@Nullable Set<String> requiredPartitionPaths) {
this.conf = conf;
@@ -107,6 +109,7 @@ public class StreamReadMonitoringFunction
this.incrementalInputSplits = IncrementalInputSplits.builder()
.conf(conf)
.path(path)
.rowType(rowType)
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(requiredPartitionPaths)
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))

View File

@@ -0,0 +1,377 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.source.stats;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataProjection;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Utilities for abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
* providing convenient interfaces to read it, transpose, etc.
*/
public class ColumnStatsIndices {
private static final DataType METADATA_DATA_TYPE = getMetadataDataType();
private static final DataType COL_STATS_DATA_TYPE = getColStatsDataType();
private static final int[] COL_STATS_TARGET_POS = getColStatsTargetPos();
// the column schema:
// |- file_name: string
// |- min_val: row
// |- max_val: row
// |- null_cnt: long
// |- val_cnt: long
// |- column_name: string
private static final int ORD_FILE_NAME = 0;
private static final int ORD_MIN_VAL = 1;
private static final int ORD_MAX_VAL = 2;
private static final int ORD_NULL_CNT = 3;
private static final int ORD_VAL_CNT = 4;
private static final int ORD_COL_NAME = 5;
private ColumnStatsIndices() {
}
public static List<RowData> readColumnStatsIndex(String basePath, HoodieMetadataConfig metadataConfig, String[] targetColumns) {
// NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
// by only fetching Column Stats Index records pertaining to the requested columns.
// Otherwise, we fall back to read whole Column Stats Index
ValidationUtils.checkArgument(targetColumns.length > 0,
"Column stats is only valid when push down filters have referenced columns");
final List<RowData> metadataRows = readColumnStatsIndexByColumns(basePath, targetColumns, metadataConfig);
return projectNestedColStatsColumns(metadataRows);
}
private static List<RowData> projectNestedColStatsColumns(List<RowData> rows) {
int pos = HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
RowDataProjection projection = RowDataProjection.instanceV2((RowType) COL_STATS_DATA_TYPE.getLogicalType(), COL_STATS_TARGET_POS);
return rows.stream().parallel()
.map(row -> {
RowData columnStatsField = row.getRow(pos, 9);
return projection.project(columnStatsField);
}).collect(Collectors.toList());
}
/**
* Transposes and converts the raw table format of the Column Stats Index representation,
* where each row/record corresponds to an individual (column, file) pair, into the table format
* where each row corresponds to a single file, with statistics for the individual columns collated
* within such a row:
* <p>
* Metadata Table Column Stats Index format:
*
* <pre>
* +---------------------------+------------+------------+------------+-------------+
* | fileName | columnName | minValue | maxValue | num_nulls |
* +---------------------------+------------+------------+------------+-------------+
* | one_base_file.parquet | A | 1 | 10 | 0 |
* | another_base_file.parquet | A | -10 | 0 | 5 |
* +---------------------------+------------+------------+------------+-------------+
* </pre>
* <p>
* Returned table format
*
* <pre>
* +---------------------------+------------+------------+-------------+
* | file | A_minValue | A_maxValue | A_nullCount |
* +---------------------------+------------+------------+-------------+
* | one_base_file.parquet | 1 | 10 | 0 |
* | another_base_file.parquet | -10 | 0 | 5 |
* +---------------------------+------------+------------+-------------+
* </pre>
* <p>
* NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while
* the query at hand might only reference a handful of those. As such, we collect all the
* column references from the filtering expressions, and only transpose records corresponding to the
* columns referenced in those.
*
* @param colStats RowData list bearing raw Column Stats Index table
* @param queryColumns target columns to be included into the final table
* @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
public static Pair<List<RowData>, String[]> transposeColumnStatsIndex(List<RowData> colStats, String[] queryColumns, RowType tableSchema) {
Map<String, LogicalType> tableFieldTypeMap = tableSchema.getFields().stream()
.collect(Collectors.toMap(RowType.RowField::getName, RowType.RowField::getType));
// NOTE: We have to collect list of indexed columns to make sure we properly align the rows
// w/in the transposed dataset: since some files might not have all the columns indexed
// either due to the Column Stats Index config changes, schema evolution, etc. we have
// to make sure that all the rows w/in transposed data-frame are properly padded (with null
// values) for such file-column combinations
Set<String> indexedColumns = colStats.stream().map(row -> row.getString(ORD_COL_NAME)
.toString()).collect(Collectors.toSet());
// NOTE: We're sorting the columns to make sure final index schema matches layout
// of the transposed table
TreeSet<String> sortedTargetColumns = Arrays.stream(queryColumns).sorted()
.filter(indexedColumns::contains)
.collect(Collectors.toCollection(TreeSet::new));
Map<StringData, List<RowData>> fileNameToRows = colStats.stream().parallel()
.filter(row -> sortedTargetColumns.contains(row.getString(ORD_COL_NAME).toString()))
.map(row -> {
if (row.isNullAt(ORD_MIN_VAL) && row.isNullAt(ORD_MAX_VAL)) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
return row;
} else {
String colName = row.getString(ORD_COL_NAME).toString();
LogicalType colType = tableFieldTypeMap.get(colName);
return unpackMinMaxVal(row, colType);
}
}).collect(Collectors.groupingBy(rowData -> rowData.getString(ORD_FILE_NAME)));
return Pair.of(foldRowsByFiles(sortedTargetColumns, fileNameToRows), sortedTargetColumns.toArray(new String[0]));
}
private static List<RowData> foldRowsByFiles(
TreeSet<String> sortedTargetColumns,
Map<StringData, List<RowData>> fileNameToRows) {
return fileNameToRows.values().stream().parallel().map(rows -> {
// The rows list is always non-empty (otherwise the file name would not appear as a grouping key)
StringData fileName = rows.get(0).getString(ORD_FILE_NAME);
long valueCount = rows.get(0).getLong(ORD_VAL_CNT);
// To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
// to align existing column-stats for individual file with the list of expected ones for the
// whole transposed projection (a superset of all files)
Map<String, RowData> columnRowsMap = rows.stream()
.collect(Collectors.toMap(row -> row.getString(ORD_COL_NAME).toString(), row -> row));
SortedMap<String, RowData> alignedColumnRowsMap = new TreeMap<>();
sortedTargetColumns.forEach(col -> alignedColumnRowsMap.put(col, columnRowsMap.get(col)));
List<Tuple3> columnStats = alignedColumnRowsMap.values().stream().map(row -> {
if (row == null) {
// NOTE: Since we're assuming missing column to essentially contain exclusively
// null values, we set null-count to be equal to value-count (this behavior is
// consistent with reading non-existent columns from Parquet)
return Tuple3.of(null, null, valueCount);
} else {
GenericRowData gr = (GenericRowData) row;
return Tuple3.of(gr.getField(ORD_MIN_VAL), gr.getField(ORD_MAX_VAL), gr.getField(ORD_NULL_CNT));
}
}).collect(Collectors.toList());
GenericRowData foldedRow = new GenericRowData(2 + 3 * columnStats.size());
foldedRow.setField(0, fileName);
foldedRow.setField(1, valueCount);
for (int i = 0; i < columnStats.size(); i++) {
Tuple3 stats = columnStats.get(i);
int startPos = 2 + 3 * i;
foldedRow.setField(startPos, stats.f0);
foldedRow.setField(startPos + 1, stats.f1);
foldedRow.setField(startPos + 2, stats.f2);
}
return foldedRow;
}).collect(Collectors.toList());
}
private static RowData unpackMinMaxVal(
RowData row,
LogicalType colType) {
RowData minValueStruct = row.getRow(ORD_MIN_VAL, 1);
RowData maxValueStruct = row.getRow(ORD_MAX_VAL, 1);
checkState(minValueStruct != null && maxValueStruct != null,
"Invalid Column Stats record: either both min/max have to be null, or both have to be non-null");
Object minValue = tryUnpackNonNullVal(minValueStruct, colType);
Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType);
// the column schema:
// |- file_name: string
// |- min_val: row
// |- max_val: row
// |- null_cnt: long
// |- val_cnt: long
// |- column_name: string
GenericRowData unpackedRow = new GenericRowData(row.getArity());
unpackedRow.setField(0, row.getString(0));
unpackedRow.setField(1, minValue);
unpackedRow.setField(2, maxValue);
unpackedRow.setField(3, row.getLong(3));
unpackedRow.setField(4, row.getLong(4));
unpackedRow.setField(5, row.getString(5));
return unpackedRow;
}
private static Object tryUnpackNonNullVal(RowData rowData, LogicalType colType) {
for (int i = 0; i < rowData.getArity(); i++) {
// RowData converted from Avro is always a GenericRowData.
Object nested = ((GenericRowData) rowData).getField(i);
if (nested != null) {
return doUnpack(nested, colType);
}
}
return null;
}
private static Object doUnpack(Object rawVal, LogicalType logicalType) {
// fix time unit
switch (logicalType.getTypeRoot()) {
case TIME_WITHOUT_TIME_ZONE:
TimeType timeType = (TimeType) logicalType;
if (timeType.getPrecision() == 3) {
// the precision in HoodieMetadata is 6
rawVal = ((Long) rawVal) / 1000;
} else if (timeType.getPrecision() == 9) {
rawVal = ((Long) rawVal) * 1000;
}
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) logicalType;
if (timestampType.getPrecision() == 3) {
// the precision in HoodieMetadata is 6
rawVal = ((Long) rawVal) / 1000;
} else if (timestampType.getPrecision() == 9) {
rawVal = ((Long) rawVal) * 1000;
}
break;
default:
// no operation
}
AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createConverter(logicalType);
return converter.convert(rawVal);
}
private static List<RowData> readColumnStatsIndexByColumns(
String basePath,
String[] targetColumns,
HoodieMetadataConfig metadataConfig) {
// Read Metadata Table's Column Stats Index into Flink's RowData list by
// - Fetching the records from CSI by key-prefixes (encoded column names)
// - Deserializing fetched records into [[RowData]]s
HoodieTableMetadata metadataTable = HoodieTableMetadata.create(
HoodieFlinkEngineContext.DEFAULT,
metadataConfig, basePath,
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
// TODO encoding should be done internally w/in HoodieBackedTableMetadata
List<String> encodedTargetColumnNames = Arrays.stream(targetColumns)
.map(colName -> new ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());
HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter converter =
AvroToRowDataConverters.createRowConverter((RowType) METADATA_DATA_TYPE.getLogicalType());
return records.collectAsList().stream().parallel().map(record -> {
// schema and props are ignored for generating the metadata record from the payload;
// instead, the underlying file system, bloom filter, or column stats metadata (part of the payload) is used directly
GenericRecord genericRecord;
try {
genericRecord = (GenericRecord) record.getData().getInsertValue(null, null).orElse(null);
} catch (IOException e) {
throw new HoodieException("Exception while getting insert value from metadata payload", e);
}
return (RowData) converter.convert(genericRecord);
}
).collect(Collectors.toList());
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private static class Tuple3 {
public Object f0;
public Object f1;
public Object f2;
private Tuple3(Object f0, Object f1, Object f2) {
this.f0 = f0;
this.f1 = f1;
this.f2 = f2;
}
public static Tuple3 of(Object f0, Object f1, Object f2) {
return new Tuple3(f0, f1, f2);
}
}
private static DataType getMetadataDataType() {
return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$);
}
private static DataType getColStatsDataType() {
int pos = HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
return METADATA_DATA_TYPE.getChildren().get(pos);
}
// the column schema:
// |- file_name: string
// |- min_val: row
// |- max_val: row
// |- null_cnt: long
// |- val_cnt: long
// |- column_name: string
private static int[] getColStatsTargetPos() {
RowType colStatsRowType = (RowType) COL_STATS_DATA_TYPE.getLogicalType();
return Stream.of(
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
.mapToInt(colStatsRowType::getFieldIndex)
.toArray();
}
}
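To illustrate how these two entry points are chained by a caller (mirroring FileIndex.candidateFilesInMetadataTable above; basePath, metadataConfig, referencedCols and tableRowType are assumed to be in scope for this sketch):

List<RowData> rawColStats = ColumnStatsIndices.readColumnStatsIndex(basePath, metadataConfig, referencedCols);
Pair<List<RowData>, String[]> transposed =
    ColumnStatsIndices.transposeColumnStatsIndex(rawColStats, referencedCols, tableRowType);
List<RowData> perFileRows = transposed.getLeft();    // one row per file: fileName, valueCount, then min/max/nullCount per indexed column
String[] indexedQueryCols = transposed.getRight();   // the referenced columns that are actually indexed, sorted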

View File

@@ -0,0 +1,552 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.source.stats;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import javax.validation.constraints.NotNull;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Tool to evaluate the {@link org.apache.flink.table.expressions.ResolvedExpression}s.
*/
public class ExpressionEvaluator {
private static final int IN_PREDICATE_LIMIT = 200;
/**
* Filter the index row with specific data filters and query fields.
*
* @param filters The pushed down data filters
* @param indexRow The index row
* @param queryFields The query fields referenced by the filters
* @return true if the index row should be considered as a candidate
*/
public static boolean filterExprs(List<ResolvedExpression> filters, RowData indexRow, RowType.RowField[] queryFields) {
for (ResolvedExpression filter : filters) {
if (!Evaluator.bindCall((CallExpression) filter, indexRow, queryFields).eval()) {
return false;
}
}
return true;
}
/**
* Used for deciding whether the literal values match the column stats.
* The evaluator can be nested.
*/
public abstract static class Evaluator {
// the constant literal value
protected Object val;
// column stats
protected Object minVal;
protected Object maxVal;
protected long nullCnt = 0;
// referenced field type
protected LogicalType type;
/**
* Binds the evaluator with specific call expression.
*
* <p>Three steps to bind the call:
* 1. map the evaluator instance;
* 2. bind the field reference;
* 3. bind the column stats.
*
* <p>Normalize the expression to simplify the following decision logic:
* always put the literal expression in the right.
*/
public static Evaluator bindCall(CallExpression call, RowData indexRow, RowType.RowField[] queryFields) {
FunctionDefinition funDef = call.getFunctionDefinition();
List<Expression> childExprs = call.getChildren();
boolean normalized = childExprs.get(0) instanceof FieldReferenceExpression;
final Evaluator evaluator;
if (BuiltInFunctionDefinitions.NOT.equals(funDef)) {
evaluator = Not.getInstance();
Evaluator childEvaluator = bindCall((CallExpression) childExprs.get(0), indexRow, queryFields);
return ((Not) evaluator).bindEvaluator(childEvaluator);
}
if (BuiltInFunctionDefinitions.AND.equals(funDef)) {
evaluator = And.getInstance();
Evaluator evaluator1 = bindCall((CallExpression) childExprs.get(0), indexRow, queryFields);
Evaluator evaluator2 = bindCall((CallExpression) childExprs.get(1), indexRow, queryFields);
return ((And) evaluator).bindEvaluator(evaluator1, evaluator2);
}
if (BuiltInFunctionDefinitions.OR.equals(funDef)) {
evaluator = Or.getInstance();
Evaluator evaluator1 = bindCall((CallExpression) childExprs.get(0), indexRow, queryFields);
Evaluator evaluator2 = bindCall((CallExpression) childExprs.get(1), indexRow, queryFields);
return ((Or) evaluator).bindEvaluator(evaluator1, evaluator2);
}
// handle IN specifically
if (BuiltInFunctionDefinitions.IN.equals(funDef)) {
ValidationUtils.checkState(normalized, "The IN expression expects to be normalized");
evaluator = In.getInstance();
FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
evaluator.bindFieldReference(rExpr);
((In) evaluator).bindVals(getInLiteralVals(childExprs));
return evaluator.bindColStats(indexRow, queryFields, rExpr);
}
// handle unary operators
if (BuiltInFunctionDefinitions.IS_NULL.equals(funDef)) {
FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
return IsNull.getInstance()
.bindFieldReference(rExpr)
.bindColStats(indexRow, queryFields, rExpr);
} else if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(funDef)) {
FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0);
return IsNotNull.getInstance()
.bindFieldReference(rExpr)
.bindColStats(indexRow, queryFields, rExpr);
}
// handle binary operators
if (BuiltInFunctionDefinitions.EQUALS.equals(funDef)) {
evaluator = EqualTo.getInstance();
} else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(funDef)) {
evaluator = NotEqualTo.getInstance();
} else if (BuiltInFunctionDefinitions.LESS_THAN.equals(funDef)) {
evaluator = normalized ? LessThan.getInstance() : GreaterThan.getInstance();
} else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(funDef)) {
evaluator = normalized ? GreaterThan.getInstance() : LessThan.getInstance();
} else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(funDef)) {
evaluator = normalized ? LessThanOrEqual.getInstance() : GreaterThanOrEqual.getInstance();
} else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(funDef)) {
evaluator = normalized ? GreaterThanOrEqual.getInstance() : LessThanOrEqual.getInstance();
} else {
throw new AssertionError("Unexpected function definition " + funDef);
}
FieldReferenceExpression rExpr = normalized
? (FieldReferenceExpression) childExprs.get(0)
: (FieldReferenceExpression) childExprs.get(1);
ValueLiteralExpression vExpr = normalized
? (ValueLiteralExpression) childExprs.get(1)
: (ValueLiteralExpression) childExprs.get(0);
evaluator
.bindFieldReference(rExpr)
.bindVal(vExpr)
.bindColStats(indexRow, queryFields, rExpr);
return evaluator;
}
public Evaluator bindColStats(
RowData indexRow,
RowType.RowField[] queryFields,
FieldReferenceExpression expr) {
int colPos = -1;
for (int i = 0; i < queryFields.length; i++) {
if (expr.getName().equals(queryFields[i].getName())) {
colPos = i;
}
}
ValidationUtils.checkState(colPos != -1, "Can not find column " + expr.getName());
int startPos = 2 + colPos * 3;
LogicalType colType = queryFields[colPos].getType();
Object minVal = indexRow.isNullAt(startPos) ? null : getValAsJavaObj(indexRow, startPos, colType);
Object maxVal = indexRow.isNullAt(startPos + 1) ? null : getValAsJavaObj(indexRow, startPos + 1, colType);
long nullCnt = indexRow.getLong(startPos + 2);
this.minVal = minVal;
this.maxVal = maxVal;
this.nullCnt = nullCnt;
return this;
}
public Evaluator bindVal(ValueLiteralExpression vExpr) {
this.val = ExpressionUtils.getValueFromLiteral(vExpr);
return this;
}
public Evaluator bindFieldReference(FieldReferenceExpression expr) {
this.type = expr.getOutputDataType().getLogicalType();
return this;
}
public abstract boolean eval();
}
/**
* To evaluate = expr.
*/
public static class EqualTo extends Evaluator {
public static EqualTo getInstance() {
return new EqualTo();
}
@Override
public boolean eval() {
if (this.minVal == null || this.maxVal == null || this.val == null) {
return false;
}
if (compare(this.minVal, this.val, this.type) > 0) {
return false;
}
return compare(this.maxVal, this.val, this.type) >= 0;
}
}
/**
* To evaluate <> expr.
*/
public static class NotEqualTo extends Evaluator {
public static NotEqualTo getInstance() {
return new NotEqualTo();
}
@Override
public boolean eval() {
// because the bounds are not necessarily a min or max value, this cannot be answered using
// them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
return true;
}
}
/**
* To evaluate IS NULL expr.
*/
public static class IsNull extends Evaluator {
public static IsNull getInstance() {
return new IsNull();
}
@Override
public boolean eval() {
return this.nullCnt > 0;
}
}
/**
* To evaluate IS NOT NULL expr.
*/
public static class IsNotNull extends Evaluator {
public static IsNotNull getInstance() {
return new IsNotNull();
}
@Override
public boolean eval() {
// should consider FLOAT/DOUBLE & NAN
return this.minVal != null || this.nullCnt <= 0;
}
}
/**
* To evaluate < expr.
*/
public static class LessThan extends Evaluator {
public static LessThan getInstance() {
return new LessThan();
}
@Override
public boolean eval() {
if (this.minVal == null) {
return false;
}
return compare(this.minVal, this.val, this.type) < 0;
}
}
/**
* To evaluate > expr.
*/
public static class GreaterThan extends Evaluator {
public static GreaterThan getInstance() {
return new GreaterThan();
}
@Override
public boolean eval() {
if (this.maxVal == null) {
return false;
}
return compare(this.maxVal, this.val, this.type) > 0;
}
}
/**
* To evaluate <= expr.
*/
public static class LessThanOrEqual extends Evaluator {
public static LessThanOrEqual getInstance() {
return new LessThanOrEqual();
}
@Override
public boolean eval() {
if (this.minVal == null) {
return false;
}
return compare(this.minVal, this.val, this.type) <= 0;
}
}
/**
* To evaluate >= expr.
*/
public static class GreaterThanOrEqual extends Evaluator {
public static GreaterThanOrEqual getInstance() {
return new GreaterThanOrEqual();
}
@Override
public boolean eval() {
if (this.maxVal == null) {
return false;
}
return compare(this.maxVal, this.val, this.type) >= 0;
}
}
/**
* To evaluate IN expr.
*/
public static class In extends Evaluator {
public static In getInstance() {
return new In();
}
private Object[] vals;
@Override
public boolean eval() {
if (this.minVal == null) {
return false; // values are all null and literalSet cannot contain null.
}
if (vals.length > IN_PREDICATE_LIMIT) {
// skip evaluating the predicate if the number of values is too big
return true;
}
vals = Arrays.stream(vals).filter(v -> compare(this.minVal, v, this.type) <= 0).toArray();
if (vals.length == 0) { // if all values are less than lower bound, rows cannot match.
return false;
}
vals = Arrays.stream(vals).filter(v -> compare(this.maxVal, v, this.type) >= 0).toArray();
if (vals.length == 0) { // if all remaining values are greater than upper bound, rows cannot match.
return false;
}
return true;
}
public void bindVals(Object... vals) {
this.vals = vals;
}
}
// component predicate
/**
* To evaluate NOT expr.
*/
public static class Not extends Evaluator {
public static Not getInstance() {
return new Not();
}
private Evaluator evaluator;
@Override
public boolean eval() {
return !this.evaluator.eval();
}
public Evaluator bindEvaluator(Evaluator evaluator) {
this.evaluator = evaluator;
return this;
}
}
/**
* To evaluate AND expr.
*/
public static class And extends Evaluator {
public static And getInstance() {
return new And();
}
private Evaluator[] evaluators;
@Override
public boolean eval() {
for (Evaluator evaluator : evaluators) {
if (!evaluator.eval()) {
return false;
}
}
return true;
}
public Evaluator bindEvaluator(Evaluator... evaluators) {
this.evaluators = evaluators;
return this;
}
}
/**
* To evaluate OR expr.
*/
public static class Or extends Evaluator {
public static Or getInstance() {
return new Or();
}
private Evaluator[] evaluators;
@Override
public boolean eval() {
for (Evaluator evaluator : evaluators) {
if (evaluator.eval()) {
return true;
}
}
return false;
}
public Evaluator bindEvaluator(Evaluator... evaluators) {
this.evaluators = evaluators;
return this;
}
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private static int compare(@NotNull Object val1, @NotNull Object val2, LogicalType logicalType) {
switch (logicalType.getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIME_WITHOUT_TIME_ZONE:
case DATE:
return ((Long) val1).compareTo((Long) val2);
case BOOLEAN:
return ((Boolean) val1).compareTo((Boolean) val2);
case TINYINT:
case SMALLINT:
case INTEGER:
return ((Integer) val1).compareTo((Integer) val2);
case FLOAT:
return ((Float) val1).compareTo((Float) val2);
case DOUBLE:
return ((Double) val1).compareTo((Double) val2);
case BINARY:
case VARBINARY:
return compareBytes((byte[]) val1, (byte[]) val2);
case CHAR:
case VARCHAR:
return ((String) val1).compareTo((String) val2);
case DECIMAL:
return ((BigDecimal) val1).compareTo((BigDecimal) val2);
default:
throw new UnsupportedOperationException("Unsupported type: " + logicalType);
}
}
private static int compareBytes(byte[] v1, byte[] v2) {
int len1 = v1.length;
int len2 = v2.length;
int lim = Math.min(len1, len2);
int k = 0;
while (k < lim) {
byte c1 = v1[k];
byte c2 = v2[k];
if (c1 != c2) {
return c1 - c2;
}
k++;
}
return len1 - len2;
}
/**
* Returns the IN expression literal values.
*/
private static Object[] getInLiteralVals(List<Expression> childExprs) {
List<Object> vals = new ArrayList<>();
for (int i = 1; i < childExprs.size(); i++) {
vals.add(ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) childExprs.get(i)));
}
return vals.toArray();
}
/**
* Returns the value as Java object at position {@code pos} of row {@code indexRow}.
*/
private static Object getValAsJavaObj(RowData indexRow, int pos, LogicalType colType) {
switch (colType.getTypeRoot()) {
// NOTE: Since we can't rely on Avro's "date", and "timestamp-micros" logical-types, we're
// manually encoding corresponding values as int and long w/in the Column Stats Index and
// here we have to decode those back into corresponding logical representation.
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIME_WITHOUT_TIME_ZONE:
case DATE:
return indexRow.getLong(pos);
// NOTE: All integral types of size less than Int are encoded as Ints in MT
case BOOLEAN:
return indexRow.getBoolean(pos);
case TINYINT:
case SMALLINT:
case INTEGER:
return indexRow.getInt(pos);
case FLOAT:
return indexRow.getFloat(pos);
case DOUBLE:
return indexRow.getDouble(pos);
case BINARY:
case VARBINARY:
return indexRow.getBinary(pos);
case CHAR:
case VARCHAR:
return indexRow.getString(pos).toString();
case DECIMAL:
DecimalType decimalType = (DecimalType) colType;
return indexRow.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();
default:
throw new UnsupportedOperationException("Unsupported type: " + colType);
}
}
}
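A sketch of how the evaluator is driven per transposed index row, as FileIndex does in candidateFilesInMetadataTable above (pushedDownFilters, tableRowType, indexedQueryCols and indexRow are hypothetical names for illustration):

RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(tableRowType, indexedQueryCols);
// A false result means the min/max/null-count stats prove no row of this base file can match,
// so the file can be safely skipped; true keeps it as a candidate.
boolean keepFile = ExpressionEvaluator.filterExprs(pushedDownFilters, indexRow, queryFields);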

View File

@@ -67,7 +67,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType();
RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
// bulk_insert mode
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
@@ -34,7 +35,6 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -46,13 +46,12 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
@@ -73,7 +72,6 @@ import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
@@ -117,6 +115,7 @@ public class HoodieTableSource implements
private final long maxCompactionMemoryInBytes;
private final ResolvedSchema schema;
private final RowType tableRowType;
private final Path path;
private final List<String> partitionKeys;
private final String defaultPartName;
@@ -125,7 +124,7 @@ public class HoodieTableSource implements
private int[] requiredPos;
private long limit;
private List<Expression> filters;
private List<ResolvedExpression> filters;
private List<Map<String, String>> requiredPartitions;
@@ -147,21 +146,22 @@ public class HoodieTableSource implements
@Nullable List<Map<String, String>> requiredPartitions,
@Nullable int[] requiredPos,
@Nullable Long limit,
@Nullable List<Expression> filters) {
@Nullable List<ResolvedExpression> filters) {
this.schema = schema;
this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
this.path = path;
this.partitionKeys = partitionKeys;
this.defaultPartName = defaultPartName;
this.conf = conf;
this.fileIndex = FileIndex.instance(this.path, this.conf);
this.requiredPartitions = requiredPartitions;
this.requiredPos = requiredPos == null
? IntStream.range(0, schema.toPhysicalRowDataType().getChildren().size()).toArray()
? IntStream.range(0, this.tableRowType.getFieldCount()).toArray()
: requiredPos;
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
this.filters = filters == null ? Collections.emptyList() : filters;
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
}
@@ -181,12 +181,12 @@ public class HoodieTableSource implements
(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
.setParallelism(1)
.keyBy(inputSplit -> inputSplit.getFileId())
.keyBy(MergeOnReadInputSplit::getFileId)
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
@@ -219,7 +219,8 @@ public class HoodieTableSource implements
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
this.filters = new ArrayList<>(filters);
this.filters = filters.stream().filter(ExpressionUtils::isSimpleCallExpression).collect(Collectors.toList());
this.fileIndex.setFilters(this.filters);
// decline all the filters for now: they are only used for data skipping, so the planner still applies them
return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters));
}
@@ -262,13 +263,6 @@ public class HoodieTableSource implements
.bridgedTo(RowData.class);
}
private List<Map<String, String>> getOrFetchPartitions() {
if (requiredPartitions == null) {
requiredPartitions = listPartitions().orElse(Collections.emptyList());
}
return requiredPartitions;
}
private String getSourceOperatorName(String operatorName) {
String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
List<String> fields = Arrays.stream(this.requiredPos)
@@ -366,7 +360,9 @@ public class HoodieTableSource implements
return baseFileOnlyInputFormat();
case FlinkOptions.QUERY_TYPE_INCREMENTAL:
IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
.conf(conf).path(FilePathUtils.toFlinkPath(path))
.conf(conf)
.path(FilePathUtils.toFlinkPath(path))
.rowType(this.tableRowType)
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(getRequiredPartitionPaths()).build();
final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, hadoopConf);
@@ -439,11 +435,18 @@ public class HoodieTableSource implements
}
private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
final Path[] paths = getReadPaths();
if (paths.length == 0) {
final FileStatus[] fileStatuses = getReadFiles();
if (fileStatuses.length == 0) {
return InputFormats.EMPTY_INPUT_FORMAT;
}
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), fileStatuses);
Path[] paths = fsView.getLatestBaseFiles()
.map(HoodieBaseFile::getFileStatus)
.map(FileStatus::getPath).toArray(Path[]::new);
return new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
this.schema.getColumnNames().toArray(new String[0]),
this.schema.getColumnDataTypes().toArray(new DataType[0]),
@@ -453,12 +456,10 @@ public class HoodieTableSource implements
getParquetConf(this.conf, this.hadoopConf),
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
);
format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
return format;
}
private Schema inferSchemaFromDdl() {
Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType());
Schema schema = AvroSchemaConverter.convertToSchema(this.tableRowType);
return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
}
@@ -498,23 +499,13 @@ public class HoodieTableSource implements
* Get the reader paths with partition path expanded.
*/
@VisibleForTesting
public Path[] getReadPaths() {
return partitionKeys.isEmpty()
? new Path[] {path}
: FilePathUtils.partitionPath2ReadPath(path, partitionKeys, getOrFetchPartitions(),
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
}
private static class LatestFileFilter extends FilePathFilter {
private final HoodieROTablePathFilter hoodieFilter;
public LatestFileFilter(org.apache.hadoop.conf.Configuration hadoopConf) {
this.hoodieFilter = new HoodieROTablePathFilter(hadoopConf);
}
@Override
public boolean filterPath(org.apache.flink.core.fs.Path filePath) {
return !this.hoodieFilter.accept(new Path(filePath.toUri()));
public FileStatus[] getReadFiles() {
Set<String> requiredPartitionPaths = getRequiredPartitionPaths();
fileIndex.setPartitionPaths(requiredPartitionPaths);
List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
if (relPartitionPaths.size() == 0) {
return new FileStatus[0];
}
return fileIndex.getFilesInPartitions();
}
}

View File

@@ -323,6 +323,10 @@ public class FilePathUtils {
public static LinkedHashMap<String, String> validateAndReorderPartitions(
Map<String, String> partitionKVs,
List<String> partitionKeys) {
if (partitionKeys.size() == 0) {
// in case the partition fields are not in schema
return new LinkedHashMap<>(partitionKVs);
}
LinkedHashMap<String, String> map = new LinkedHashMap<>();
for (String k : partitionKeys) {
if (!partitionKVs.containsKey(k)) {

View File

@@ -39,6 +39,7 @@ import org.apache.flink.table.types.logical.TypeInformationRawType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import java.util.List;
import java.util.stream.Collectors;
/**
* Converts an Avro schema into Flink's type information. It uses {@link org.apache.flink.api.java.typeutils.RowTypeInfo} for
@@ -96,9 +97,24 @@ public class AvroSchemaConverter {
actualSchema = schema.getTypes().get(0);
nullable = false;
} else {
List<Schema> nonNullTypes = schema.getTypes().stream()
.filter(s -> s.getType() != Schema.Type.NULL)
.collect(Collectors.toList());
nullable = schema.getTypes().size() > nonNullTypes.size();
// use Kryo for serialization
return new AtomicDataType(
new TypeInformationRawType<>(false, Types.GENERIC(Object.class)));
DataType rawDataType = new AtomicDataType(
new TypeInformationRawType<>(false, Types.GENERIC(Object.class)))
.notNull();
if (recordTypesOfSameNumFields(nonNullTypes)) {
DataType converted = DataTypes.ROW(
DataTypes.FIELD("wrapper", rawDataType))
.notNull();
return nullable ? converted.nullable() : converted;
}
// use Kryo for serialization
return nullable ? rawDataType.nullable() : rawDataType;
}
DataType converted = convertToDataType(actualSchema);
return nullable ? converted.nullable() : converted;
@@ -155,6 +171,20 @@ public class AvroSchemaConverter {
}
}
/**
* Returns true if all the types are RECORD type with same number of fields.
*/
private static boolean recordTypesOfSameNumFields(List<Schema> types) {
if (types == null || types.size() == 0) {
return false;
}
if (types.stream().anyMatch(s -> s.getType() != Schema.Type.RECORD)) {
return false;
}
int numFields = types.get(0).getFields().size();
return types.stream().allMatch(s -> s.getFields().size() == numFields);
}
/**
* Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
*

View File

@@ -106,7 +106,7 @@ public class AvroToRowDataConverters {
/**
* Creates a runtime converter which assuming input object is not null.
*/
private static AvroToRowDataConverter createConverter(LogicalType type) {
public static AvroToRowDataConverter createConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return avroObject -> null;
@@ -121,6 +121,7 @@ public class AvroToRowDataConverters {
case INTERVAL_DAY_TIME: // long
case FLOAT: // float
case DOUBLE: // double
case RAW:
return avroObject -> avroObject;
case DATE:
return AvroToRowDataConverters::convertToDate;
@@ -143,7 +144,6 @@ public class AvroToRowDataConverters {
case MAP:
case MULTISET:
return createMapConverter(type);
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}

View File

@@ -23,8 +23,11 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import java.util.Arrays;
/**
* Utilities for {@link org.apache.flink.table.types.DataType}.
*/
@@ -58,4 +61,12 @@ public class DataTypeUtils {
public static boolean isDatetimeType(DataType type) {
return isTimestampType(type) || isDateType(type);
}
/**
* Projects the row fields with the given names.
*/
public static RowType.RowField[] projectRowFields(RowType rowType, String[] names) {
int[] fieldIndices = Arrays.stream(names).mapToInt(rowType::getFieldIndex).toArray();
return Arrays.stream(fieldIndices).mapToObj(i -> rowType.getFields().get(i)).toArray(RowType.RowField[]::new);
}
}
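A short usage sketch of the new helper; the field names and types below are arbitrary:

import java.util.Arrays;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.util.DataTypeUtils;

public class ProjectRowFieldsExample {
  public static void main(String[] args) {
    RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3))).getLogicalType();
    // fields come back in the requested order: ts, then uuid
    RowType.RowField[] projected = DataTypeUtils.projectRowFields(rowType, new String[] {"ts", "uuid"});
    System.out.println(Arrays.toString(projected));
  }
}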

View File

@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.logical.LogicalType;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
import java.util.Arrays;
import java.util.List;
/**
* Utilities for expression resolving.
*/
public class ExpressionUtils {
/**
* Collects the referenced columns from the given expressions;
* only simple call expressions are supported.
*/
public static String[] referencedColumns(List<ResolvedExpression> exprs) {
return exprs.stream()
.map(ExpressionUtils::getReferencedColumns)
.filter(columns -> columns.length > 0)
.flatMap(Arrays::stream)
.distinct() // deduplication
.toArray(String[]::new);
}
/**
* Returns whether the given expression is a simple call expression:
* a binary call with one operand being a field reference and the other
* being a literal.
*/
public static boolean isSimpleCallExpression(Expression expr) {
if (!(expr instanceof CallExpression)) {
return false;
}
CallExpression callExpression = (CallExpression) expr;
FunctionDefinition funcDef = callExpression.getFunctionDefinition();
// simple call list:
// NOT AND OR IN EQUALS NOT_EQUALS IS_NULL IS_NOT_NULL LESS_THAN GREATER_THAN
// LESS_THAN_OR_EQUAL GREATER_THAN_OR_EQUAL
if (funcDef == BuiltInFunctionDefinitions.NOT
|| funcDef == BuiltInFunctionDefinitions.AND
|| funcDef == BuiltInFunctionDefinitions.OR) {
return callExpression.getChildren().stream()
.allMatch(ExpressionUtils::isSimpleCallExpression);
}
if (!(funcDef == BuiltInFunctionDefinitions.IN
|| funcDef == BuiltInFunctionDefinitions.EQUALS
|| funcDef == BuiltInFunctionDefinitions.NOT_EQUALS
|| funcDef == BuiltInFunctionDefinitions.IS_NULL
|| funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL
|| funcDef == BuiltInFunctionDefinitions.LESS_THAN
|| funcDef == BuiltInFunctionDefinitions.GREATER_THAN
|| funcDef == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL
|| funcDef == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL)) {
return false;
}
// handle IN
if (funcDef == BuiltInFunctionDefinitions.IN) {
// the RHS operands of an IN expression are always literals
return true;
}
// handle unary operator
if (funcDef == BuiltInFunctionDefinitions.IS_NULL
|| funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL) {
return callExpression.getChildren().stream()
.allMatch(e -> e instanceof FieldReferenceExpression);
}
// handle binary operator
return isFieldReferenceAndLiteral(callExpression.getChildren());
}
private static boolean isFieldReferenceAndLiteral(List<Expression> exprs) {
if (exprs.size() != 2) {
return false;
}
final Expression expr0 = exprs.get(0);
final Expression expr1 = exprs.get(1);
return expr0 instanceof FieldReferenceExpression && expr1 instanceof ValueLiteralExpression
|| expr0 instanceof ValueLiteralExpression && expr1 instanceof FieldReferenceExpression;
}
private static String[] getReferencedColumns(ResolvedExpression expression) {
CallExpression callExpr = (CallExpression) expression;
FunctionDefinition funcDef = callExpr.getFunctionDefinition();
if (funcDef == BuiltInFunctionDefinitions.NOT
|| funcDef == BuiltInFunctionDefinitions.AND
|| funcDef == BuiltInFunctionDefinitions.OR) {
return callExpr.getChildren().stream()
.map(e -> getReferencedColumns((ResolvedExpression) e))
.flatMap(Arrays::stream)
.toArray(String[]::new);
}
return expression.getChildren().stream()
.filter(expr -> expr instanceof FieldReferenceExpression)
.map(expr -> ((FieldReferenceExpression) expr).getName())
.toArray(String[]::new);
}
/**
* Returns the value of the given value literal expression.
*
* <p>Returns null if the value cannot be parsed as the output data type correctly;
* callers should invoke {@code ValueLiteralExpression.isNull} first to decide whether
* the literal is NULL.
*/
@Nullable
public static Object getValueFromLiteral(ValueLiteralExpression expr) {
LogicalType logicalType = expr.getOutputDataType().getLogicalType();
switch (logicalType.getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
return expr.getValueAs(LocalDateTime.class)
.map(ldt -> ldt.toInstant(ZoneOffset.UTC).toEpochMilli())
.orElse(null);
case TIME_WITHOUT_TIME_ZONE:
return expr.getValueAs(LocalTime.class)
.map(lt -> lt.get(ChronoField.MILLI_OF_DAY))
.orElse(null);
case DATE:
return expr.getValueAs(LocalDate.class)
.map(LocalDate::toEpochDay)
.orElse(null);
case BOOLEAN:
return expr.getValueAs(Boolean.class).orElse(null);
// NOTE: All integral types of size less than Int are encoded as Ints in MT
case TINYINT:
case SMALLINT:
case INTEGER:
return expr.getValueAs(Integer.class).orElse(null);
case FLOAT:
return expr.getValueAs(Float.class).orElse(null);
case DOUBLE:
return expr.getValueAs(Double.class).orElse(null);
case BINARY:
case VARBINARY:
return expr.getValueAs(byte[].class).orElse(null);
case CHAR:
case VARCHAR:
return expr.getValueAs(String.class).orElse(null);
case DECIMAL:
return expr.getValueAs(BigDecimal.class).orElse(null);
default:
throw new UnsupportedOperationException("Unsupported type: " + logicalType);
}
}
}
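A quick sketch of getValueFromLiteral, showing how literal values are normalized so they can be compared numerically against stored statistics; ValueLiteralExpression's single-argument constructor derives the data type from the value class, so no explicit DataType is passed here:

import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.hudi.util.ExpressionUtils;

import java.time.LocalDate;

public class LiteralValueExample {
  public static void main(String[] args) {
    // INTEGER literal -> Integer
    System.out.println(ExpressionUtils.getValueFromLiteral(new ValueLiteralExpression(12)));
    // DATE literal -> epoch day (long), so it can be compared as a plain number
    System.out.println(ExpressionUtils.getValueFromLiteral(
        new ValueLiteralExpression(LocalDate.of(2022, 7, 3))));
  }
}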

View File

@@ -26,6 +26,8 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
/**
* Utilities to project the row data with given positions.
@@ -51,6 +53,12 @@ public class RowDataProjection implements Serializable {
return new RowDataProjection(types, positions);
}
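/**
* Returns a projection instance that resolves the field types from the given row type
* at the given positions.
*/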
public static RowDataProjection instanceV2(RowType rowType, int[] positions) {
List<LogicalType> fieldTypes = rowType.getChildren();
final LogicalType[] types = Arrays.stream(positions).mapToObj(fieldTypes::get).toArray(LogicalType[]::new);
return new RowDataProjection(types, positions);
}
public static RowDataProjection instance(LogicalType[] types, int[] positions) {
return new RowDataProjection(types, positions);
}
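A usage sketch of instanceV2; the row type is arbitrary, the class path is assumed to be org.apache.hudi.util consistent with the neighboring utilities, and applying the projection to individual RowData records is done by the class's existing project method (not shown in this hunk):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.util.RowDataProjection;

public class ProjectionExample {
  public static void main(String[] args) {
    RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3))).getLogicalType();
    // keep positions 2 and 0 (ts, uuid); the field types are looked up from the row type
    RowDataProjection projection = RowDataProjection.instanceV2(rowType, new int[] {2, 0});
    System.out.println(projection != null);
  }
}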

View File

@@ -18,12 +18,12 @@
package org.apache.hudi.util;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;