1
0

[HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns (#5364)

- Scaffolded `Spark24HoodieParquetFileFormat` extending `ParquetFileFormat` and overriding the behavior of adding partition columns to every row
 - Amended `SparkAdapter`s `createHoodieParquetFileFormat` API to be able to configure whether to append partition values or not
 - Fallback to append partition values in cases when the source columns are not persisted in data-file
 - Fixing HoodieBaseRelation incorrectly handling mandatory columns
This commit is contained in:
Alexey Kudinkin
2022-04-20 04:30:27 -07:00
committed by GitHub
parent 408663c42b
commit f7544e23ac
28 changed files with 1156 additions and 686 deletions

View File

@@ -17,4 +17,4 @@
org.apache.hudi.DefaultSource
org.apache.spark.sql.execution.datasources.parquet.SparkHoodieParquetFileFormat
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat

View File

@@ -20,14 +20,13 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -56,6 +55,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
override type FileSplit = HoodieBaseFileSplit
override lazy val mandatoryColumns: Seq[String] =
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
Seq(recordKeyField)
override def imbueConfigs(sqlContext: SQLContext): Unit = {
@@ -65,14 +65,14 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieUnsafeRDD = {
val baseFileReader = createBaseFileReader(
spark = sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters,
options = optParams,
@@ -114,16 +114,38 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val shouldAppendPartitionColumns = shouldOmitPartitionColumns
val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
case HoodieFileFormat.PARQUET =>
(sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID)
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
}
if (globPaths.isEmpty) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
// - Values parsed from the actual partition pat would be appended to the final dataset
//
// In the former case, we don't need to provide the partition-schema to the relation,
// therefore we simply stub it w/ empty schema and use full table-schema as the one being
// read from the data file.
//
// In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
// being a table-schema with all partition columns stripped out
val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) {
(fileIndex.partitionSchema, fileIndex.dataSchema)
} else {
(StructType(Nil), tableStructSchema)
}
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = tableFileFormat,
optParams)(sparkSession)

View File

@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.getPartitionPath
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.fs.FSUtils
@@ -36,12 +36,13 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.TaskContext
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -50,11 +51,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.io.Closeable
import java.net.URI
import java.util.Locale
import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
trait HoodieFileSplit {}
@@ -78,7 +79,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
extends BaseRelation
with FileRelation
with PrunedFilteredScan
with SparkAdapterSupport
with Logging {
type FileSplit <: HoodieFileSplit
@@ -125,14 +125,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
val schemaUtil = new TableSchemaResolver(metaClient)
val avroSchema = Try(schemaUtil.getTableAvroSchema).getOrElse(
// If there is no commit in the table, we can't get the schema
// t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
userSchema match {
case Some(s) => sparkAdapter.getAvroSchemaConverters.toAvroType(s, nullable = false, "record")
case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
}
)
val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logWarning("Failed to fetch schema from the table", e)
// If there is no commit in the table, we can't get the schema
// t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
userSchema match {
case Some(s) => convertToAvroSchema(s)
case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
}
}
// try to find internalSchema
val internalSchemaFromMeta = try {
schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
@@ -146,11 +149,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
/**
* if true, need to deal with schema for creating file reader.
*/
protected val dropPartitionColumnsWhenWrite: Boolean =
metaClient.getTableConfig.isDropPartitionColumns && partitionColumns.nonEmpty
protected val shouldOmitPartitionColumns: Boolean =
metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
/**
* NOTE: PLEASE READ THIS CAREFULLY
@@ -205,14 +205,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* NOTE: DO NOT OVERRIDE THIS METHOD
*/
override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// NOTE: In case list of requested columns doesn't contain the Primary Key one, we
// NOTE: PLEAS READ CAREFULLY BEFORE MAKING CHANGES
//
// In case list of requested columns doesn't contain the Primary Key one, we
// have to add it explicitly so that
// - Merging could be performed correctly
// - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
// Spark still fetches all the rows to execute the query correctly
// Spark still fetches all the rows to execute the query correctly
//
// It's okay to return columns that have not been requested by the caller, as those nevertheless will be
// filtered out upstream
// *Appending* additional columns to the ones requested by the caller is not a problem, as those
// will be "projected out" by the caller's projection;
//
// (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
// PROJECTION
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
@@ -223,56 +228,62 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val fileSplits = collectFileSplits(partitionFilters, dataFilters)
val partitionSchema = if (dropPartitionColumnsWhenWrite) {
// when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in
// data files.
StructType(partitionColumns.map(StructField(_, StringType)))
} else {
StructType(Nil)
}
val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
val dataSchema = if (dropPartitionColumnsWhenWrite) {
val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name)))
HoodieTableSchema(
dataStructType,
sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, nullable = false, "record").toString()
)
} else {
tableSchema
}
val requiredSchema = if (dropPartitionColumnsWhenWrite) {
val requiredStructType = StructType(requiredStructSchema.filterNot(f => partitionColumns.contains(f.name)))
HoodieTableSchema(
requiredStructType,
sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, nullable = false, "record").toString()
)
} else {
HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
}
// Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
if (fileSplits.nonEmpty)
composeRDD(fileSplits, partitionSchema, dataSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
else
val tableAvroSchemaStr =
if (internalSchema.isEmptySchema) tableAvroSchema.toString
else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchema)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
// Since schema requested by the caller might contain partition columns, we might need to
// prune it, removing all partition columns from it in case these columns are not persisted
// in the data files
//
// NOTE: This partition schema is only relevant to file reader to be able to embed
// values of partition columns (hereafter referred to as partition values) encoded into
// the partition path, and omitted from the data file, back into fetched rows;
// Note that, by default, partition columns are not omitted therefore specifying
// partition schema for reader is not required
val (partitionSchema, dataSchema, prunedRequiredSchema) =
tryPrunePartitionColumns(tableSchema, requiredSchema)
if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, prunedRequiredSchema, filters)
// NOTE: In case when partition columns have been pruned from the required schema, we have to project
// the rows from the pruned schema back into the one expected by the caller
val projectedRDD = if (prunedRequiredSchema.structTypeSchema != requiredSchema.structTypeSchema) {
rdd.mapPartitions { it =>
val fullPrunedSchema = StructType(prunedRequiredSchema.structTypeSchema.fields ++ partitionSchema.fields)
val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema)
it.map(unsafeProjection)
}
} else {
rdd
}
// Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
projectedRDD.asInstanceOf[RDD[Row]]
}
}
/**
* Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
*
* @param fileSplits file splits to be handled by the RDD
* @param partitionSchema target table's partition schema
* @param tableSchema target table's schema
* @param dataSchema target table's data files' schema
* @param requiredSchema projected schema required by the reader
* @param filters data filters to be applied
* @return instance of RDD (implementing [[HoodieUnsafeRDD]])
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieUnsafeRDD
@@ -325,16 +336,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
if (dropPartitionColumnsWhenWrite) {
if (requestedColumns.isEmpty) {
mandatoryColumns.toArray
} else {
requestedColumns
}
} else {
val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
requestedColumns ++ missing
}
val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
requestedColumns ++ missing
}
protected def getTableState: HoodieTableState = {
@@ -364,7 +367,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
try {
val tableConfig = metaClient.getTableConfig
if (dropPartitionColumnsWhenWrite) {
if (shouldOmitPartitionColumns) {
val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
if (hiveStylePartitioningEnabled) {
@@ -388,40 +391,47 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
InternalRow.empty
}
}
}
object HoodieBaseRelation {
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
protected def getColName(f: StructField): String = {
if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
f.name
} else {
f.name.toLowerCase(Locale.ROOT)
}
}
/**
* Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]]
* over [[InternalRow]]
*/
def createBaseFileReader(spark: SparkSession,
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
protected def createBaseFileReader(spark: SparkSession,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
val hfileReader = createHFileReader(
spark = spark,
tableSchema = tableSchema,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
)
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val shouldAppendPartitionColumns = shouldOmitPartitionColumns
val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
dataSchema = tableSchema.structTypeSchema,
dataSchema = dataSchema.structTypeSchema,
partitionSchema = partitionSchema,
requiredSchema = requiredSchema.structTypeSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
hadoopConf = hadoopConf,
appendPartitionValues = shouldAppendPartitionColumns
)
partitionedFile => {
@@ -436,8 +446,38 @@ object HoodieBaseRelation {
}
}
private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
if (shouldOmitPartitionColumns) {
val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
(partitionSchema,
HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString),
HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString))
} else {
(StructType(Nil), tableSchema, requiredSchema)
}
}
private def prunePartitionColumns(dataStructSchema: StructType): StructType =
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))
}
object HoodieBaseRelation extends SparkAdapterSupport {
private def generateUnsafeProjection(from: StructType, to: StructType) =
sparkAdapter.createCatalystExpressionUtils().generateUnsafeProjection(from, to)
def convertToAvroSchema(structSchema: StructType): Schema =
sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
private def createHFileReader(spark: SparkSession,
tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],

View File

@@ -21,6 +21,7 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.spark.sql.SparkSession
@@ -38,8 +39,8 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
/**
* Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
* to deal with [[ColumnarBatch]] when enable parquet vectorized reader if necessary.
* Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]],
* when Parquet's Vectorized Reader is used
*/
def buildHoodieParquetReader(sparkSession: SparkSession,
dataSchema: StructType,
@@ -47,9 +48,11 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
hadoopConf: Configuration,
appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
val readParquetFile: PartitionedFile => Iterator[Any] = sparkAdapter.createHoodieParquetFileFormat().get.buildReaderWithPartitionValues(
val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession,
dataSchema = dataSchema,
partitionSchema = partitionSchema,
@@ -91,9 +94,12 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
* @param validCommits valid commits, using give validCommits to validate all legal histroy Schema files, and return the latest one.
*/
def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {
conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
val querySchemaString = SerDeHelper.toJson(internalSchema)
if (!isNullOrEmpty(querySchemaString)) {
conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
}
conf
}
}

View File

@@ -88,7 +88,7 @@ object HoodieSparkSqlWriter {
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestmapBasedKeyGenerator(
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
originKeyGeneratorClassName, parameters)
//validate datasource and tableconfig keygen are the same
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
@@ -758,7 +758,7 @@ object HoodieSparkSqlWriter {
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}
private def extractConfigsRelatedToTimestmapBasedKeyGenerator(keyGenerator: String,
private def extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenerator: String,
params: Map[String, String]): Map[String, String] = {
if (keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) ||
keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName)) {

View File

@@ -20,8 +20,8 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, HoodieRecord, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import java.util.stream.Collectors
import java.util.stream.Collectors
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -36,6 +36,7 @@ import org.apache.hudi.table.HoodieSparkTable
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
@@ -183,7 +184,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.PARQUET => if (!internalSchema.isEmptySchema) "HoodieParquet" else "parquet"
case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID
case HoodieFileFormat.ORC => "orc"
}
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")

View File

@@ -19,9 +19,7 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -61,14 +59,14 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = tableSchema,
dataSchema = dataSchema,
requiredSchema = dataSchema,
// This file-reader is used to read base file records, subsequently merging them with the records
// stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
// applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that
@@ -86,7 +84,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
val requiredSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters ++ incrementalSpanRecordFilters,
options = optParams,
@@ -99,7 +97,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to perform filtering
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
tableSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
}
override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {

View File

@@ -20,17 +20,14 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
@@ -63,14 +60,14 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = tableSchema,
dataSchema = dataSchema,
requiredSchema = dataSchema,
// This file-reader is used to read base file records, subsequently merging them with the records
// stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
// applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
@@ -85,7 +82,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val requiredSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters,
options = optParams,
@@ -96,7 +93,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val tableState = getTableState
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
tableSchema, requiredSchema, tableState, mergeType, fileSplits)
dataSchema, requiredSchema, tableState, mergeType, fileSplits)
}
protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {

View File

@@ -120,6 +120,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
}
/**
* @VisibleForTesting
*/
def partitionSchema: StructType = {
if (queryAsNonePartitionedTable) {
// If we read it as Non-Partitioned table, we should not

View File

@@ -23,26 +23,32 @@ import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
class SparkHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
override def shortName(): String = "HoodieParquet"
class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
override def toString: String = "HoodieParquet"
override def shortName(): String = FILE_FORMAT_ID
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
override def toString: String = "Hoodie-Parquet"
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
sparkAdapter
.createHoodieParquetFileFormat().get
.createHoodieParquetFileFormat(appendPartitionValues = false).get
.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
}
}
object HoodieParquetFileFormat {
val FILE_FORMAT_ID = "hoodie-parquet"
}

View File

@@ -747,7 +747,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(resultSchema, schema1)
}
@ParameterizedTest @ValueSource(booleans = Array(true, false))
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) {
val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -897,9 +898,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","))
}
@Disabled("HUDI-3204")
@Test
def testHoodieBaseFileOnlyViewRelation(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testHoodieBaseFileOnlyViewRelation(useGlobbing: Boolean): Unit = {
val _spark = spark
import _spark.implicits._
@@ -925,18 +926,27 @@ class TestCOWDataSource extends HoodieClientTestBase {
.mode(org.apache.spark.sql.SaveMode.Append)
.save(basePath)
val res = spark.read.format("hudi").load(basePath)
// NOTE: We're testing here that both paths are appropriately handling
// partition values, regardless of whether we're reading the table
// t/h a globbed path or not
val path = if (useGlobbing) {
s"$basePath/*/*/*/*"
} else {
basePath
}
val res = spark.read.format("hudi").load(path)
assert(res.count() == 2)
// data_date is the partition field. Persist to the parquet file using the origin values, and read it.
assertEquals(
res.select("data_date").map(_.get(0).toString).collect().sorted,
Array("2018-09-23", "2018-09-24")
res.select("data_date").map(_.get(0).toString).collect().sorted.toSeq,
Seq("2018-09-23", "2018-09-24")
)
assertEquals(
res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted,
Array("2018/09/23", "2018/09/24")
res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq,
Seq("2018/09/23", "2018/09/24")
)
}
}

View File

@@ -57,7 +57,6 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
val verificationCol: String = "driver"
val updatedVerificationVal: String = "driver_update"
@Disabled("HUDI-3896")
@ParameterizedTest
@CsvSource(Array(
"true,org.apache.hudi.keygen.SimpleKeyGenerator",

View File

@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
@@ -165,7 +165,7 @@ class Spark2Adapter extends SparkAdapter {
}
}
override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
Some(new ParquetFileFormat)
override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
}
}

