1
0

[HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping (#5746)

We provide an alternative way of fetching Column Stats Index within the reading process to avoid the penalty of a more heavy-weight execution scheduled through a Spark engine.
This commit is contained in:
Alexey Kudinkin
2022-07-25 15:36:12 -07:00
committed by GitHub
parent 6e7ac45735
commit e7c8df7e8b
20 changed files with 730 additions and 387 deletions

View File

@@ -17,66 +17,153 @@
package org.apache.hudi
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, metadataRecordSchemaString, metadataRecordStructType, tryUnpackNonNullVal}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.generic.GenericData
import org.apache.hudi.ColumnStatsIndexSupport._
import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.model.HoodieMetadataRecord
import org.apache.hudi.avro.model._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
import scala.collection.mutable.ListBuffer
import scala.collection.parallel.mutable.ParHashMap
/**
* Mixin trait abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
* providing convenient interfaces to read it, transpose, etc
*/
trait ColumnStatsIndexSupport extends SparkAdapterSupport {
class ColumnStatsIndexSupport(spark: SparkSession,
tableSchema: StructType,
@transient metadataConfig: HoodieMetadataConfig,
@transient metaClient: HoodieTableMetaClient,
allowCaching: Boolean = false) {
def readColumnStatsIndex(spark: SparkSession,
tableBasePath: String,
metadataConfig: HoodieMetadataConfig,
targetColumns: Seq[String] = Seq.empty): DataFrame = {
val targetColStatsIndexColumns = Seq(
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
@transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
@transient private lazy val metadataTable: HoodieTableMetadata =
HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
val requiredMetadataIndexColumns =
(targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
@transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
val metadataTableDF: DataFrame = {
// NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
// by only fetching Column Stats Index records pertaining to the requested columns.
// Otherwise we fallback to read whole Column Stats Index
if (targetColumns.nonEmpty) {
readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
} else {
readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath)
}
// NOTE: Since [[metadataConfig]] is transient this has to be eagerly persisted, before this will be passed
// on to the executor
private val inMemoryProjectionThreshold = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
private lazy val indexedColumns: Set[String] = {
val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
// Column Stats Index could index either
// - The whole table
// - Only configured columns
if (customIndexedColumns.isEmpty) {
tableSchema.fieldNames.toSet
} else {
customIndexedColumns.asScala.toSet
}
}
val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
.select(requiredMetadataIndexColumns.map(col): _*)
/**
* Returns true in cases when Column Stats Index is built and available as standalone partition
* w/in the Metadata Table
*/
def isIndexAvailable: Boolean = {
checkState(metadataConfig.enabled, "Metadata Table support has to be enabled")
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
}
colStatsDF
/**
* Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
* or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
*/
def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
case Some(mode) =>
mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
case None =>
fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
}
}
/**
* Loads view of the Column Stats Index in a transposed format where single row coalesces every columns'
* statistics for a single file, returning it as [[DataFrame]]
*
* Please check out scala-doc of the [[transpose]] method explaining this view in more details
*/
def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: Boolean)(block: DataFrame => T): T = {
cachedColumnStatsIndexViews.get(targetColumns) match {
case Some(cachedDF) =>
block(cachedDF)
case None =>
val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns)
val df = if (shouldReadInMemory) {
// NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
// of the transposed table in memory, facilitating execution of the subsequently chained operations
// on it locally (on the driver; all such operations are actually going to be performed by Spark's
// Optimizer)
createDataFrameFromRows(spark, transposedRows.collectAsList().asScala, indexSchema)
} else {
val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
spark.createDataFrame(rdd, indexSchema)
}
if (allowCaching) {
cachedColumnStatsIndexViews.put(targetColumns, df)
// NOTE: Instead of collecting the rows from the index and hold them in memory, we instead rely
// on Spark as (potentially distributed) cache managing data lifecycle, while we simply keep
// the referenced to persisted [[DataFrame]] instance
df.persist(StorageLevel.MEMORY_ONLY)
block(df)
} else {
withPersistedDataset(df) {
block(df)
}
}
}
}
}
/**
* Loads a view of the Column Stats Index in a raw format, returning it as [[DataFrame]]
*
* Please check out scala-doc of the [[transpose]] method explaining this view in more details
*/
def load(targetColumns: Seq[String] = Seq.empty, shouldReadInMemory: Boolean = false): DataFrame = {
// NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
// by only fetching Column Stats Index records pertaining to the requested columns.
// Otherwise we fallback to read whole Column Stats Index
if (targetColumns.nonEmpty) {
loadColumnStatsIndexForColumnsInternal(targetColumns, shouldReadInMemory)
} else {
loadFullColumnStatsIndexInternal()
}
}
def invalidateCaches(): Unit = {
cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
cachedColumnStatsIndexViews.clear()
}
/**
@@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
* column references from the filtering expressions, and only transpose records corresponding to the
* columns referenced in those
*
* @param spark Spark session ref
* @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
* @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing raw Column Stats Index records
* @param queryColumns target columns to be included into the final table
* @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
val colStatsSchema = colStatsDF.schema
val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
case (field, ordinal) => (field.name, ordinal)
}).toMap
private def transpose(colStatsRecords: HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]): (HoodieData[Row], StructType) = {
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
// NOTE: We have to collect list of indexed columns to make sure we properly align the rows
// w/in the transposed dataset: since some files might not have all of the columns indexed
// either due to the Column Stats Index config changes, schema evolution, etc, we have
// to make sure that all of the rows w/in transposed data-frame are properly padded (with null
// values) for such file-column combinations
val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect()
// NOTE: We're sorting the columns to make sure final index schema matches layout
// of the transposed table
val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*)
val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
val sortedTargetColumns = sortedTargetColumnsSet.toSeq
val transposedRDD = colStatsDF.rdd
.filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal)))
.map { row =>
if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
// NOTE: This is a trick to avoid pulling all of [[ColumnStatsIndexSupport]] object into the lambdas'
// closures below
val indexedColumns = this.indexedColumns
// Here we perform complex transformation which requires us to modify the layout of the rows
// of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
// penalty of the [[Dataset]], since it's required to adhere to its schema at all times, while
// RDDs are not;
val transposedRows: HoodieData[Row] = colStatsRecords
// NOTE: Explicit conversion is required for Scala 2.11
.filter(JFunction.toJavaSerializableFunction(r => sortedTargetColumnsSet.contains(r.getColumnName)))
.mapToPair(JFunction.toJavaSerializablePairFunction(r => {
if (r.getMinValue == null && r.getMaxValue == null) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
row
collection.Pair.of(r.getFileName, r)
} else {
val minValueStruct = row.getAs[Row](minValueOrdinal)
val maxValueStruct = row.getAs[Row](maxValueOrdinal)
val minValueWrapper = r.getMinValue
val maxValueWrapper = r.getMaxValue
checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
checkState(minValueWrapper != null && maxValueWrapper != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
val colName = row.getString(colNameOrdinal)
val colName = r.getColumnName
val colType = tableSchemaFieldMap(colName).dataType
val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
val rowValsSeq = row.toSeq.toArray
val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), colType)
val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), colType)
// Update min-/max-value structs w/ unwrapped values in-place
rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
r.setMinValue(minValue)
r.setMaxValue(maxValue)
Row(rowValsSeq: _*)
collection.Pair.of(r.getFileName, r)
}
}
.groupBy(r => r.getString(fileNameOrdinal))
.foldByKey(Seq[Row]()) {
case (_, columnRowsSeq) =>
// Rows seq is always non-empty (otherwise it won't be grouped into)
val fileName = columnRowsSeq.head.get(fileNameOrdinal)
val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
}))
.groupByKey()
.map(JFunction.toJavaSerializableFunction(p => {
val columnRecordsSeq: Seq[HoodieMetadataColumnStats] = p.getValue.asScala.toSeq
val fileName: String = p.getKey
val valueCount: Long = columnRecordsSeq.head.getValueCount
// To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
// to align existing column-stats for individual file with the list of expected ones for the
// whole transposed projection (a superset of all files)
val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap
val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get)
// To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
// to align existing column-stats for individual file with the list of expected ones for the
// whole transposed projection (a superset of all files)
val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, r)).toMap
val alignedColStatRecordsSeq = sortedTargetColumns.map(columnRecordsMap.get)
val coalescedRowValuesSeq =
alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
case (acc, opt) =>
opt match {
case Some(columnStatsRow) =>
acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord))
case None =>
// NOTE: Since we're assuming missing column to essentially contain exclusively
// null values, we set null-count to be equal to value-count (this behavior is
// consistent with reading non-existent columns from Parquet)
acc ++ Seq(null, null, valueCount)
}
}
val coalescedRowValuesSeq =
alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) {
case (acc, opt) =>
opt match {
case Some(colStatRecord) =>
acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
case None =>
// NOTE: This could occur in either of the following cases:
// 1. Column is not indexed in Column Stats Index: in this case we won't be returning
// any statistics for such column (ie all stats will be null)
// 2. Particular file does not have this particular column (which is indexed by Column Stats Index):
// in this case we're assuming missing column to essentially contain exclusively
// null values, we set min/max values as null and null-count to be equal to value-count (this
// behavior is consistent with reading non-existent columns from Parquet)
//
// This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column)
val idx = acc.length / 3
val colName = sortedTargetColumns(idx)
val indexed = indexedColumns.contains(colName)
Seq(Row(coalescedRowValuesSeq:_*))
}
.values
.flatMap(it => it)
val nullCount = if (indexed) valueCount else null
acc ++= Seq(null, null, nullCount)
}
}
Row(coalescedRowValuesSeq:_*)
}))
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the schema
val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema)
spark.createDataFrame(transposedRDD, indexSchema)
val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
(transposedRows, indexSchema)
}
private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
private def loadColumnStatsIndexForColumnsInternal(targetColumns: Seq[String], shouldReadInMemory: Boolean): DataFrame = {
val colStatsDF = {
val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
// NOTE: Explicit conversion is required for Scala 2.11
val catalystRows: HoodieData[InternalRow] = colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
val converter = AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$, columnStatsRecordStructType)
it.asScala.map(r => converter(r).orNull).asJava
}), false)
if (shouldReadInMemory) {
// NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
// of the transposed table in memory, facilitating execution of the subsequently chained operations
// on it locally (on the driver; all such operations are actually going to be performed by Spark's
// Optimizer)
createDataFrameFromInternalRows(spark, catalystRows.collectAsList().asScala, columnStatsRecordStructType)
} else {
createDataFrameFromRDD(spark, HoodieJavaRDD.getJavaRDD(catalystRows), columnStatsRecordStructType)
}
}
colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
}
private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
// Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
// - Fetching the records from CSI by key-prefixes (encoded column names)
// - Extracting [[HoodieMetadataColumnStats]] records
// - Filtering out nulls
checkState(targetColumns.nonEmpty)
// TODO encoding should be done internally w/in HoodieBackedTableMetadata
val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
// NOTE: Explicit conversion is required for Scala 2.11
metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
toScalaOption(record.getData.getInsertValue(null, null))
.map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
.orNull
}))
.filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))
columnStatsRecords
}
private def loadFullColumnStatsIndexInternal(): DataFrame = {
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
spark.read.format("org.apache.hudi")
val colStatsDF = spark.read.format("org.apache.hudi")
.options(metadataConfig.getProps.asScala)
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
}
private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = {
val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val requiredIndexColumns =
targetColumnStatsIndexColumns.map(colName =>
col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by
// - Fetching the records from CSI by key-prefixes (encoded column names)
// - Deserializing fetched records into [[InternalRow]]s
// - Composing [[DataFrame]]
val metadataTableDF = {
val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
// TODO encoding should be done internally w/in HoodieBackedTableMetadata
val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
HoodieJavaRDD.getJavaRDD(
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
)
val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString)
val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType)
it.map { record =>
// schema and props are ignored for generating metadata record from the payload
// instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used
toScalaOption(record.getData.getInsertValue(null, null))
.flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord]))
.orNull
}
}
HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType)
}
metadataTableDF
colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
.select(requiredIndexColumns: _*)
}
}
object ColumnStatsIndexSupport {
private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper", "LongWrapper", "FloatWrapper", "DoubleWrapper",
"BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper", "TimeMicrosWrapper", "TimestampMicrosWrapper")
/**
* Target Column Stats Index columns which internally are mapped onto fields of the correspoding
* Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table
*/
private val targetColumnStatsIndexColumns = Seq(
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME
)
private val columnStatsRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$)
/**
* @VisibleForTesting
@@ -300,13 +417,28 @@ object ColumnStatsIndexSupport {
@inline private def composeColumnStatStructType(col: String, statName: String, dataType: DataType) =
StructField(formatColName(col, statName), dataType, nullable = true, Metadata.empty)
private def tryUnpackNonNullVal(statStruct: Row): (Any, Int) =
statStruct.toSeq.zipWithIndex
.find(_._1 != null)
// NOTE: First non-null value will be a wrapper (converted into Row), bearing a single
// value
.map { case (value, ord) => (value.asInstanceOf[Row].get(0), ord)}
.getOrElse((null, -1))
private def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
valueWrapper match {
case w: BooleanWrapper => w.getValue
case w: IntWrapper => w.getValue
case w: LongWrapper => w.getValue
case w: FloatWrapper => w.getValue
case w: DoubleWrapper => w.getValue
case w: BytesWrapper => w.getValue
case w: StringWrapper => w.getValue
case w: DateWrapper => w.getValue
case w: DecimalWrapper => w.getValue
case w: TimeMicrosWrapper => w.getValue
case w: TimestampMicrosWrapper => w.getValue
case r: GenericData.Record if expectedAvroSchemaValues.contains(r.getSchema.getName) =>
r.get("value")
case _ => throw new UnsupportedOperationException(s"Not recognized value wrapper type (${valueWrapper.getClass.getSimpleName})")
}
}
val decConv = new DecimalConversion()
private def deserialize(value: Any, dataType: DataType): Any = {
dataType match {
@@ -315,12 +447,37 @@ object ColumnStatsIndexSupport {
// here we have to decode those back into corresponding logical representation.
case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long])
case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Int])
// Standard types
case StringType => value
case BooleanType => value
// Numeric types
case FloatType => value
case DoubleType => value
case LongType => value
case IntegerType => value
// NOTE: All integral types of size less than Int are encoded as Ints in MT
case ShortType => value.asInstanceOf[Int].toShort
case ByteType => value.asInstanceOf[Int].toByte
case _ => value
// TODO fix
case _: DecimalType =>
value match {
case buffer: ByteBuffer =>
val logicalType = DecimalWrapper.SCHEMA$.getField("value").schema().getLogicalType
decConv.fromBytes(buffer, null, logicalType)
case _ => value
}
case BinaryType =>
value match {
case b: ByteBuffer =>
val bytes = new Array[Byte](b.remaining)
b.get(bytes)
bytes
case other => other
}
case _ =>
throw new UnsupportedOperationException(s"Data type for the statistic value is not recognized $dataType")
}
}
}

View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.common.data.HoodieData
import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
object HoodieCatalystUtils extends SparkAdapterSupport {
/**
* Executes provided function while keeping provided [[Dataset]] instance persisted for the
* duration of the execution
*
* @param df target [[Dataset]] to be persisted
* @param level desired [[StorageLevel]] of the persistence
* @param f target function to be executed while [[Dataset]] is kept persisted
* @tparam T return value of the target function
* @return execution outcome of the [[f]] function
*/
def withPersistedDataset[T](df: Dataset[_], level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
df.persist(level)
try {
f
} finally {
df.unpersist()
}
}
/**
* Executes provided function while keeping provided [[HoodieData]] instance persisted for the
* duration of the execution
*
* @param data target [[Dataset]] to be persisted
* @param level desired [[StorageLevel]] of the persistence
* @param f target function to be executed while [[Dataset]] is kept persisted
* @tparam T return value of the target function
* @return execution outcome of the [[f]] function
*/
def withPersistedData[T](data: HoodieData[_], level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
data.persist(sparkAdapter.convertStorageLevelToString(level))
try {
f
} finally {
data.unpersist()
}
}
}

