1
0

[HUDI-3514] Rebase Data Skipping flow to rely on MT Column Stats index (#4948)

This commit is contained in:
Alexey Kudinkin
2022-03-15 10:38:36 -07:00
committed by GitHub
parent 9bdda2a312
commit 5e8ff8d793
19 changed files with 359 additions and 224 deletions

View File

@@ -18,21 +18,15 @@
package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
import org.apache.spark.sql.sources._

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
object HoodieDatasetUtils {
/**
* Executes provided function while keeping provided [[DataFrame]] instance persisted for the
* duration of the execution
*
* @param df target [[DataFrame]] to be persisted
* @param level desired [[StorageLevel]] of the persistence
* @param f target function to be executed while [[DataFrame]] is kept persisted
* @tparam T return value of the target function
* @return execution outcome of the [[f]] function
*/
def withPersistence[T](df: DataFrame, level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
df.persist(level)
try {
f
} finally {
df.unpersist()
}
}
}

View File

@@ -18,30 +18,31 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.HoodieFileIndex.getConfigProperties
import org.apache.hudi.HoodieDatasetUtils.withPersistence
import org.apache.hudi.HoodieFileIndex.{collectReferencedColumns, getConfigProperties}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, MetadataPartitionType}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
/**
* A file index which support partition prune for hoodie snapshot and read-optimized query.
@@ -84,7 +85,7 @@ case class HoodieFileIndex(spark: SparkSession,
override def rootPaths: Seq[Path] = queryPaths.asScala
def enableDataSkipping(): Boolean = {
def isDataSkippingEnabled(): Boolean = {
options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
}
@@ -112,7 +113,6 @@ case class HoodieFileIndex(spark: SparkSession,
* @return list of PartitionDirectory containing partition to base files mapping
*/
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val convertedPartitionFilters =
HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)
@@ -121,18 +121,14 @@ case class HoodieFileIndex(spark: SparkSession,
// - Col-Stats Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInColStatsIndex(dataFilters) match {
lookupCandidateFilesInMetadataTable(dataFilters) match {
case Success(opt) => opt
case Failure(e) =>
if (e.isInstanceOf[AnalysisException]) {
logDebug("Failed to relay provided data filters to Z-index lookup", e)
} else {
logError("Failed to lookup candidate files in Z-index", e)
}
logError("Failed to lookup candidate files in Z-index", e)
Option.empty
}
logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table
@@ -185,8 +181,8 @@ case class HoodieFileIndex(spark: SparkSession,
/**
* Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters}
* conditions, by leveraging custom Column Statistics index (col-stats-index) bearing "min", "max",
* "num_nulls" statistics for all clustered columns.
* conditions, by leveraging Metadata Table's Column Statistics index (hereon referred as ColStats for brevity)
* bearing "min", "max", "num_nulls" statistics for all columns.
*
* NOTE: This method has to return complete set of candidate files, since only provided candidates will
* ultimately be scanned as part of query execution. Hence, this method has to maintain the
@@ -195,77 +191,102 @@ case class HoodieFileIndex(spark: SparkSession,
* @param queryFilters list of original data filters passed down from querying engine
* @return list of pruned (data-skipped) candidate base-files' names
*/
private def lookupCandidateFilesInColStatsIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
val indexPath = metaClient.getColumnStatsIndexPath
private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
val fs = metaClient.getFs
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) {
// scalastyle:off return
return Success(Option.empty)
// scalastyle:on return
if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
Option.empty
} else {
val targetColStatsIndexColumns = Seq(
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
val requiredMetadataIndexColumns =
(targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
val metadataTableDF = spark.read.format("org.apache.hudi")
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
// TODO filter on (column, partition) prefix
val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
.select(requiredMetadataIndexColumns.map(col): _*)
val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(colStatsDF) {
// Metadata Table bears rows in the following format
//
// +---------------------------+------------+------------+------------+-------------+
// | fileName | columnName | minValue | maxValue | num_nulls |
// +---------------------------+------------+------------+------------+-------------+
// | one_base_file.parquet | A | 1 | 10 | 0 |
// | another_base_file.parquet | A | -10 | 0 | 5 |
// +---------------------------+------------+------------+------------+-------------+
//
// While Data Skipping utils are expecting following (transposed) format, where per-column stats are
// essentially transposed (from rows to columns):
//
// +---------------------------+------------+------------+-------------+
// | file | A_minValue | A_maxValue | A_num_nulls |
// +---------------------------+------------+------------+-------------+
// | one_base_file.parquet | 1 | 10 | 0 |
// | another_base_file.parquet | -10 | 0 | 5 |
// +---------------------------+------------+------------+-------------+
//
// NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while
// query at hand might only be referencing a handful of those. As such, we collect all the
// column references from the filtering expressions, and only transpose records corresponding to the
// columns referenced in those
val transposedColStatsDF =
queryReferencedColumns.map(colName =>
colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
.select(targetColStatsIndexColumns.map(col): _*)
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
)
.reduceLeft((left, right) =>
left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(transposedColStatsDF) {
val indexSchema = transposedColStatsDF.schema
val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
.reduce(And)
val allIndexedFileNames =
transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet
val prunedCandidateFileNames =
transposedColStatsDF.where(new Column(indexFilter))
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet
// NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
// base-file: since it's bound to clustering, which could occur asynchronously
// at arbitrary point in time, and is not likely to be touching all of the base files.
//
// To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
// files and all outstanding base-files, and make sure that all base files not
// represented w/in the index are included in the output of this method
val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
Some(prunedCandidateFileNames ++ notIndexedFileNames)
}
}
}
val completedCommits = getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
// Collect all index tables present in `.zindex` folder
val candidateIndexTables =
fs.listStatus(new Path(indexPath))
.filter(_.isDirectory)
.map(_.getPath.getName)
.filter(completedCommits.contains(_))
.sortBy(x => x)
if (candidateIndexTables.isEmpty) {
// scalastyle:off return
return Success(Option.empty)
// scalastyle:on return
}
val dataFrameOpt = try {
Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
} catch {
case t: Throwable =>
logError("Failed to read col-stats index; skipping", t)
None
}
dataFrameOpt.map(df => {
val indexSchema = df.schema
val indexFilter =
queryFilters.map(createColumnStatsIndexFilterExpr(_, indexSchema))
.reduce(And)
logInfo(s"Index filter condition: $indexFilter")
df.persist()
val allIndexedFileNames =
df.select("file")
.collect()
.map(_.getString(0))
.toSet
val prunedCandidateFileNames =
df.where(new Column(indexFilter))
.select("file")
.collect()
.map(_.getString(0))
.toSet
df.unpersist()
// NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
// base-file: since it's bound to clustering, which could occur asynchronously
// at arbitrary point in time, and is not likely to be touching all of the base files.
//
// To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
// files and all outstanding base-files, and make sure that all base files not
// represented w/in the index are included in the output of this method
val notIndexedFileNames =
lookupFileNamesMissingFromIndex(allIndexedFileNames)
prunedCandidateFileNames ++ notIndexedFileNames
})
}
override def refresh(): Unit = super.refresh()
@@ -282,6 +303,12 @@ case class HoodieFileIndex(spark: SparkSession,
object HoodieFileIndex extends Logging {
private def collectReferencedColumns(spark: SparkSession, queryFilters: Seq[Expression], schema: StructType): Seq[String] = {
val resolver = spark.sessionState.analyzer.resolver
val refs = queryFilters.flatMap(_.references)
schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) }
}
def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = new TypedProperties()
@@ -331,6 +358,9 @@ object HoodieFileIndex extends Logging {
}
private def getQueryPath(options: Map[String, String]) = {
new Path(options.getOrElse("path", "'path' option required"))
new Path(options.get("path") match {
case Some(p) => p
case None => throw new IllegalArgumentException("'path' option required")
})
}
}

View File

@@ -307,7 +307,7 @@ object SparkHoodieTableFileIndex {
}
private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
configProperties.asScala(QUERY_TYPE.key()) match {
configProperties.asScala(QUERY_TYPE.key) match {
case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT
case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.INCREMENTAL
case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.READ_OPTIMIZED

View File

@@ -17,39 +17,40 @@
package org.apache.spark.sql.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
object DataSkippingUtils extends Logging {
/**
* Translates provided {@link filterExpr} into corresponding filter-expression for column-stats index index table
* to filter out candidate files that would hold records matching the original filter
*
* @param sourceFilterExpr source table's query's filter expression
* @param dataTableFilterExpr source table's query's filter expression
* @param indexSchema index table schema
* @return filter for column-stats index's table
*/
def createColumnStatsIndexFilterExpr(sourceFilterExpr: Expression, indexSchema: StructType): Expression = {
def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression, indexSchema: StructType): Expression = {
try {
createColumnStatsIndexFilterExprInternal(dataTableFilterExpr, indexSchema)
} catch {
case e: AnalysisException =>
logDebug(s"Failed to translated provided data table filter expr into column stats one ($dataTableFilterExpr)", e)
throw e
}
}
private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr: Expression, indexSchema: StructType): Expression = {
// Try to transform original Source Table's filter expression into
// Column-Stats Index filter expression
tryComposeIndexFilterExpr(sourceFilterExpr, indexSchema) match {
tryComposeIndexFilterExpr(dataTableFilterExpr, indexSchema) match {
case Some(e) => e
// NOTE: In case we can't transform source filter expression, we fallback
// to {@code TrueLiteral}, to essentially avoid pruning any indexed files from scanning
@@ -201,14 +202,14 @@ object DataSkippingUtils extends Logging {
)
case or: Or =>
val resLeft = createColumnStatsIndexFilterExpr(or.left, indexSchema)
val resRight = createColumnStatsIndexFilterExpr(or.right, indexSchema)
val resLeft = createColumnStatsIndexFilterExprInternal(or.left, indexSchema)
val resRight = createColumnStatsIndexFilterExprInternal(or.right, indexSchema)
Option(Or(resLeft, resRight))
case and: And =>
val resLeft = createColumnStatsIndexFilterExpr(and.left, indexSchema)
val resRight = createColumnStatsIndexFilterExpr(and.right, indexSchema)
val resLeft = createColumnStatsIndexFilterExprInternal(and.left, indexSchema)
val resRight = createColumnStatsIndexFilterExprInternal(and.right, indexSchema)
Option(And(resLeft, resRight))
@@ -219,10 +220,10 @@ object DataSkippingUtils extends Logging {
//
case Not(And(left: Expression, right: Expression)) =>
Option(createColumnStatsIndexFilterExpr(Or(Not(left), Not(right)), indexSchema))
Option(createColumnStatsIndexFilterExprInternal(Or(Not(left), Not(right)), indexSchema))
case Not(Or(left: Expression, right: Expression)) =>
Option(createColumnStatsIndexFilterExpr(And(Not(left), Not(right)), indexSchema))
Option(createColumnStatsIndexFilterExprInternal(And(Not(left), Not(right)), indexSchema))
case _: Expression => None
}
@@ -259,34 +260,4 @@ object DataSkippingUtils extends Logging {
throw new AnalysisException(s"convert reference to name failed, Found unsupported expression ${other}")
}
}
def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = {
val basePath = new Path(indexPath)
basePath.getFileSystem(conf)
.listStatus(basePath).filter(f => f.getPath.getName.endsWith(".parquet"))
}
/**
* read parquet files concurrently by local.
* this method is mush faster than spark
*/
def readParquetFile(spark: SparkSession, indexFiles: Seq[FileStatus], filters: Seq[Filter] = Nil, schemaOpts: Option[StructType] = None): Set[String] = {
val hadoopConf = spark.sparkContext.hadoopConfiguration
val partitionedFiles = indexFiles.map(f => PartitionedFile(InternalRow.empty, f.getPath.toString, 0, f.getLen))
val requiredSchema = new StructType().add("file", StringType, true)
val schema = schemaOpts.getOrElse(requiredSchema)
val parquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(spark
, schema , StructType(Nil), requiredSchema, filters, Map.empty, hadoopConf)
val results = new Array[Iterator[String]](partitionedFiles.size)
partitionedFiles.zipWithIndex.par.foreach { case (pf, index) =>
val fileIterator = parquetReader(pf).asInstanceOf[Iterator[Any]]
val rows = fileIterator.flatMap(_ match {
case r: InternalRow => Seq(r)
case b: ColumnarBatch => b.rowIterator().asScala
}).map(r => r.getString(0))
results(index) = rows
}
results.flatMap(f => f).toSet
}
}