1
0

[HUDI-3396] Make sure BaseFileOnlyViewRelation only reads projected columns (#4818)

NOTE: This change is first part of the series to clean up Hudi's Spark DataSource related implementations, making sure there's minimal code duplication among them, implementations are consistent and performant

This PR is making sure that BaseFileOnlyViewRelation only reads projected columns as well as avoiding unnecessary serde from Row to InternalRow

Brief change log
- Introduced HoodieBaseRDD as a base for all custom RDD impls
- Extracted common fields/methods to HoodieBaseRelation
- Cleaned up and streamlined HoodieBaseFileViewOnlyRelation
- Fixed all of the Relations to avoid superfluous Row <> InternalRow conversions
This commit is contained in:
Alexey Kudinkin
2022-03-09 18:45:25 -08:00
committed by GitHub
parent ca0b8fccee
commit 034addaef5
25 changed files with 751 additions and 264 deletions

View File

@@ -18,63 +18,82 @@
package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{BooleanType, StructType}
import org.apache.spark.sql.types.StructType
/**
* The implement of [[BaseRelation]], which is used to respond to query that only touches the base files(Parquet),
* like query COW tables in Snapshot-Query and Read_Optimized mode and MOR tables in Read_Optimized mode.
* [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying
* modes:
* <ul>
* <li>For COW tables: Snapshot</li>
* <li>For MOR tables: Read-optimized</li>
* </ul>
*
* NOTE: The reason this Relation is used in liue of Spark's default [[HadoopFsRelation]] is primarily due to the
* fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists
* as part of the record payload. In some cases, however, partition path might not necessarily be equal to the
* verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect
* partition field values being written
*/
class BaseFileOnlyViewRelation(
sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType],
globPaths: Seq[Path]
) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
class BaseFileOnlyViewRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType],
globPaths: Seq[Path])
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
private val fileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams,
FileStatusCache.getOrCreate(sqlContext.sparkSession))
val filterExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
.getOrElse(Literal(true, BooleanType))
val (partitionFilters, dataFilters) = {
val splited = filters.map { filter =>
HoodieDataSourceHelper.splitPartitionAndDataPredicates(
sparkSession, filterExpressions, partitionColumns)
}
(splited.flatMap(_._1), splited.flatMap(_._2))
}
val partitionFiles = getPartitionFiles(partitionFilters, dataFilters)
override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
// NOTE: In case list of requested columns doesn't contain the Primary Key one, we
// have to add it explicitly so that
// - Merging could be performed correctly
// - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
// Spark still fetches all the rows to execute the query correctly
//
// It's okay to return columns that have not been requested by the caller, as those nevertheless will be
// filtered out upstream
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val filePartitions = sparkAdapter.getFilePartitions(sparkSession, partitionFiles, maxSplitBytes)
val (requiredAvroSchema, requiredStructSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = tableStructSchema,
val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
val filePartitions = getPartitions(partitionFilters, dataFilters)
val partitionSchema = StructType(Nil)
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
val baseFileReader = createBaseFileReader(
spark = sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
filters = filters,
options = optParams,
hadoopConf = sparkSession.sessionState.newHadoopConf()
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = new Configuration(conf)
)
new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema,
requiredSchemaParquetReader, filePartitions)
new HoodieFileScanRDD(sparkSession, baseFileReader, filePartitions)
}
private def getPartitionFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionedFile] = {
private def getPartitions(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FilePartition] = {
val partitionDirectories = if (globPaths.isEmpty) {
val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams,
FileStatusCache.getOrCreate(sqlContext.sparkSession))
@@ -89,18 +108,46 @@ class BaseFileOnlyViewRelation(
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}
val partitionFiles = partitionDirectories.flatMap { partition =>
val partitions = partitionDirectories.flatMap { partition =>
partition.files.flatMap { file =>
// TODO move to adapter
// TODO fix, currently assuming parquet as underlying format
HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession,
file = file,
partitionValues = partition.values
// TODO clarify why this is required
partitionValues = InternalRow.empty
)
}
}
partitionFiles.map{ f =>
PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length)
}
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
sparkAdapter.getFilePartitions(sparkSession, partitions, maxSplitBytes)
}
private def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
if (failedExprs.nonEmpty) {
val failedFilters = failedExprs.map(p => filters(p._2))
logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
}
catalystExpressions.filter(_.isDefined).map(_.get).toArray
}
/**
* Checks whether given expression only references only references partition columns
* (and involves no sub-query)
*/
private def isPartitionPredicate(condition: Expression): Boolean = {
// Validates that the provided names both resolve to the same entity
val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
!SubqueryExpression.hasSubquery(condition)
}
}