View File

@@ -27,11 +27,10 @@ import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.asScalaBufferConverter
@@ -92,9 +91,9 @@ object HoodieDatasetBulkInsertHelper extends Logging {
val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) {
val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, dedupedRdd, updatedSchema)
HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
} else {
HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)
HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd, updatedSchema)
}
val trimmedDF = if (shouldDropPartitionColumns) {

View File

@@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
object HoodieDatasetUtils {
/**
* Executes provided function while keeping provided [[DataFrame]] instance persisted for the
* duration of the execution
*
* @param df target [[DataFrame]] to be persisted
* @param level desired [[StorageLevel]] of the persistence
* @param f target function to be executed while [[DataFrame]] is kept persisted
* @tparam T return value of the target function
* @return execution outcome of the [[f]] function
*/
def withPersistence[T](df: DataFrame, level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
df.persist(level)
try {
f
} finally {
df.unpersist()
}
}
}

View File

@@ -18,7 +18,6 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.HoodieDatasetUtils.withPersistence
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, getConfigProperties}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -26,7 +25,7 @@ import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
import org.apache.hudi.metadata.HoodieMetadataPayload
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -35,7 +34,7 @@ import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndex
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
@@ -80,8 +79,9 @@ case class HoodieFileIndex(spark: SparkSession,
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
)
with FileIndex
with ColumnStatsIndexSupport {
with FileIndex {
@transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
override def rootPaths: Seq[Path] = queryPaths.asScala
@@ -95,8 +95,9 @@ case class HoodieFileIndex(spark: SparkSession,
*/
def allFiles: Seq[FileStatus] = {
cachedAllInputFileSlices.values.asScala.flatMap(_.asScala)
.filter(_.getBaseFile.isPresent)
.map(_.getBaseFile.get().getFileStatus)
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
.toSeq
}
@@ -196,64 +197,63 @@ case class HoodieFileIndex(spark: SparkSession,
// nothing CSI in particular could be applied for)
lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable || !isDataSkippingEnabled) {
if (!isMetadataTableEnabled || !isDataSkippingEnabled || !columnStatsIndex.isIndexAvailable) {
validateConfig()
Option.empty
} else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
Option.empty
} else {
val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
// NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
// it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
// w/o resorting to on-cluster execution.
// For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
// on-cluster: total number of rows of the expected projected portion of the index has to be below the
// threshold (of 100k records)
val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(colStatsDF) {
val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema)
columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
val indexSchema = transposedColStatsDF.schema
val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
.reduce(And)
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(transposedColStatsDF) {
val indexSchema = transposedColStatsDF.schema
val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
.reduce(And)
val allIndexedFileNames =
transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet
val allIndexedFileNames =
transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet
val prunedCandidateFileNames =
transposedColStatsDF.where(new Column(indexFilter))
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet
val prunedCandidateFileNames =
transposedColStatsDF.where(new Column(indexFilter))
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
.map(_.getString(0))
.toSet
// NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
// base-file: since it's bound to clustering, which could occur asynchronously
// at arbitrary point in time, and is not likely to be touching all of the base files.
//
// To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
// files and all outstanding base-files, and make sure that all base files not
// represented w/in the index are included in the output of this method
val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
// NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
// base-file: since it's bound to clustering, which could occur asynchronously
// at arbitrary point in time, and is not likely to be touching all of the base files.
//
// To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
// files and all outstanding base-files, and make sure that all base files not
// represented w/in the index are included in the output of this method
val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
Some(prunedCandidateFileNames ++ notIndexedFileNames)
}
Some(prunedCandidateFileNames ++ notIndexedFileNames)
}
}
}
override def refresh(): Unit = super.refresh()
override def refresh(): Unit = {
super.refresh()
columnStatsIndex.invalidateCaches()
}
override def inputFiles: Array[String] =
allFiles.map(_.getPath.toString).toArray
override def sizeInBytes: Long = cachedFileSize
private def isColumnStatsIndexAvailable =
metaClient.getTableConfig.getMetadataPartitions
.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
private def isDataSkippingEnabled: Boolean =
options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean

View File

@@ -21,17 +21,54 @@ package org.apache.spark.sql
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
/**
* Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]]
*/
object HoodieUnsafeRDDUtils {
object HoodieUnsafeUtils {
// TODO scala-doc
def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], structType: StructType): DataFrame =
spark.internalCreateDataFrame(rdd, structType)
/**
* Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with provided [[schema]]
*
* NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it
* will be executed by Spark locally
*
* @param spark spark's session
* @param rows collection of rows to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
* @return
*/
def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, rows))
/**
* Creates [[DataFrame]] from the in-memory [[Seq]] of [[InternalRow]]s with provided [[schema]]
*
* NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it
* will be executed by Spark locally
*
* @param spark spark's session
* @param rows collection of rows to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
* @return
*/
def createDataFrameFromInternalRows(spark: SparkSession, rows: Seq[InternalRow], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
/**
* Creates [[DataFrame]] from the [[RDD]] of [[Row]]s with provided [[schema]]
*
* @param spark spark's session
* @param rdd RDD w/ [[Row]]s to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
* @return
*/
def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow], schema: StructType): DataFrame =
spark.internalCreateDataFrame(rdd, schema)
/**
* Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly