
[HUDI-4420] Fixing table schema delineation on partition/data schema for Spark relations (#5708)

Alexey Kudinkin
2022-07-23 14:59:16 -07:00
committed by GitHub
parent da28e38fe3
commit 2d745057ea
15 changed files with 364 additions and 273 deletions

View File

@@ -45,7 +45,7 @@ trait SparkAdapter extends Serializable {
* Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating * Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating
* on Catalyst [[Expression]]s * on Catalyst [[Expression]]s
*/ */
def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils def getCatalystExpressionUtils: HoodieCatalystExpressionUtils
/** /**
* Creates instance of [[HoodieCatalystPlansUtils]] providing for common utils operating * Creates instance of [[HoodieCatalystPlansUtils]] providing for common utils operating

View File

@@ -20,11 +20,13 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.projectReader
import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
@@ -68,17 +70,18 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
} }
protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit], protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
partitionSchema: StructType, tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema,
requestedColumns: Array[String], requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD = { filters: Array[Filter]): RDD[InternalRow] = {
val (partitionSchema, dataSchema, requiredDataSchema) =
tryPrunePartitionColumns(tableSchema, requiredSchema)
val baseFileReader = createBaseFileReader( val baseFileReader = createBaseFileReader(
spark = sparkSession, spark = sparkSession,
partitionSchema = partitionSchema, partitionSchema = partitionSchema,
dataSchema = dataSchema, dataSchema = dataSchema,
requiredSchema = requiredSchema, requiredDataSchema = requiredDataSchema,
filters = filters, filters = filters,
options = optParams, options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
@@ -86,7 +89,15 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema) hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
) )
new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits) // NOTE: In some cases the schema of the reader's output (reader's schema) might not match the schema expected by the caller.
// This could occur for ex, when requested schema contains partition columns which might not be persisted w/in the
// data file, but instead would be parsed from the partition path. In that case output of the file-reader will have
// different ordering of the fields than the original required schema (for more details please check out
// [[ParquetFileFormat]] impl). In that case we have to project the rows from the file-reader's schema
// back into the one expected by the caller
val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema)
new HoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits)
} }
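A minimal sketch of the projection the NOTE above refers to, using hypothetical column names; in the code itself this is what projectReader / generateUnsafeProjection (defined in HoodieBaseRelation) boil down to:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
import org.apache.spark.sql.types._

// Hypothetical example: the caller asked for (country, id, amount), where
// 'country' is a partition column parsed from the partition path
val requiredSchema = StructType(Seq(
  StructField("country", StringType),
  StructField("id", StringType),
  StructField("amount", LongType)))

// ParquetFileFormat emits data columns first and appends partition values last,
// so the reader's actual output schema is ordered differently
val readerSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("amount", LongType),
  StructField("country", StringType)))

// Re-order every row back into the layout the caller expects; this is the
// essence of generateUnsafeProjection(readerSchema, requiredSchema)
val projection: UnsafeProjection = UnsafeProjection.create(
  requiredSchema.map(f =>
    BoundReference(readerSchema.fieldIndex(f.name), f.dataType, f.nullable)))

def project(rows: Iterator[InternalRow]): Iterator[InternalRow] = rows.map(projection)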
protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {

View File

@@ -45,7 +45,7 @@ import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat} import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
@@ -274,7 +274,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
def canPruneRelationSchema: Boolean = def canPruneRelationSchema: Boolean =
(fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) && (fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) &&
// NOTE: Some relations might be disabling sophisticated schema pruning techniques (for ex, nested schema pruning) // NOTE: Some relations might be disabling sophisticated schema pruning techniques (for ex, nested schema pruning)
// TODO(HUDI-XXX) internal schema doesn't supported nested schema pruning currently // TODO(HUDI-XXX) internal schema doesn't support nested schema pruning currently
!hasSchemaOnRead !hasSchemaOnRead
override def schema: StructType = { override def schema: StructType = {
@@ -334,38 +334,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt) val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema)) val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))
// Since schema requested by the caller might contain partition columns, we might need to
// prune it, removing all partition columns from it in case these columns are not persisted
// in the data files
//
// NOTE: This partition schema is only relevant to file reader to be able to embed
// values of partition columns (hereafter referred to as partition values) encoded into
// the partition path, and omitted from the data file, back into fetched rows;
// Note that, by default, partition columns are not omitted therefore specifying
// partition schema for reader is not required
val (partitionSchema, dataSchema, requiredDataSchema) =
tryPrunePartitionColumns(tableSchema, requiredSchema)
if (fileSplits.isEmpty) { if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD sparkSession.sparkContext.emptyRDD
} else { } else {
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, targetColumns, filters) val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters)
// NOTE: In case when partition columns have been pruned from the required schema, we have to project
// the rows from the pruned schema back into the one expected by the caller
val projectedRDD = if (requiredDataSchema.structTypeSchema != requiredSchema.structTypeSchema) {
rdd.mapPartitions { it =>
val fullPrunedSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema)
it.map(unsafeProjection)
}
} else {
rdd
}
// Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]] // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details // Please check [[needConversion]] scala-doc for more details
projectedRDD.asInstanceOf[RDD[Row]] rdd.asInstanceOf[RDD[Row]]
} }
} }
@@ -373,19 +349,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
* *
* @param fileSplits file splits to be handled by the RDD * @param fileSplits file splits to be handled by the RDD
* @param partitionSchema target table's partition schema * @param tableSchema target table's schema
* @param dataSchema target table's data files' schema
* @param requiredSchema projected schema required by the reader * @param requiredSchema projected schema required by the reader
* @param requestedColumns columns requested by the query * @param requestedColumns columns requested by the query
* @param filters data filters to be applied * @param filters data filters to be applied
* @return instance of RDD (implementing [[HoodieUnsafeRDD]]) * @return instance of RDD (holding [[InternalRow]]s)
*/ */
protected def composeRDD(fileSplits: Seq[FileSplit], protected def composeRDD(fileSplits: Seq[FileSplit],
partitionSchema: StructType, tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema,
requestedColumns: Array[String], requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD filters: Array[Filter]): RDD[InternalRow]
/** /**
* Provided with partition and date filters collects target file splits to read records from, while * Provided with partition and date filters collects target file splits to read records from, while
@@ -553,7 +527,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def createBaseFileReader(spark: SparkSession, protected def createBaseFileReader(spark: SparkSession,
partitionSchema: StructType, partitionSchema: StructType,
dataSchema: HoodieTableSchema, dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, requiredDataSchema: HoodieTableSchema,
filters: Seq[Filter], filters: Seq[Filter],
options: Map[String, String], options: Map[String, String],
hadoopConf: Configuration): BaseFileReader = { hadoopConf: Configuration): BaseFileReader = {
@@ -564,42 +538,56 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// we have to eagerly initialize all of the readers even though only one specific to the type // we have to eagerly initialize all of the readers even though only one specific to the type
// of the file being read will be used. This is required to avoid serialization of the whole // of the file being read will be used. This is required to avoid serialization of the whole
// relation (containing file-index for ex) and passing it to the executor // relation (containing file-index for ex) and passing it to the executor
val reader = tableBaseFileFormat match { val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) =
case HoodieFileFormat.PARQUET => tableBaseFileFormat match {
HoodieDataSourceHelper.buildHoodieParquetReader( case HoodieFileFormat.PARQUET =>
sparkSession = spark, val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
dataSchema = dataSchema.structTypeSchema, sparkSession = spark,
partitionSchema = partitionSchema, dataSchema = dataSchema.structTypeSchema,
requiredSchema = requiredSchema.structTypeSchema, partitionSchema = partitionSchema,
filters = filters, requiredSchema = requiredDataSchema.structTypeSchema,
options = options, filters = filters,
hadoopConf = hadoopConf, options = options,
// We're delegating to Spark to append partition values to every row only in cases hadoopConf = hadoopConf,
// when these corresponding partition-values are not persisted w/in the data file itself // We're delegating to Spark to append partition values to every row only in cases
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath // when these corresponding partition-values are not persisted w/in the data file itself
) appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
)
// Since partition values by default are omitted, and not persisted w/in data-files by Spark,
// data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading
// the data. As such, actual full schema produced by such reader is composed of
// a) Data-file schema (projected or not)
// b) Appended partition column values
val readerSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
(parquetReader, readerSchema)
case HoodieFileFormat.HFILE => case HoodieFileFormat.HFILE =>
createHFileReader( val hfileReader = createHFileReader(
spark = spark, spark = spark,
dataSchema = dataSchema, dataSchema = dataSchema,
requiredSchema = requiredSchema, requiredDataSchema = requiredDataSchema,
filters = filters, filters = filters,
options = options, options = options,
hadoopConf = hadoopConf hadoopConf = hadoopConf
) )
(hfileReader, requiredDataSchema.structTypeSchema)
case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)") case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)")
} }
partitionedFile => { BaseFileReader(
val extension = FSUtils.getFileExtension(partitionedFile.filePath) read = partitionedFile => {
if (tableBaseFileFormat.getFileExtension.equals(extension)) { val extension = FSUtils.getFileExtension(partitionedFile.filePath)
reader.apply(partitionedFile) if (tableBaseFileFormat.getFileExtension.equals(extension)) {
} else { read(partitionedFile)
throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)") } else {
} throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)")
} }
},
schema = schema
)
} }
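The NOTE at the top of this method about eagerly initializing the readers is a Spark closure-serialization concern; a sketch of the pattern, with hypothetical names:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

object ClosureCaptureSketch {
  type FileReader = PartitionedFile => Iterator[InternalRow]

  // Hypothetical relation: resolving the per-format readers into local vals
  // up-front means the returned function captures only those two (serializable)
  // functions, and not `this` -- and therefore not the file-index and the rest
  // of the relation's object graph
  class ExampleRelation(parquetReader: FileReader, hfileReader: FileReader) {
    def buildReader(isParquet: Boolean): FileReader = {
      val parquet = parquetReader // eagerly resolved on the driver
      val hfile = hfileReader
      file => if (isParquet) parquet(file) else hfile(file)
    }
  }
}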
protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = { protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = {
@@ -615,8 +603,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
conf conf
} }
private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = { requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
// Since schema requested by the caller might contain partition columns, we might need to
// prune it, removing all partition columns from it in case these columns are not persisted
// in the data files
//
// NOTE: This partition schema is only relevant to file reader to be able to embed
// values of partition columns (hereafter referred to as partition values) encoded into
// the partition path, and omitted from the data file, back into fetched rows;
// Note that, by default, partition columns are not omitted therefore specifying
// partition schema for reader is not required
if (shouldExtractPartitionValuesFromPartitionPath) { if (shouldExtractPartitionValuesFromPartitionPath) {
val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType))) val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema) val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
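A standalone sketch of the pruning performed here, assuming partition columns are encoded in the partition path as strings and hence dropped from the data-file schemas; the real method additionally rebuilds the corresponding Avro and internal schemas:

import org.apache.spark.sql.types._

def prunePartitionColumnsSketch(tableSchema: StructType,
                                requiredSchema: StructType,
                                partitionColumns: Seq[String]): (StructType, StructType, StructType) = {
  // Partition values are parsed from the partition path, hence typed as strings
  val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
  // Drop partition columns from the schemas describing what is physically stored
  // in (and read from) the data files
  val dataSchema = StructType(tableSchema.filterNot(f => partitionColumns.contains(f.name)))
  val requiredDataSchema = StructType(requiredSchema.filterNot(f => partitionColumns.contains(f.name)))
  (partitionSchema, dataSchema, requiredDataSchema)
}

// Hypothetical example: "country" is a partition column requested by the query
val (p, d, r) = prunePartitionColumnsSketch(
  tableSchema = StructType(Seq(StructField("id", StringType), StructField("amount", LongType), StructField("country", StringType))),
  requiredSchema = StructType(Seq(StructField("id", StringType), StructField("country", StringType))),
  partitionColumns = Seq("country"))
// p: (country STRING), d: (id, amount), r: (id)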
@@ -645,10 +642,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
object HoodieBaseRelation extends SparkAdapterSupport { object HoodieBaseRelation extends SparkAdapterSupport {
type BaseFileReader = PartitionedFile => Iterator[InternalRow] case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], val schema: StructType) {
def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file)
}
private def generateUnsafeProjection(from: StructType, to: StructType) = def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection =
sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to) sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to)
def convertToAvroSchema(structSchema: StructType): Schema = def convertToAvroSchema(structSchema: StructType): Schema =
sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record") sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
@@ -656,6 +655,32 @@ object HoodieBaseRelation extends SparkAdapterSupport {
def getPartitionPath(fileStatus: FileStatus): Path = def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent fileStatus.getPath.getParent
/**
* Projects provided file reader's output from its original schema, into a [[requiredSchema]]
*
* NOTE: [[requiredSchema]] has to be a proper subset of the file reader's schema
*
* @param reader file reader to be projected
* @param requiredSchema target schema for the output of the provided file reader
*/
def projectReader(reader: BaseFileReader, requiredSchema: StructType): BaseFileReader = {
checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size == requiredSchema.size)
if (reader.schema == requiredSchema) {
reader
} else {
val read = reader.apply(_)
val projectedRead: PartitionedFile => Iterator[InternalRow] = (file: PartitionedFile) => {
// NOTE: Projection is not a serializable object, hence its creation should only happen w/in
// the executor process
val unsafeProjection = generateUnsafeProjection(reader.schema, requiredSchema)
read(file).map(unsafeProjection)
}
BaseFileReader(projectedRead, requiredSchema)
}
}
/** /**
* Projects provided schema by picking only required (projected) top-level columns from it * Projects provided schema by picking only required (projected) top-level columns from it
* *
@@ -666,7 +691,6 @@ object HoodieBaseRelation extends SparkAdapterSupport {
tableSchema match { tableSchema match {
case Right(internalSchema) => case Right(internalSchema) =>
checkState(!internalSchema.isEmptySchema) checkState(!internalSchema.isEmptySchema)
// TODO extend pruning to leverage optimizer pruned schema
val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava) val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava)
val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema") val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
@@ -691,10 +715,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
private def createHFileReader(spark: SparkSession, private def createHFileReader(spark: SparkSession,
dataSchema: HoodieTableSchema, dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, requiredDataSchema: HoodieTableSchema,
filters: Seq[Filter], filters: Seq[Filter],
options: Map[String, String], options: Map[String, String],
hadoopConf: Configuration): BaseFileReader = { hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
val hadoopConfBroadcast = val hadoopConfBroadcast =
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -703,10 +727,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath),
new CacheConfig(hadoopConf)) new CacheConfig(hadoopConf))
val requiredRowSchema = requiredSchema.structTypeSchema val requiredRowSchema = requiredDataSchema.structTypeSchema
// NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable
// to be passed from driver to executor // to be passed from driver to executor
val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema) val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema)
reader.getRecordIterator(requiredAvroSchema).asScala reader.getRecordIterator(requiredAvroSchema).asScala

View File

@@ -41,15 +41,17 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
/** /**
* Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]], * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]],
* when Parquet's Vectorized Reader is used * when Parquet's Vectorized Reader is used
*
* TODO move to HoodieBaseRelation, make private
*/ */
def buildHoodieParquetReader(sparkSession: SparkSession, private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession,
dataSchema: StructType, dataSchema: StructType,
partitionSchema: StructType, partitionSchema: StructType,
requiredSchema: StructType, requiredSchema: StructType,
filters: Seq[Filter], filters: Seq[Filter],
options: Map[String, String], options: Map[String, String],
hadoopConf: Configuration, hadoopConf: Configuration,
appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues( val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession, sparkSession = sparkSession,
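Regarding the ColumnarBatch handling mentioned in the doc above: when Spark's vectorized Parquet reader is enabled, the wrapped read function may emit batches rather than rows, which roughly get flattened like this (a sketch, not the exact implementation):

import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Row-based consumers need plain InternalRow iterators, so ColumnarBatch
// outputs are flattened back into rows
def toRowIterator(iter: Iterator[Any]): Iterator[InternalRow] =
  iter.flatMap {
    case batch: ColumnarBatch => batch.rowIterator().asScala
    case row: InternalRow     => Iterator.single(row)
  }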

View File

@@ -25,9 +25,9 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit
class HoodieFileScanRDD(@transient private val sparkSession: SparkSession, class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow], read: PartitionedFile => Iterator[InternalRow],
@transient fileSplits: Seq[HoodieBaseFileSplit]) @transient fileSplits: Seq[HoodieBaseFileSplit])
extends FileScanRDD(sparkSession, readFunction, fileSplits.map(_.filePartition)) extends FileScanRDD(sparkSession, read, fileSplits.map(_.filePartition))
with HoodieUnsafeRDD { with HoodieUnsafeRDD {
override final def collect(): Array[InternalRow] = super[HoodieUnsafeRDD].collect() override final def collect(): Array[InternalRow] = super[HoodieUnsafeRDD].collect()

View File

@@ -23,9 +23,10 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.BaseFileReader import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection, projectReader}
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability} import org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection.collectFieldOrdinals
import org.apache.hudi.HoodieMergeOnReadRDD._
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils
@@ -43,8 +44,6 @@ import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.HoodieAvroDeserializer import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
@@ -56,14 +55,42 @@ import scala.util.Try
case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: BaseFileReader, /**
requiredSchemaFileReaderForMerging: BaseFileReader, * Class holding base-file readers for 3 different use-cases:
requiredSchemaFileReaderForNoMerging: BaseFileReader) *
* <ol>
* <li>Full-schema reader: is used when the whole row has to be read to perform merging correctly.
* This could occur when no optimizations could be applied and we have to fall back to reading the whole row from
* the base file and the corresponding delta-log file to merge them correctly</li>
*
* <li>Required-schema reader: is used when it's fine to only read the row's projected columns.
* This could occur when the row could be merged with the corresponding delta-log record while only having
* the projected columns available</li>
*
* <li>Required-schema reader (skip-merging): is used when no merging will be performed (skip-merged).
* This could occur when the file-group has no delta-log files</li>
* </ol>
*/
private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: BaseFileReader,
requiredSchemaReader: BaseFileReader,
requiredSchemaReaderSkipMerging: BaseFileReader)
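For orientation, a hypothetical helper summarizing how these three readers are picked for a given file-split; the real selection happens in compute() and pickBaseFileReader below:

// Simplified, hypothetical selection helper mirroring compute() further below:
//  - no log files, or skip-merge query mode -> requiredSchemaReaderSkipMerging
//  - standard (whitelisted) payload class   -> requiredSchemaReader
//  - custom payload class                   -> fullSchemaReader (full row needed to merge)
def selectReader(readers: HoodieMergeOnReadBaseFileReaders,
                 hasLogFiles: Boolean,
                 skipMerge: Boolean,
                 payloadIsWhitelisted: Boolean): BaseFileReader =
  if (!hasLogFiles || skipMerge) readers.requiredSchemaReaderSkipMerging
  else if (payloadIsWhitelisted) readers.requiredSchemaReader
  else readers.fullSchemaReader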
/**
* RDD enabling Hudi's Merge-on-Read (MOR) semantic
*
* @param sc spark's context
* @param config hadoop configuration
* @param fileReaders suite of base file readers
* @param tableSchema table's full schema
* @param requiredSchema expected (potentially) projected schema
* @param tableState table's state
* @param mergeType type of merge performed
* @param fileSplits target file-splits this RDD will be iterating over
*/
class HoodieMergeOnReadRDD(@transient sc: SparkContext, class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration, @transient config: Configuration,
fileReaders: HoodieMergeOnReadBaseFileReaders, fileReaders: HoodieMergeOnReadBaseFileReaders,
dataSchema: HoodieTableSchema, tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema,
tableState: HoodieTableState, tableState: HoodieTableState,
mergeType: String, mergeType: String,
@@ -90,18 +117,19 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match { val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get) val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
projectedReader(dataFileOnlySplit.dataFile.get)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, getConfig) new LogFileIterator(logFileOnlySplit, getConfig)
case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
val baseFileIterator = fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get) val reader = fileReaders.requiredSchemaReaderSkipMerging
new SkipMergeIterator(split, baseFileIterator, getConfig) new SkipMergeIterator(split, reader, getConfig)
case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
val (baseFileIterator, schema) = readBaseFile(split) val reader = pickBaseFileReader
new RecordMergingFileIterator(split, baseFileIterator, schema, getConfig) new RecordMergingFileIterator(split, reader, getConfig)
case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" + s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
@@ -120,7 +148,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
iter iter
} }
private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], HoodieTableSchema) = { private def pickBaseFileReader: BaseFileReader = {
// NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum // NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum
// of the stored data possible, while still properly executing corresponding relation's semantic // of the stored data possible, while still properly executing corresponding relation's semantic
// and meet the query's requirements. // and meet the query's requirements.
@@ -129,10 +157,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// a) It does use one of the standard (and whitelisted) Record Payload classes // a) It does use one of the standard (and whitelisted) Record Payload classes
// then we can avoid reading and parsing the records w/ _full_ schema, and instead only // then we can avoid reading and parsing the records w/ _full_ schema, and instead only
// rely on projected one, nevertheless being able to perform merging correctly // rely on projected one, nevertheless being able to perform merging correctly
if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) if (whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) {
(fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema) fileReaders.requiredSchemaReader
else } else {
(fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), requiredSchema) fileReaders.fullSchemaReader
}
} }
override protected def getPartitions: Array[Partition] = override protected def getPartitions: Array[Partition] =
@@ -156,38 +185,27 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema
protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(dataSchema.avroSchemaStr) protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
protected var recordToLoad: InternalRow = _ protected var recordToLoad: InternalRow = _
// TODO validate whether we need to do UnsafeProjection private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema)
protected val unsafeProjection: UnsafeProjection = UnsafeProjection.create(requiredStructTypeSchema)
// NOTE: This maps _required_ schema fields onto the _full_ table schema, collecting their "ordinals"
// w/in the record payload. This is required, to project records read from the Delta Log file
// which always reads records in full schema (never projected, due to the fact that DL file might
// be stored in non-columnar formats like Avro, HFile, etc)
private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
private var logScanner = { private var logScanner = {
val internalSchema = dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
maxCompactionMemoryInBytes, config, internalSchema) maxCompactionMemoryInBytes, config, internalSchema)
} }
private val logRecords = logScanner.getRecords.asScala private val logRecords = logScanner.getRecords.asScala
// NOTE: This iterator iterates over already projected (in required schema) records
// NOTE: This has to stay lazy to make sure it's initialized only at the point where it's // NOTE: This has to stay lazy to make sure it's initialized only at the point where it's
// going to be used, since we modify `logRecords` before that and therefore can't do it any earlier // going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] = protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
logRecords.iterator.map { logRecords.iterator.map {
case (_, record) => case (_, record) =>
val avroRecordOpt = toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps))
avroRecordOpt.map { .map(_.asInstanceOf[GenericRecord])
avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder)
}
} }
protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
@@ -205,7 +223,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// Record has been deleted, skipping // Record has been deleted, skipping
this.hasNextInternal this.hasNextInternal
} else { } else {
recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get)) val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get)
recordToLoad = deserialize(projectedAvroRecord)
true true
} }
} }
@@ -229,14 +248,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
* performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially)
*/ */
private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow], baseFileReader: BaseFileReader,
config: Configuration) config: Configuration)
extends LogFileIterator(split, config) { extends LogFileIterator(split, config) {
private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
private val baseFileIterator = baseFileReader(split.dataFile.get)
override def hasNext: Boolean = { override def hasNext: Boolean = {
if (baseFileIterator.hasNext) { if (baseFileIterator.hasNext) {
val curRow = baseFileIterator.next() // No merge is required, simply load current row and project into required schema
recordToLoad = curRow recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
true true
} else { } else {
super[LogFileIterator].hasNext super[LogFileIterator].hasNext
@@ -250,8 +273,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
* streams * streams
*/ */
private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow], baseFileReader: BaseFileReader,
baseFileReaderSchema: HoodieTableSchema,
config: Configuration) config: Configuration)
extends LogFileIterator(split, config) { extends LogFileIterator(split, config) {
@@ -260,13 +282,17 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// - Projected schema // - Projected schema
// As such, no particular schema could be assumed, and therefore we rely on the caller // As such, no particular schema could be assumed, and therefore we rely on the caller
// to correspondingly set the scheme of the expected output of base-file reader // to correspondingly set the scheme of the expected output of base-file reader
private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr) private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record")
private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema, private val serializer = sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false)
baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema))
private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val recordKeyOrdinal = baseFileReader.schema.fieldIndex(tableState.recordKeyField)
private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
private val baseFileIterator = baseFileReader(split.dataFile.get)
override def hasNext: Boolean = hasNextInternal override def hasNext: Boolean = hasNextInternal
@@ -275,26 +301,22 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// handling records // handling records
@tailrec private def hasNextInternal: Boolean = { @tailrec private def hasNextInternal: Boolean = {
if (baseFileIterator.hasNext) { if (baseFileIterator.hasNext) {
val curRowRecord = baseFileIterator.next() val curRow = baseFileIterator.next()
val curKey = curRowRecord.getString(recordKeyOrdinal) val curKey = curRow.getString(recordKeyOrdinal)
val updatedRecordOpt = removeLogRecord(curKey) val updatedRecordOpt = removeLogRecord(curKey)
if (updatedRecordOpt.isEmpty) { if (updatedRecordOpt.isEmpty) {
// No merge needed, load current row with required projected schema // No merge is required, simply load current row and project into required schema
recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals)) recordToLoad = requiredSchemaUnsafeProjection(curRow)
true true
} else { } else {
val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get) val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get)
if (mergedAvroRecordOpt.isEmpty) { if (mergedAvroRecordOpt.isEmpty) {
// Record has been deleted, skipping // Record has been deleted, skipping
this.hasNextInternal this.hasNextInternal
} else { } else {
// NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
// record from the Delta Log will bear (full) Table schema, while record from the Base file requiredAvroSchema, reusableRecordBuilder)
// might already be read in projected one (as an optimization). recordToLoad = deserialize(projectedAvroRecord)
// As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback
// to [[projectAvro]]
val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder)
recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
true true
} }
} }
@@ -381,66 +403,10 @@ private object HoodieMergeOnReadRDD {
} }
} }
/** private def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = {
* Projects provided instance of [[InternalRow]] into provided schema, assuming that the
* the schema of the original row is strictly a superset of the given one
*/
private def projectRowUnsafe(row: InternalRow,
projectedSchema: StructType,
ordinals: Seq[Int]): InternalRow = {
val projectedRow = new SpecificInternalRow(projectedSchema)
var curIndex = 0
projectedSchema.zip(ordinals).foreach { case (field, pos) =>
val curField = if (row.isNullAt(pos)) {
null
} else {
row.get(pos, field.dataType)
}
projectedRow.update(curIndex, curField)
curIndex += 1
}
projectedRow
}
/**
* Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the
* the schema of the original row is strictly a superset of the given one
*/
def projectAvroUnsafe(record: IndexedRecord,
projectedSchema: Schema,
ordinals: List[Int],
recordBuilder: GenericRecordBuilder): GenericRecord = {
val fields = projectedSchema.getFields.asScala val fields = projectedSchema.getFields.asScala
checkState(fields.length == ordinals.length) fields.foreach(field => reusableRecordBuilder.set(field, record.get(field.name())))
fields.zip(ordinals).foreach { reusableRecordBuilder.build()
case (field, pos) => recordBuilder.set(field, record.get(pos))
}
recordBuilder.build()
}
/**
* Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the
* the schema of the original row is strictly a superset of the given one
*
* This is a "safe" counterpart of [[projectAvroUnsafe]]: it does build mapping of the record's
* schema into projected one itself (instead of expecting such mapping from the caller)
*/
def projectAvro(record: IndexedRecord,
projectedSchema: Schema,
recordBuilder: GenericRecordBuilder): GenericRecord = {
projectAvroUnsafe(record, projectedSchema, collectFieldOrdinals(projectedSchema, record.getSchema), recordBuilder)
}
/**
* Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which
* will be subsequently used by either [[projectRowUnsafe]] or [[projectAvroUnsafe()]] method
*
* @param projected target projected schema (which is a proper subset of [[source]] [[Schema]])
* @param source source schema of the record being projected
* @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one
*/
private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = {
projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList
} }
private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
@@ -452,9 +418,48 @@ private object HoodieMergeOnReadRDD {
.getParent .getParent
} }
private def resolveAvroSchemaNullability(schema: Schema) = { // TODO extract to HoodieAvroSchemaUtils
AvroConversionUtils.resolveAvroTypeNullability(schema) match { abstract class AvroProjection extends (GenericRecord => GenericRecord)
case (nullable, _) => nullable
class SafeAvroProjection(sourceSchema: Schema,
projectedSchema: Schema,
reusableRecordBuilder: GenericRecordBuilder = null) extends AvroProjection {
private val ordinals: List[Int] = collectFieldOrdinals(projectedSchema, sourceSchema)
private val recordBuilder: GenericRecordBuilder =
if (reusableRecordBuilder != null) {
reusableRecordBuilder
} else {
new GenericRecordBuilder(projectedSchema)
}
override def apply(record: GenericRecord): GenericRecord = {
val fields = projectedSchema.getFields.asScala
checkState(fields.length == ordinals.length)
fields.zip(ordinals).foreach {
case (field, pos) => recordBuilder.set(field, record.get(pos))
}
recordBuilder.build()
}
}
object SafeAvroProjection {
def create(sourceSchema: Schema, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder = null): SafeAvroProjection =
new SafeAvroProjection(
sourceSchema = sourceSchema,
projectedSchema = projectedSchema,
reusableRecordBuilder = reusableRecordBuilder)
/**
* Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which
* will be subsequently used by either [[projectRowUnsafe]] or [[projectAvroUnsafe()]] method
*
* @param projected target projected schema (which is a proper subset of [[source]] [[Schema]])
* @param source source schema of the record being projected
* @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one
*/
private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = {
projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList
} }
} }
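A minimal, self-contained usage sketch of SafeAvroProjection with hypothetical Avro schemas (in the RDD above it projects full-schema delta-log records down to the required schema):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecordBuilder

// Full schema of records read from the delta-log files...
val source = new Schema.Parser().parse(
  """{"type":"record","name":"Row","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"amount","type":"long"},
    |  {"name":"country","type":"string"}]}""".stripMargin)
// ...and the projected schema requested by the query
val projected = new Schema.Parser().parse(
  """{"type":"record","name":"Row","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"country","type":"string"}]}""".stripMargin)

val projection = SafeAvroProjection.create(source, projected)

val record = new GenericRecordBuilder(source)
  .set("id", "k1").set("amount", 42L).set("country", "US").build()

val projectedRecord = projection(record) // carries only "id" and "country"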

View File

@@ -17,7 +17,6 @@
package org.apache.hudi package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.model.{FileSlice, HoodieRecord} import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
@@ -27,7 +26,9 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util.StringUtils import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits} import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
@@ -58,32 +59,15 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
} }
protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType, tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema,
requestedColumns: Array[String], requestedColumns: Array[String],
filters: Array[Filter]): HoodieMergeOnReadRDD = { filters: Array[Filter]): RDD[InternalRow] = {
val fullSchemaParquetReader = createBaseFileReader( // The only required filters are ones that make sure we're only fetching records that
spark = sqlContext.sparkSession, // fall into incremental span of the timeline being queried
partitionSchema = partitionSchema, val requiredFilters = incrementalSpanRecordFilters
dataSchema = dataSchema, val optionalFilters = filters
requiredSchema = dataSchema, val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
// This file-reader is used to read base file records, subsequently merging them with the records
// stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
// applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that
// we combine them correctly)
//
// The only filtering applicable here is the filtering to make sure we're only fetching records that
// fall into incremental span of the timeline being queried
filters = incrementalSpanRecordFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
)
val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) =
createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)
val hoodieTableState = getTableState val hoodieTableState = getTableState
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
@@ -91,12 +75,8 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
new HoodieMergeOnReadRDD( new HoodieMergeOnReadRDD(
sqlContext.sparkContext, sqlContext.sparkContext,
config = jobConf, config = jobConf,
fileReaders = HoodieMergeOnReadBaseFileReaders( fileReaders = readers,
fullSchemaFileReader = fullSchemaParquetReader, tableSchema = tableSchema,
requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging,
requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging
),
dataSchema = dataSchema,
requiredSchema = requiredSchema, requiredSchema = requiredSchema,
tableState = hoodieTableState, tableState = hoodieTableState,
mergeType = mergeType, mergeType = mergeType,
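To make the split between required and optional filters concrete, a hedged sketch of what the incremental-span filters might look like; the field name and bounds are illustrative, the real construction lives in incrementalSpanRecordFilters:

import org.apache.spark.sql.sources.{Filter, GreaterThan, LessThanOrEqual}

// Hypothetical commit-time bounds of the incremental query
val startCommit = "20220701000000"
val endCommit   = "20220715000000"

// Records must fall into the (startCommit, endCommit] span of the timeline;
// these filters are mandatory for correctness and are pushed to every base-file reader
val requiredFilters: Seq[Filter] = Seq(
  GreaterThan("_hoodie_commit_time", startCommit),
  LessThanOrEqual("_hoodie_commit_time", endCommit))

// Whatever the query itself asked for stays optional: it may only be pushed down
// to readers whose output is not subsequently merged with delta-log records
val optionalFilters: Seq[Filter] = Seq.empty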

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.Expression
@@ -80,44 +81,113 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
} }
protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType, tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema,
requestedColumns: Array[String], requestedColumns: Array[String],
filters: Array[Filter]): HoodieMergeOnReadRDD = { filters: Array[Filter]): RDD[InternalRow] = {
val fullSchemaBaseFileReader = createBaseFileReader( val requiredFilters = Seq.empty
val optionalFilters = filters
val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
val tableState = getTableState
new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
config = jobConf,
fileReaders = readers,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
tableState = tableState,
mergeType = mergeType,
fileSplits = fileSplits)
}
protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
requiredFilters: Seq[Filter],
optionalFilters: Seq[Filter] = Seq.empty): HoodieMergeOnReadBaseFileReaders = {
val (partitionSchema, dataSchema, requiredDataSchema) =
tryPrunePartitionColumns(tableSchema, requiredSchema)
val fullSchemaReader = createBaseFileReader(
spark = sqlContext.sparkSession, spark = sqlContext.sparkSession,
partitionSchema = partitionSchema, partitionSchema = partitionSchema,
dataSchema = dataSchema, dataSchema = dataSchema,
requiredSchema = dataSchema, requiredDataSchema = dataSchema,
// This file-reader is used to read base file records, subsequently merging them with the records // This file-reader is used to read base file records, subsequently merging them with the records
// stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
// applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
// we combine them correctly) // we combine them correctly);
filters = Seq.empty, // As such only required filters could be pushed-down to such reader
filters = requiredFilters,
options = optParams, options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately // to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt) hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
) )
val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) = val requiredSchemaReader = createBaseFileReader(
createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters) spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
val tableState = getTableState
new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
config = jobConf,
fileReaders = HoodieMergeOnReadBaseFileReaders(
fullSchemaFileReader = fullSchemaBaseFileReader,
requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging,
requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging
),
dataSchema = dataSchema, dataSchema = dataSchema,
requiredSchema = requiredSchema, requiredDataSchema = requiredDataSchema,
tableState = tableState, // This file-reader is used to read base file records, subsequently merging them with the records
mergeType = mergeType, // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
fileSplits = fileSplits) // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
// we combine them correctly);
// As such only required filters could be pushed-down to such reader
filters = requiredFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema)
)
// Check whether fields required for merging were also requested to be fetched
// by the query:
// - In case they were, there's no optimization we could apply here (we will have
// to fetch such fields)
// - In case they were not, we will provide 2 separate file-readers
// a) One which would be applied to file-groups w/ delta-logs (merging)
// b) One which would be applied to file-groups w/ no delta-logs or
// in case query-mode is skipping merging
val mandatoryColumns = mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
if (mandatoryColumns.forall(requestedColumns.contains)) {
HoodieMergeOnReadBaseFileReaders(
fullSchemaReader = fullSchemaReader,
requiredSchemaReader = requiredSchemaReader,
requiredSchemaReaderSkipMerging = requiredSchemaReader
)
} else {
val prunedRequiredSchema = {
val superfluousColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
val prunedStructSchema =
StructType(requiredDataSchema.structTypeSchema.fields
.filterNot(f => superfluousColumnNames.contains(f.name)))
HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString)
}
val requiredSchemaReaderSkipMerging = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
requiredDataSchema = prunedRequiredSchema,
// This file-reader is only used in cases when no merging is performed, therefore it's safe to push
// down these filters to the base file readers
filters = requiredFilters ++ optionalFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema)
)
HoodieMergeOnReadBaseFileReaders(
fullSchemaReader = fullSchemaReader,
requiredSchemaReader = requiredSchemaReader,
requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
)
}
}
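To make the intent of the three readers concrete, here is a minimal, self-contained sketch of the selection logic they enable. It is illustrative only and not part of this commit: Reader, Readers and pickReader are stand-in names, and in Hudi the actual selection happens when HoodieMergeOnReadRDD iterates over its file-splits.

// Illustrative sketch only, using stand-in types rather than Hudi's actual API
object ReaderSelectionSketch {
  // A base-file reader modeled as a function from a file path to rows
  type Reader = String => Iterator[Map[String, Any]]

  // Mirrors the shape of HoodieMergeOnReadBaseFileReaders
  final case class Readers(fullSchemaReader: Reader,
                           requiredSchemaReader: Reader,
                           requiredSchemaReaderSkipMerging: Reader)

  // Hypothetical selection: file-groups w/o delta-logs (or queries skipping merging)
  // can use the pruned reader with all filters pushed down; merging file-groups
  // must use the reader that keeps merge-mandatory fields and only required filters
  def pickReader(readers: Readers, hasLogFiles: Boolean, skipMerging: Boolean): Reader =
    if (!hasLogFiles || skipMerging) readers.requiredSchemaReaderSkipMerging
    else readers.requiredSchemaReader
}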
protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -156,7 +226,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
requiredDataSchema = requiredDataSchema,
filters = filters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
@@ -189,7 +259,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
requiredDataSchema = prunedRequiredSchema,
filters = filters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it

View File

@@ -384,7 +384,7 @@ private object ColumnStatsExpressionUtils {
* Returns only [[AttributeReference]] contained as a sub-expression
*/
object AllowedTransformationExpression extends SparkAdapterSupport {
val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils
def unapply(expr: Expression): Option[AttributeReference] = {
// First step, we check that expression

View File

@@ -41,7 +41,7 @@ class RunClusteringProcedure extends BaseProcedure
with Logging
with SparkAdapterSupport {
private val exprUtils = sparkAdapter.getCatalystExpressionUtils
/**
* OPTIMIZE table_name|table_path [WHERE predicate]

View File

@@ -57,7 +57,7 @@ case class IndexRow(fileName: String,
class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport {
val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils
var spark: SparkSession = _

View File

@@ -17,14 +17,13 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
import java.io.File

View File

@@ -40,7 +40,7 @@ import scala.collection.mutable.ArrayBuffer
*/
class Spark2Adapter extends SparkAdapter {
override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils
override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark2CatalystPlanUtils

View File

@@ -19,20 +19,20 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils}
/**
* Implementation of [[SparkAdapter]] for Spark 3.1.x
*/
class Spark3_1Adapter extends BaseSpark3Adapter {
override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils
override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark31CatalystPlanUtils
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
new HoodieSpark3_1AvroSerializer(rootCatalystType, rootAvroType, nullable)

View File

@@ -30,15 +30,15 @@ import org.apache.spark.sql._
*/
class Spark3_2Adapter extends BaseSpark3Adapter {
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
new HoodieSpark3_2AvroSerializer(rootCatalystType, rootAvroType, nullable)
override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer =
new HoodieSpark3_2AvroDeserializer(rootAvroType, rootCatalystType)
override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils
override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark32CatalystPlanUtils
override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
Some(