View File

@@ -22,38 +22,70 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.isMetadataTable
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import scala.collection.JavaConverters._
import scala.util.Try
case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String)
case class HoodieTableState(recordKeyField: String,
preCombineFieldOpt: Option[String])
/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
*/
abstract class HoodieBaseRelation(
val sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType])
extends BaseRelation with PrunedFilteredScan with Logging{
abstract class HoodieBaseRelation(val sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType])
extends BaseRelation with PrunedFilteredScan with Logging {
protected val sparkSession: SparkSession = sqlContext.sparkSession
protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
protected lazy val jobConf = new JobConf(conf)
// If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one
// NOTE: This is historical behavior which is preserved as is
protected lazy val recordKeyField: String =
if (metaClient.getTableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD
else metaClient.getTableConfig.getRecordKeyFieldProp
protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty
/**
* @VisibleInTests
*/
lazy val mandatoryColumns: Seq[String] = {
if (isMetadataTable(metaClient)) {
Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
} else {
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
}
protected lazy val specifiedQueryInstant: Option[String] =
optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlCommonUtils.formatQueryInstant)
protected lazy val tableAvroSchema: Schema = {
val schemaUtil = new TableSchemaResolver(metaClient)
Try(schemaUtil.getTableAvroSchema).getOrElse(
@@ -81,6 +113,34 @@ abstract class HoodieBaseRelation(
}
override def schema: StructType = tableStructSchema
/**
* This method controls whether relation will be producing
* <ul>
* <li>[[Row]], when it's being equal to true</li>
* <li>[[InternalRow]], when it's being equal to false</li>
* </ul>
*
* Returning [[InternalRow]] directly enables us to save on needless ser/de loop from [[InternalRow]] (being
* produced by file-reader) to [[Row]] and back
*/
override final def needConversion: Boolean = false
/**
* NOTE: DO NOT OVERRIDE THIS METHOD
*/
override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
}
protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
requestedColumns ++ missing
}
}
object HoodieBaseRelation {

View File

@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.Filter
@@ -33,43 +33,6 @@ import scala.collection.JavaConverters._
object HoodieDataSourceHelper extends PredicateHelper {
/**
* Partition the given condition into two sequence of conjunctive predicates:
* - predicates that can be evaluated using metadata only.
* - other predicates.
*/
def splitPartitionAndDataPredicates(
spark: SparkSession,
condition: Expression,
partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = {
splitConjunctivePredicates(condition).partition(
isPredicateMetadataOnly(spark, _, partitionColumns))
}
/**
* Check if condition can be evaluated using only metadata. In Delta, this means the condition
* only references partition columns and involves no subquery.
*/
def isPredicateMetadataOnly(
spark: SparkSession,
condition: Expression,
partitionColumns: Seq[String]): Boolean = {
isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) &&
!SubqueryExpression.hasSubquery(condition)
}
/**
* Does the predicate only contains partition columns?
*/
def isPredicatePartitionColumnsOnly(
spark: SparkSession,
condition: Expression,
partitionColumns: Seq[String]): Boolean = {
val nameEquality = spark.sessionState.analyzer.resolver
condition.references.forall { r =>
partitionColumns.exists(nameEquality(r.name, _))
}
}
/**
* Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]]

View File

@@ -18,56 +18,37 @@
package org.apache.hudi
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Partition, TaskContext}
/**
* Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
*
* This class will extract the fields needed according to [[requiredColumns]] and
* return iterator of [[org.apache.spark.sql.Row]] directly.
* TODO eval if we actually need it
*/
class HoodieFileScanRDD(
@transient private val sparkSession: SparkSession,
requiredColumns: Array[String],
schema: StructType,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
extends RDD[Row](sparkSession.sparkContext, Nil) {
private val requiredSchema = {
val nameToStructField = schema.map(field => (field.name, field)).toMap
StructType(requiredColumns.map(nameToStructField))
}
private val requiredFieldPos = HoodieSparkUtils.collectFieldIndexes(requiredSchema, schema)
override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
val iterator = new Iterator[Object] with AutoCloseable {
class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient fileSplits: Seq[FilePartition])
extends HoodieUnsafeRDD(sparkSession.sparkContext) {
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[InternalRow] with AutoCloseable {
private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null
private[this] var currentFile: PartitionedFile = _
private[this] var currentIterator: Iterator[InternalRow] = _
override def hasNext: Boolean = {
(currentIterator != null && currentIterator.hasNext) || nextIterator()
}
def next(): Object = {
currentIterator.next()
}
def next(): InternalRow = currentIterator.next()
/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
if (files.hasNext) {
currentFile = files.next()
logInfo(s"Reading File $currentFile")
currentFile = files.next()
currentIterator = readFunction(currentFile)
try {
@@ -93,17 +74,8 @@ class HoodieFileScanRDD(
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener[Unit](_ => iterator.close())
// extract required columns from row
val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema(
iterator.asInstanceOf[Iterator[InternalRow]],
requiredSchema,
requiredFieldPos)
// convert InternalRow to Row and return
val converter = CatalystTypeConverters.createToScalaConverter(requiredSchema)
iterAfterExtract.map(converter(_).asInstanceOf[Row])
iterator.asInstanceOf[Iterator[InternalRow]]
}
override protected def getPartitions: Array[Partition] = filePartitions.toArray
override protected def getPartitions: Array[Partition] = fileSplits.toArray
}

View File

@@ -36,7 +36,6 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -54,10 +53,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
fullSchemaFileReader: PartitionedFile => Iterator[InternalRow],
requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow],
tableState: HoodieMergeOnReadTableState,
tableState: HoodieTableState,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema)
extends RDD[InternalRow](sc, Nil) {
requiredSchema: HoodieTableSchema,
@transient fileSplits: List[HoodieMergeOnReadFileSplit])
extends HoodieUnsafeRDD(sc) {
private val confBroadcast = sc.broadcast(new SerializableWritable(config))
private val recordKeyField = tableState.recordKeyField
@@ -98,12 +98,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
iter
}
override protected def getPartitions: Array[Partition] = {
tableState
.hoodieRealtimeFileSplits
.zipWithIndex
.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
}
override protected def getPartitions: Array[Partition] =
fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
private def getConfig: Configuration = {
val conf = confBroadcast.value.value

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.{Partition, SparkContext, TaskContext}
/**
* !!! PLEASE READ CAREFULLY !!!
*
* Base class for all of the custom low-overhead RDD implementations for Hudi.
*
* To keep memory allocation footprint as low as possible, each inheritor of this RDD base class
*
* <pre>
* 1. Does NOT deserialize from [[InternalRow]] to [[Row]] (therefore only providing access to
* Catalyst internal representations (often mutable) of the read row)
*
* 2. DOES NOT COPY UNDERLYING ROW OUT OF THE BOX, meaning that
*
* a) access to this RDD is NOT thread-safe
*
* b) iterating over it reference to a _mutable_ underlying instance (of [[InternalRow]]) is
* returned, entailing that after [[Iterator#next()]] is invoked on the provided iterator,
* previous reference becomes **invalid**. Therefore, you will have to copy underlying mutable
* instance of [[InternalRow]] if you plan to access it after [[Iterator#next()]] is invoked (filling
* it with the next row's payload)
*
* c) due to item b) above, no operation other than the iteration will produce meaningful
* results on it and will likely fail [1]
* </pre>
*
* [1] For example, [[RDD#collect]] method on this implementation would not work correctly, as it's
* simply using Scala's default [[Iterator#toArray]] method which will simply concat all the references onto
* the same underlying mutable object into [[Array]]. Instead each individual [[InternalRow]] _has to be copied_,
* before concatenating into the final output. Please refer to [[HoodieRDDUtils#collect]] for more details.
*
* NOTE: It enforces, for ex, that all of the RDDs implement [[compute]] method returning
* [[InternalRow]] to avoid superfluous ser/de
*/
abstract class HoodieUnsafeRDD(@transient sc: SparkContext)
extends RDD[InternalRow](sc, Nil) {
def compute(split: Partition, context: TaskContext): Iterator[InternalRow]
override final def collect(): Array[InternalRow] =
throw new UnsupportedOperationException(
"This method will not function correctly, please refer to scala-doc for HoodieUnsafeRDD"
)
}

View File

@@ -19,7 +19,6 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -28,11 +27,11 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits}
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
import scala.collection.JavaConversions._
@@ -47,9 +46,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
private val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
private val jobConf = new JobConf(conf)
private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
@@ -77,8 +73,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
private val preCombineFieldOpt = getPrecombineFieldProperty
// Record filters making sure that only records w/in the requested bounds are being fetched as part of the
// scan collected by this relation
private lazy val incrementalSpanRecordsFilters: Seq[Filter] = {
@@ -88,18 +82,16 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}
private lazy val mandatoryColumns = {
override lazy val mandatoryColumns: Seq[String] = {
// NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
// cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
override def needConversion: Boolean = false
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
if (fileIndex.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
sqlContext.sparkContext.emptyRDD[InternalRow]
} else {
logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
logDebug(s"buildScan filters = ${filters.mkString(",")}")
@@ -148,20 +140,20 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
hadoopConf = new Configuration(conf)
)
val hoodieTableState = HoodieMergeOnReadTableState(fileIndex, HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
// TODO implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to perform filtering
val rdd = new HoodieMergeOnReadRDD(
new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
jobConf,
fullSchemaParquetReader,
requiredSchemaParquetReader,
hoodieTableState,
tableSchema,
requiredSchema
requiredSchema,
fileIndex
)
rdd.asInstanceOf[RDD[Row]]
}
}
@@ -225,9 +217,4 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
})
}
private def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
requestedColumns ++ missing
}
}

View File

@@ -20,22 +20,19 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{createBaseFileReader, isMetadataTable}
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hudi.metadata.HoodieMetadataPayload
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
import scala.collection.JavaConverters._
@@ -46,10 +43,6 @@ case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile],
maxCompactionMemoryInBytes: Long,
mergeType: String)
case class HoodieMergeOnReadTableState(hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit],
recordKeyField: String,
preCombineFieldOpt: Option[String])
class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
optParams: Map[String, String],
val userSchema: Option[StructType],
@@ -57,38 +50,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
private val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
private val jobConf = new JobConf(conf)
private val mergeType = optParams.getOrElse(
DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
// If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one
// NOTE: This is historical behavior which is preserved as is
private val recordKeyField = {
if (metaClient.getTableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD
else metaClient.getTableConfig.getRecordKeyFieldProp
}
private val preCombineFieldOpt = getPrecombineFieldProperty
private lazy val mandatoryColumns = {
if (isMetadataTable(metaClient)) {
Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
} else {
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
}
override def needConversion: Boolean = false
private val specifiedQueryInstant = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlCommonUtils.formatQueryInstant)
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s" buildScan filters = ${filters.mkString(",")}")
@@ -137,12 +105,10 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
hadoopConf = new Configuration(conf)
)
val tableState = HoodieMergeOnReadTableState(fileIndex, recordKeyField, preCombineFieldOpt)
val tableState = HoodieTableState(recordKeyField, preCombineFieldOpt)
val rdd = new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader,
requiredSchemaParquetReader, tableState, tableSchema, requiredSchema)
rdd.asInstanceOf[RDD[Row]]
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader,
requiredSchemaParquetReader, tableState, tableSchema, requiredSchema, fileIndex)
}
def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = {
@@ -193,7 +159,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val partitionColumns = hoodieFileIndex.partitionSchema.fieldNames.toSet
val partitionFilters = filters.filter(f => f.references.forall(p => partitionColumns.contains(p)))
val partitionFilterExpression =
HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema)
HoodieSparkUtils.convertToCatalystExpression(partitionFilters, tableStructSchema)
val convertedPartitionFilterExpression =
HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilterExpression.toSeq)
@@ -231,11 +197,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
}
}
}
private def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
requestedColumns ++ missing
}
}
object MergeOnReadSnapshotRelation {

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.MutablePair
/**
* Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]]
*/
object HoodieUnsafeRDDUtils {
/**
* Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly
* copied [[Array]] of [[InternalRow]]s
*/
def collect(rdd: HoodieUnsafeRDD): Array[InternalRow] = {
rdd.mapPartitionsInternal { iter =>
// NOTE: We're leveraging [[MutablePair]] here to avoid unnecessary allocations, since
// a) iteration is performed lazily and b) iteration is single-threaded (w/in partition)
val pair = new MutablePair[InternalRow, Null]()
iter.map(row => pair.update(row.copy(), null))
}
.map(p => p._1)
.collect()
}
}

View File

@@ -20,8 +20,8 @@ package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType
class HoodieAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
extends HoodieAvroSerializerTrait {
class HoodieSparkAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
extends HoodieAvroSerializer {
val avroSerializer = new AvroSerializer(rootCatalystType, rootAvroType, nullable)