View File

@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{AtomicType, StructType}
import org.apache.spark.util.SerializableConfiguration
import java.net.URI
/**
* This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
* that's not possible to customize in any other way
*
* NOTE: This is a version of [[AvroDeserializer]] impl from Spark 2.4.4 w/ w/ the following changes applied to it:
* <ol>
* <li>Avoiding appending partition values to the rows read from the data file</li>
* </ol>
*/
class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
hadoopConf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
requiredSchema.json)
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
(file: PartitionedFile) => {
assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
val fileSplit =
new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
val filePath = fileSplit.getPath
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
fileSplit.getStart,
fileSplit.getStart + fileSplit.getLength,
fileSplit.getLength,
fileSplit.getLocations,
null)
val sharedConf = broadcastedHadoopConf.value.value
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter(parquetSchema, _))
.reduceOption(FilterApi.and)
} else {
None
}
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion lister before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
// NOTE: We're making appending of the partitioned values to the rows read from the
// data file configurable
if (shouldAppendPartitionValues) {
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
} else {
vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
}
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns UnsafeRow
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
} else {
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
}
val iter = new RecordReaderIterator(reader)
// SPARK-23457 Register a task completion lister before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
// This is a horrible erasure hack... if we type the iterator above, then it actually check
// the type in next() and we get a class cast exception. If we make that function return
// Object, then we can defer the cast until later!
//
// NOTE: We're making appending of the partitioned values to the rows read from the
// data file configurable
if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
// There is no partition columns
iter.asInstanceOf[Iterator[InternalRow]]
} else {
iter.asInstanceOf[Iterator[InternalRow]]
.map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
}
}
}
}
}

View File

@@ -19,14 +19,13 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer, HoodieSparkAvroSchemaConverters}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark312HoodieParquetFileFormat}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession}
/**
@@ -55,14 +54,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
}
}
override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
if (SPARK_VERSION.startsWith("3.1")) {
val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
} else {
None
}
override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))
}
}

View File

@@ -17,279 +17,312 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.net.URI
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.execution.datasources.parquet._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
import java.net.URI
// reference ParquetFileFormat from spark project
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
// fallback to origin parquet File read
super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
} else {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
hadoopConf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
requiredSchema.json)
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
/**
* This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
* that's not possible to customize in any other way
*
* NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.1.2 w/ w/ the following changes applied to it:
* <ol>
* <li>Avoiding appending partition values to the rows read from the data file</li>
* <li>Schema on-read</li>
* </ol>
*/
class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
// for dataSource v1, we have no method to do project for spark physical plan.
// it's safe to do cols project here.
val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
hadoopConf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
requiredSchema.json)
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
// therefore it's safe to do schema projection here
if (!isNullOrEmpty(internalSchemaStr)) {
val prunedInternalSchemaStr =
pruneInternalSchema(internalSchemaStr, requiredSchema)
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
(file: PartitionedFile) => {
assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
val filePath = new Path(new URI(file.filePath))
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
file.start,
file.start + file.length,
file.length,
Array.empty,
null)
val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// Internal schema has to be pruned at this point
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val fileSchema = if (shouldUseInternalSchema) {
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
} else {
null
}
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
val filePath = new Path(new URI(file.filePath))
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
file.start,
file.start + file.length,
file.length,
Array.empty,
null)
val sharedConf = broadcastedHadoopConf.value.value
// do deal with internalSchema
val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// querySchema must be a pruned schema.
val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val fileSchema = if (internalSchemaChangeEnabled) {
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
} else {
// this should not happened, searchSchemaAndCache will deal with correctly.
null
createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive)
}
filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter)
.reduceOption(FilterApi.and)
} else {
None
}
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
Spark312HoodieParquetFileFormat.createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
} else {
Spark312HoodieParquetFileFormat.createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive)
}
filters.map(Spark312HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get()))
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter(_))
.reduceOption(FilterApi.and)
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
}
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader =
if (shouldUseInternalSchema) {
new Spark312HoodieVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
int96RebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity,
typeChangeInfos)
} else {
None
new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
int96RebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
}
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// use new conf
val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
//
// reset request schema
var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
if (internalSchemaChangeEnabled) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
}
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
int96RebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity, typeChangeInfos)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
// NOTE: We're making appending of the partitioned values to the rows read from the
// data file configurable
if (shouldAppendPartitionValues) {
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseMode,
int96RebaseMode)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)
vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
}
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = if (typeChangeInfos.isEmpty) {
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
} else {
// find type changed.
val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
if (typeChangeInfos.containsKey(i)) {
StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
} else f
}).toAttributes ++ partitionSchema.toAttributes
val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
if (typeChangeInfos.containsKey(i)) {
Cast(attr, typeChangeInfos.get(i).getLeft)
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
}
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
if (partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseMode,
int96RebaseMode)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = if (typeChangeInfos.isEmpty) {
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
} else {
// find type changed.
val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
if (typeChangeInfos.containsKey(i)) {
StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
} else f
}).toAttributes ++ partitionSchema.toAttributes
val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
if (typeChangeInfos.containsKey(i)) {
Cast(attr, typeChangeInfos.get(i).getLeft)
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
}
// NOTE: We're making appending of the partitioned values to the rows read from the
// data file configurable
if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
}
}
@@ -300,6 +333,16 @@ object Spark312HoodieParquetFileFormat {
val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
SerDeHelper.toJson(prunedSchema)
} else {
internalSchemaStr
}
}
private def createParquetFilters(arg: Any*): ParquetFilters = {
val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head

View File

@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32HoodieParquetFileFormat}
import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_2CatalystExpressionUtils, SparkSession}
@@ -80,14 +80,7 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
}
}
override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
if (SPARK_VERSION.startsWith("3.2")) {
val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat"
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
} else {
None
}
override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark32HoodieParquetFileFormat(appendPartitionValues))
}
}

View File

@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
@@ -27,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
@@ -34,226 +33,266 @@ import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
import java.net.URI
// reference ParquetFileFormat from spark project
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
// fallback to origin parquet File read
super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
} else {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
hadoopConf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
requiredSchema.json)
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
/**
* This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
* that's not possible to customize in any other way
*
* NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.2.1 w/ w/ the following changes applied to it:
* <ol>
* <li>Avoiding appending partition values to the rows read from the data file</li>
* <li>Schema on-read</li>
* </ol>
*/
class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
hadoopConf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
requiredSchema.json)
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
// for dataSource v1, we have no method to do project for spark physical plan.
// it's safe to do cols project here.
val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
// therefore it's safe to do schema projection here
if (!isNullOrEmpty(internalSchemaStr)) {
val prunedInternalSchemaStr =
pruneInternalSchema(internalSchemaStr, requiredSchema)
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
}
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
(file: PartitionedFile) => {
assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
val filePath = new Path(new URI(file.filePath))
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
val sharedConf = broadcastedHadoopConf.value.value
// Fetch internal schema
val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// Internal schema has to be pruned at this point
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val fileSchema = if (shouldUseInternalSchema) {
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
} else {
null
}
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInread = parquetOptions.int96RebaseModeInRead
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseSpec)
filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter)
.reduceOption(FilterApi.and)
} else {
None
}
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
val filePath = new Path(new URI(file.filePath))
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
val sharedConf = broadcastedHadoopConf.value.value
// do deal with internalSchema
val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// querySchema must be a pruned schema.
val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val fileSchema = if (internalSchemaChangeEnabled) {
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
} else {
// this should not happened, searchSchemaAndCache will deal with correctly.
null
}
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseSpec)
filters.map(Spark32HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get()))
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter(_))
.reduceOption(FilterApi.and)
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
footerFileMetaData.getKeyValueMetaData.get,
int96RebaseModeInRead)
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// Clone new conf
val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
if (shouldUseInternalSchema) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
}
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader =
if (shouldUseInternalSchema) {
new Spark32HoodieVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseSpec.mode.toString,
datetimeRebaseSpec.timeZone,
int96RebaseSpec.mode.toString,
int96RebaseSpec.timeZone,
enableOffHeapColumnVector && taskContext.isDefined,
capacity,
typeChangeInfos)
} else {
None
new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseSpec.mode.toString,
datetimeRebaseSpec.timeZone,
int96RebaseSpec.mode.toString,
int96RebaseSpec.timeZone,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
}
val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInread)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// use new conf
val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
// SPARK-37089: We cannot register a task completion listener to close this iterator here
// because downstream exec nodes have already registered their listeners. Since listeners
// are executed in reverse order of registration, a listener registered here would close the
// iterator while downstream exec nodes are still running. When off-heap column vectors are
// enabled, this can cause a use-after-free bug leading to a segfault.
//
// reset request schema
var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
if (internalSchemaChangeEnabled) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
}
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
// Instead, we use FileScanRDD's task completion listener to close this iterator.
val iter = new RecordReaderIterator(vectorizedReader)
try {
vectorizedReader.initialize(split, hadoopAttemptContext)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader = new Spark32HoodieVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseSpec.mode.toString,
datetimeRebaseSpec.timeZone,
int96RebaseSpec.mode.toString,
int96RebaseSpec.timeZone,
enableOffHeapColumnVector && taskContext.isDefined,
capacity, typeChangeInfos)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
// taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
try {
vectorizedReader.initialize(split, hadoopAttemptContext)
// NOTE: We're making appending of the partitioned values to the rows read from the
// data file configurable
if (shouldAppendPartitionValues) {
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} catch {
case e: Throwable =>
// SPARK-23457: In case there is an exception in initialization, close the iterator to
// avoid leaking resources.
iter.close()
throw e
}
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseSpec,
int96RebaseSpec)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
}
val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} catch {
case e: Throwable =>
// SPARK-23457: In case there is an exception in initialization, close the iterator to
// avoid leaking resources.
iter.close()
throw e
}
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseSpec,
int96RebaseSpec)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator[InternalRow](reader)
try {
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
@@ -274,13 +313,21 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
}
if (partitionSchema.length == 0) {
// NOTE: We're making appending of the partitioned values to the rows read from the
// data file configurable
if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
} catch {
case e: Throwable =>
// SPARK-23457: In case there is an exception in initialization, close the iterator to
// avoid leaking resources.
iter.close()
throw e
}
}
}
@@ -289,6 +336,16 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
object Spark32HoodieParquetFileFormat {
def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
SerDeHelper.toJson(prunedSchema)
} else {
internalSchemaStr
}
}
private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
if (fileSchema == null || querySchema == null) {
oldFilter