1
0

[HUDI-3338] Custom relation instead of HadoopFsRelation (#4709)

Currently, HadoopFsRelation will use the value of the real partition path as the value of the partition field. However, different from the normal table, Hudi will persist the partition value in the parquet file. And in some cases, it's different between the value of the real partition path and the value of the partition field.
So here we implement BaseFileOnlyViewRelation which lets Hudi manage its own relation.
This commit is contained in:
Yann Byron
2022-02-12 02:48:44 +08:00
committed by GitHub
parent 10474e0962
commit b431246710
20 changed files with 685 additions and 235 deletions

View File

@@ -117,7 +117,7 @@ object AvroConversionUtils {
def buildAvroRecordBySchema(record: IndexedRecord,
requiredSchema: Schema,
requiredPos: List[Int],
requiredPos: Seq[Int],
recordBuilder: GenericRecordBuilder): GenericRecord = {
val requiredFields = requiredSchema.getFields.asScala
assert(requiredFields.length == requiredPos.length)

View File

@@ -38,11 +38,11 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._
import scala.collection.JavaConverters.asScalaBufferConverter
object HoodieSparkUtils extends SparkAdapterSupport {
@@ -293,4 +293,30 @@ object HoodieSparkUtils extends SparkAdapterSupport {
s"${tableSchema.fieldNames.mkString(",")}")
AttributeReference(columnName, field.get.dataType, field.get.nullable)()
}
def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = {
// First get the required avro-schema, then convert the avro-schema to spark schema.
val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
// Here have to create a new Schema.Field object
// to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used".
val requiredFields = requiredColumns.map(c => name2Fields(c))
.map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema)
}
def toAttribute(tableSchema: StructType): Seq[AttributeReference] = {
tableSchema.map { field =>
AttributeReference(field.name, field.dataType, field.nullable, field.metadata)()
}
}
def collectFieldIndexes(projectedSchema: StructType, originalSchema: StructType): Seq[Int] = {
val nameToIndex = originalSchema.fields.zipWithIndex.map{ case (field, index) =>
field.name -> index
}.toMap
projectedSchema.map(field => nameToIndex(field.name))
}
}

View File

@@ -19,6 +19,7 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Row, SparkSession}
@@ -92,4 +93,10 @@ trait SparkAdapter extends Serializable {
* ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
*/
def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String]
/**
* Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`.
*/
def getFilePartitions(sparkSession: SparkSession, partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition]
}

View File

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.TableSchemaResolver
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.datasources.{PartitionedFile, _}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{BooleanType, StructType}
import scala.util.Try
/**
* The implement of [[BaseRelation]], which is used to respond to query that only touches the base files(Parquet),
* like query COW tables in Snapshot-Query and Read_Optimized mode and MOR tables in Read_Optimized mode.
*/
class BaseFileOnlyViewRelation(
sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType]
) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
private val fileIndex = HoodieFileIndex(sparkSession,
metaClient,
userSchema,
optParams,
FileStatusCache.getOrCreate(sqlContext.sparkSession)
)
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
val filterExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
.getOrElse(Literal(true, BooleanType))
val (partitionFilters, dataFilters) = {
val splited = filters.map { filter =>
HoodieDataSourceHelper.splitPartitionAndDataPredicates(
sparkSession, filterExpressions, partitionColumns)
}
(splited.flatMap(_._1), splited.flatMap(_._2))
}
val partitionFiles = fileIndex.listFiles(partitionFilters, dataFilters).flatMap { partition =>
partition.files.flatMap { file =>
HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession,
file = file,
partitionValues = partition.values
)
}
}
val emptyPartitionFiles = partitionFiles.map{ f =>
PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length)
}
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val filePartitions = sparkAdapter.getFilePartitions(sparkSession, emptyPartitionFiles, maxSplitBytes)
val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = tableStructSchema,
filters = filters,
options = optParams,
hadoopConf = sparkSession.sessionState.newHadoopConf()
)
new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema,
requiredSchemaParquetReader, filePartitions)
}
}

View File

@@ -108,6 +108,7 @@ class DefaultSource extends RelationProvider
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
val tableType = metaClient.getTableType
val queryType = parameters(QUERY_TYPE.key)
val userSchema = if (schema == null) Option.empty[StructType] else Some(schema)
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
@@ -117,20 +118,20 @@ class DefaultSource extends RelationProvider
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath,
getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, userSchema, tablePath,
readPaths, metaClient)
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new IncrementalRelation(sqlContext, parameters, schema, metaClient)
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient)
new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient)
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient)
new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient)
case (_, _, true) =>
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters)
new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
@@ -185,7 +186,7 @@ class DefaultSource extends RelationProvider
private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType,
schema: Option[StructType],
tablePath: String,
extraReadPaths: Seq[String],
metaClient: HoodieTableMetaClient): BaseRelation = {
@@ -196,17 +197,7 @@ class DefaultSource extends RelationProvider
}
if (useHoodieFileIndex) {
val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
if (schema == null) Option.empty[StructType] else Some(schema),
optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
HadoopFsRelation(
fileIndex,
fileIndex.partitionSchema,
fileIndex.dataSchema,
bucketSpec = None,
fileFormat = tableFileFormat,
optParams)(sqlContext.sparkSession)
new BaseFileOnlyViewRelation(sqlContext, metaClient, optParams, schema)
} else {
// this is just effectively RO view only, where `path` can contain a mix of
// non-hoodie/hoodie path files. set the path filter up
@@ -215,7 +206,7 @@ class DefaultSource extends RelationProvider
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])
val specifySchema = if (schema == null) {
val specifySchema = if (schema.isEmpty) {
// Load the schema from the commit meta data.
// Here we should specify the schema to the latest commit schema since
// the table schema evolution.
@@ -228,7 +219,7 @@ class DefaultSource extends RelationProvider
// with tableSchemaResolver, return None here.
}
} else {
Some(schema)
schema
}
// simply return as a regular relation
DataSource.apply(

View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.sources.{BaseRelation, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import scala.util.Try
/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
*/
abstract class HoodieBaseRelation(
val sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType])
extends BaseRelation with PrunedFilteredScan with Logging{
protected val sparkSession: SparkSession = sqlContext.sparkSession
protected val tableAvroSchema: Schema = {
val schemaUtil = new TableSchemaResolver(metaClient)
Try (schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema.get))
}
protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
protected val partitionColumns: Array[String] = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty)
override def schema: StructType = userSchema.getOrElse(tableStructSchema)
}

View File

@@ -24,12 +24,13 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.hudi.HoodieDataSourceHelper._
class HoodieBootstrapRDD(@transient spark: SparkSession,
dataReadFunction: PartitionedFile => Iterator[Any],
skeletonReadFunction: PartitionedFile => Iterator[Any],
regularReadFunction: PartitionedFile => Iterator[Any],
dataReadFunction: PartitionedFile => Iterator[InternalRow],
skeletonReadFunction: PartitionedFile => Iterator[InternalRow],
regularReadFunction: PartitionedFile => Iterator[InternalRow],
dataSchema: StructType,
skeletonSchema: StructType,
requiredColumns: Array[String],
@@ -56,18 +57,18 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
// It is a bootstrap split. Check both skeleton and data files.
if (dataSchema.isEmpty) {
// No data column to fetch, hence fetch only from skeleton file
partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
partitionedFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
} else if (skeletonSchema.isEmpty) {
// No metadata column to fetch, hence fetch only from data file
partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
partitionedFileIterator = dataReadFunction(bootstrapPartition.split.dataFile)
} else {
// Fetch from both data and skeleton file, and merge
val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
val dataFileIterator = dataReadFunction(bootstrapPartition.split.dataFile)
val skeletonFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
}
} else {
partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction)
partitionedFileIterator = regularReadFunction(bootstrapPartition.split.dataFile)
}
partitionedFileIterator
}
@@ -101,19 +102,6 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
mergedRow
}
def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any])
: Iterator[InternalRow] = {
val fileIterator = readFileFunction(partitionedFile)
import scala.collection.JavaConverters._
val rows = fileIterator.flatMap(_ match {
case r: InternalRow => Seq(r)
case b: ColumnarBatch => b.rowIterator().asScala
})
rows
}
override protected def getPartitions: Array[Partition] = {
tableState.files.zipWithIndex.map(file => {
if (file._1.skeletonFile.isDefined) {

View File

@@ -52,7 +52,7 @@ import scala.collection.JavaConverters._
* @param optParams DataSource options passed by the user
*/
class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
val userSchema: StructType,
val userSchema: Option[StructType],
val globPaths: Option[Seq[Path]],
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String]) extends BaseRelation
@@ -107,37 +107,35 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
})
// Prepare readers for reading data file and skeleton files
val dataReadFunction = new ParquetFileFormat()
.buildReaderWithPartitionValues(
sparkSession = _sqlContext.sparkSession,
dataSchema = dataSchema,
partitionSchema = StructType(Seq.empty),
requiredSchema = requiredDataSchema,
filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
options = Map.empty,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
)
val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = _sqlContext.sparkSession,
dataSchema = dataSchema,
partitionSchema = StructType(Seq.empty),
requiredSchema = requiredDataSchema,
filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
options = optParams,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
)
val skeletonReadFunction = new ParquetFileFormat()
.buildReaderWithPartitionValues(
sparkSession = _sqlContext.sparkSession,
dataSchema = skeletonSchema,
partitionSchema = StructType(Seq.empty),
requiredSchema = requiredSkeletonSchema,
filters = if (requiredDataSchema.isEmpty) filters else Seq(),
options = Map.empty,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
)
val skeletonReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = _sqlContext.sparkSession,
dataSchema = skeletonSchema,
partitionSchema = StructType(Seq.empty),
requiredSchema = requiredSkeletonSchema,
filters = if (requiredDataSchema.isEmpty) filters else Seq(),
options = optParams,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
)
val regularReadFunction = new ParquetFileFormat()
.buildReaderWithPartitionValues(
sparkSession = _sqlContext.sparkSession,
dataSchema = fullSchema,
partitionSchema = StructType(Seq.empty),
requiredSchema = requiredColsSchema,
filters = filters,
options = Map.empty,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
val regularReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = _sqlContext.sparkSession,
dataSchema = fullSchema,
partitionSchema = StructType(Seq.empty),
requiredSchema = requiredColsSchema,
filters = filters,
options = optParams,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
)
val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction,
regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState)

View File

@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
object HoodieDataSourceHelper extends PredicateHelper {
/**
* Partition the given condition into two sequence of conjunctive predicates:
* - predicates that can be evaluated using metadata only.
* - other predicates.
*/
def splitPartitionAndDataPredicates(
spark: SparkSession,
condition: Expression,
partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = {
splitConjunctivePredicates(condition).partition(
isPredicateMetadataOnly(spark, _, partitionColumns))
}
/**
* Check if condition can be evaluated using only metadata. In Delta, this means the condition
* only references partition columns and involves no subquery.
*/
def isPredicateMetadataOnly(
spark: SparkSession,
condition: Expression,
partitionColumns: Seq[String]): Boolean = {
isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) &&
!SubqueryExpression.hasSubquery(condition)
}
/**
* Does the predicate only contains partition columns?
*/
def isPredicatePartitionColumnsOnly(
spark: SparkSession,
condition: Expression,
partitionColumns: Seq[String]): Boolean = {
val nameEquality = spark.sessionState.analyzer.resolver
condition.references.forall { r =>
partitionColumns.exists(nameEquality(r.name, _))
}
}
/**
* Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
* to deal with [[ColumnarBatch]] when enable parquet vectorized reader if necessary.
*/
def buildHoodieParquetReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
val readParquetFile: PartitionedFile => Iterator[Any] = new ParquetFileFormat().buildReaderWithPartitionValues(
sparkSession = sparkSession,
dataSchema = dataSchema,
partitionSchema = partitionSchema,
requiredSchema = requiredSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
)
file: PartitionedFile => {
val iter = readParquetFile(file)
val rows = iter.flatMap(_ match {
case r: InternalRow => Seq(r)
case b: ColumnarBatch => b.rowIterator().asScala
})
rows
}
}
/**
* Extract the required schema from [[InternalRow]]
*/
def extractRequiredSchema(
iter: Iterator[InternalRow],
requiredSchema: StructType,
requiredFieldPos: Seq[Int]): Iterator[InternalRow] = {
val unsafeProjection = UnsafeProjection.create(requiredSchema)
val rows = iter.map { row =>
unsafeProjection(createInternalRowWithSchema(row, requiredSchema, requiredFieldPos))
}
rows
}
/**
* Convert [[InternalRow]] to [[SpecificInternalRow]].
*/
def createInternalRowWithSchema(
row: InternalRow,
schema: StructType,
positions: Seq[Int]): InternalRow = {
val rowToReturn = new SpecificInternalRow(schema)
var curIndex = 0
schema.zip(positions).foreach { case (field, pos) =>
val curField = if (row.isNullAt(pos)) {
null
} else {
row.get(pos, field.dataType)
}
rowToReturn.update(curIndex, curField)
curIndex += 1
}
rowToReturn
}
def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
partitionValues: InternalRow): Seq[PartitionedFile] = {
val filePath = file.getPath
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size)
}
}
}

View File

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
import org.apache.spark.sql.types.StructType
/**
* Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
*
* This class will extract the fields needed according to [[requiredColumns]] and
* return iterator of [[org.apache.spark.sql.Row]] directly.
*/
class HoodieFileScanRDD(
@transient private val sparkSession: SparkSession,
requiredColumns: Array[String],
schema: StructType,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
extends RDD[Row](sparkSession.sparkContext, Nil) {
private val requiredSchema = {
val nameToStructField = schema.map(field => (field.name, field)).toMap
StructType(requiredColumns.map(nameToStructField))
}
private val requiredFieldPos = HoodieSparkUtils.collectFieldIndexes(requiredSchema, schema)
override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
val iterator = new Iterator[Object] with AutoCloseable {
private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null
override def hasNext: Boolean = {
(currentIterator != null && currentIterator.hasNext) || nextIterator()
}
def next(): Object = {
currentIterator.next()
}
/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
if (files.hasNext) {
currentFile = files.next()
logInfo(s"Reading File $currentFile")
currentIterator = readFunction(currentFile)
try {
hasNext
} catch {
case e: SchemaColumnConvertNotSupportedException =>
val message = "Parquet column cannot be converted in " +
s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
throw new QueryExecutionException(message, e)
case e => throw e
}
} else {
currentFile = null
false
}
}
override def close(): Unit = {}
}
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener[Unit](_ => iterator.close())
// extract required columns from row
val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema(
iterator.asInstanceOf[Iterator[InternalRow]],
requiredSchema,
requiredFieldPos)
// convert InternalRow to Row and return
val converter = CatalystTypeConverters.createToScalaConverter(requiredSchema)
iterAfterExtract.map(converter(_).asInstanceOf[Row])
}
override protected def getPartitions: Array[Partition] = filePartitions.toArray
}

View File

@@ -20,19 +20,22 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.HoodieDataSourceHelper._
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import java.io.Closeable
@@ -46,8 +49,8 @@ case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSp
class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
fullSchemaFileReader: PartitionedFile => Iterator[Any],
requiredSchemaFileReader: PartitionedFile => Iterator[Any],
fullSchemaFileReader: PartitionedFile => Iterator[InternalRow],
requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow],
tableState: HoodieMergeOnReadTableState)
extends RDD[InternalRow](sc, Nil) {
@@ -61,26 +64,33 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
} else {
new Properties()
}
private val requiredSchema = tableState.requiredStructSchema
private val requiredFieldPosition = HoodieSparkUtils.collectFieldIndexes(requiredSchema,
tableState.tableStructSchema
)
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
extractRequiredSchema(rows)
val rows = requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
extractRequiredSchema(rows, requiredSchema, requiredFieldPosition)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
logFileIterator(logFileOnlySplit, getConfig)
case skipMergeSplit if skipMergeSplit.mergeType
.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
skipMergeFileIterator(
skipMergeSplit,
read(skipMergeSplit.dataFile.get, requiredSchemaFileReader),
requiredSchemaFileReader(skipMergeSplit.dataFile.get),
getConfig
)
case payloadCombineSplit if payloadCombineSplit.mergeType
.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
payloadCombineFileIterator(
payloadCombineSplit,
read(payloadCombineSplit.dataFile.get, fullSchemaFileReader),
fullSchemaFileReader(payloadCombineSplit.dataFile.get),
getConfig
)
case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
@@ -112,36 +122,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
private def read(partitionedFile: PartitionedFile,
readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
val fileIterator = readFileFunction(partitionedFile)
val rows = fileIterator.flatMap(_ match {
case r: InternalRow => Seq(r)
case b: ColumnarBatch => b.rowIterator().asScala
})
rows
}
private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {
val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
val requiredFieldPosition = tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
val rows = iter.map { row =>
unsafeProjection(createRowWithRequiredSchema(row, requiredFieldPosition))
}
rows
}
private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
@@ -189,9 +174,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
@@ -205,7 +187,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
val curRow = baseFileIterator.next()
recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition))
recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema, requiredFieldPosition))
true
} else {
if (logRecordsKeyIterator.hasNext) {
@@ -247,9 +229,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
private val requiredDeserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
@@ -289,7 +268,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
} else {
// No merge needed, load current row with required schema
recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition))
recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema, requiredFieldPosition))
true
}
} else {
@@ -339,22 +318,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
historyAvroRecord, tableAvroSchema, payloadProps)
}
}
private def createRowWithRequiredSchema(row: InternalRow, requiredFieldPosition: Seq[Int]): InternalRow = {
val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
val posIterator = requiredFieldPosition.iterator
var curIndex = 0
tableState.requiredStructSchema.foreach(
f => {
val curPos = posIterator.next()
val curField = if (row.isNullAt(curPos)) null else row.get(curPos, f.dataType)
rowToReturn.update(curIndex, curField)
curIndex = curIndex + 1
}
)
rowToReturn
}
}
private object HoodieMergeOnReadRDD {

View File

@@ -48,7 +48,7 @@ import scala.collection.mutable
*/
class IncrementalRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType,
val userSchema: Option[StructType],
val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
private val log = LogManager.getLogger(classOf[IncrementalRelation])

View File

@@ -25,9 +25,12 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForC
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getCommitMetadata
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getWritePartitionPaths
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -43,15 +46,15 @@ import scala.collection.JavaConversions._
* Relation, that implements the Hoodie incremental view for Merge On Read table.
*
*/
class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType,
val userSchema: Option[StructType],
val metaClient: HoodieTableMetaClient)
extends BaseRelation with PrunedFilteredScan {
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation])
private val conf = sqlContext.sparkContext.hadoopConfiguration
private val jobConf = new JobConf(conf)
private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
@@ -72,13 +75,13 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp))
log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
logDebug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
private val schemaResolver = new TableSchemaResolver(metaClient)
private val tableAvroSchema = schemaResolver.getTableAvroSchema
private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
private val preCombineField = {
val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
if (preCombineFieldFromTableConfig != null) {
@@ -89,7 +92,6 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
}
}
override def schema: StructType = tableStructSchema
override def needConversion: Boolean = false
@@ -108,8 +110,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
if (fileIndex.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s"buildScan filters = ${filters.mkString(",")}")
logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
logDebug(s"buildScan filters = ${filters.mkString(",")}")
// config to ensure the push down filter for parquet will be applied.
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
@@ -121,7 +123,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
}
val (requiredAvroSchema, requiredStructSchema) =
MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns)
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
val hoodieTableState = HoodieMergeOnReadTableState(
tableStructSchema,
@@ -132,7 +134,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
preCombineField,
Option.empty
)
val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
@@ -141,7 +143,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
@@ -173,7 +176,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
).toList
val latestCommit = fsView.getLastInstant.get.getTimestamp
if (log.isDebugEnabled) {
fileGroups.foreach(f => log.debug(s"current file group id: " +
fileGroups.foreach(f => logDebug(s"current file group id: " +
s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get.toString}"))
}

View File

@@ -18,14 +18,15 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hudi.common.model.HoodieLogFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
@@ -55,32 +56,22 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
preCombineField: Option[String],
recordKeyFieldOpt: Option[String])
class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType,
class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
optParams: Map[String, String],
val userSchema: Option[StructType],
val globPaths: Option[Seq[Path]],
val metaClient: HoodieTableMetaClient)
extends BaseRelation with PrunedFilteredScan with Logging {
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
private val conf = sqlContext.sparkContext.hadoopConfiguration
private val jobConf = new JobConf(conf)
// use schema from latest metadata, if not present, read schema from the data file
private val schemaResolver = new TableSchemaResolver(metaClient)
private lazy val tableAvroSchema = {
try {
schemaResolver.getTableAvroSchema
} catch {
case _: Throwable => // If there is no commit in the table, we cann't get the schema
// with schemaUtil, use the userSchema instead.
SchemaConverters.toAvroType(userSchema)
}
}
private lazy val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
private val mergeType = optParams.getOrElse(
DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
private val preCombineField = {
val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
if (preCombineFieldFromTableConfig != null) {
@@ -95,7 +86,6 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
if (!metaClient.getTableConfig.populateMetaFields()) {
recordKeyFieldOpt = Option(metaClient.getTableConfig.getRecordKeyFieldProp)
}
override def schema: StructType = tableStructSchema
override def needConversion: Boolean = false
@@ -107,7 +97,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
log.debug(s" buildScan filters = ${filters.mkString(",")}")
val (requiredAvroSchema, requiredStructSchema) =
MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns)
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
val fileIndex = buildFileIndex(filters)
val hoodieTableState = HoodieMergeOnReadTableState(
tableStructSchema,
@@ -118,7 +108,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
preCombineField,
recordKeyFieldOpt
)
val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
@@ -127,7 +117,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
@@ -252,14 +243,4 @@ object MergeOnReadSnapshotRelation {
path.toUri.toString
}
def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = {
// First get the required avro-schema, then convert the avro-schema to spark schema.
val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
val requiredFields = requiredColumns.map(c => name2Fields(c))
.map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema)
}
}

View File

@@ -161,12 +161,12 @@ class HoodieStreamSource(
val rdd = tableType match {
case HoodieTableType.COPY_ON_WRITE =>
val serDe = sparkAdapter.createSparkRowSerDe(RowEncoder(schema))
new IncrementalRelation(sqlContext, incParams, schema, metaClient)
new IncrementalRelation(sqlContext, incParams, Some(schema), metaClient)
.buildScan()
.map(serDe.serializeRow)
case HoodieTableType.MERGE_ON_READ =>
val requiredColumns = schema.fields.map(_.name)
new MergeOnReadIncrementalRelation(sqlContext, incParams, schema, metaClient)
new MergeOnReadIncrementalRelation(sqlContext, incParams, Some(schema), metaClient)
.buildScan(requiredColumns, Array.empty[Filter])
.asInstanceOf[RDD[InternalRow]]
case _ => throw new IllegalArgumentException(s"UnSupport tableType: $tableType")

View File

@@ -18,18 +18,24 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{Row, SparkSession}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
import java.io.File
import java.nio.file.Paths
import scala.collection.JavaConverters
class TestHoodieSparkUtils {
@@ -232,6 +238,29 @@ class TestHoodieSparkUtils {
spark.stop()
}
@Test
def testGetRequiredSchema(): Unit = {
val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," +
"\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," +
"{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," +
"{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," +
"{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
val (requiredAvroSchema, requiredStructSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, Array("ts"))
assertEquals("timestamp-millis",
requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName)
assertEquals(TimestampType, requiredStructSchema.fields(0).dataType)
}
def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
}

View File

@@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.spark.sql.types.TimestampType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
class TestMergeOnReadSnapshotRelation {
@Test
def testGetRequiredSchema(): Unit = {
val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," +
"\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," +
"{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," +
"{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," +
"{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
val (requiredAvroSchema, requiredStructSchema) =
MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, Array("ts"))
assertEquals("timestamp-millis",
requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName)
assertEquals(TimestampType, requiredStructSchema.fields(0).dataType)
}
}

View File

@@ -833,4 +833,48 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(inputDF2.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","),
readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","))
}
@Test
def testHoodieBaseFileOnlyViewRelation(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "z3", 30, "v1", "2018-09-23"), (2, "z3", 35, "v1", "2018-09-24"))
.toDF("id", "name", "age", "ts", "data_date")
df.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
.option("hoodie.insert.shuffle.parallelism", "4")
.option("hoodie.upsert.shuffle.parallelism", "4")
.option("hoodie.bulkinsert.shuffle.parallelism", "2")
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "data_date")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
.option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, "DATE_STRING")
.option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy/MM/dd")
.option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT+8:00")
.option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd")
.mode(org.apache.spark.sql.SaveMode.Append)
.save(basePath)
val res = spark.read.format("hudi").load(basePath)
assert(res.count() == 2)
// data_date is the partition field. Persist to the parquet file using the origin values, and read it.
assertTrue(
res.select("data_date").map(_.get(0).toString).collect().sorted.sameElements(
Array("2018-09-23", "2018-09-24")
)
)
assertTrue(
res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.sameElements(
Array("2018/09/23", "2018/09/24")
)
)
}
}

View File

@@ -19,6 +19,7 @@ package org.apache.spark.sql.adapter
import org.apache.hudi.Spark2RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
@@ -26,12 +27,14 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.{Spark2ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.ArrayBuffer
/**
* The adapter for spark2.
*/
@@ -86,4 +89,44 @@ class Spark2Adapter extends SparkAdapter {
override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2")
}
/**
* Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`.
*
* This is a copy of org.apache.spark.sql.execution.datasources.FilePartition#getFilePartitions from Spark 3.2.
* And this will be called only in Spark 2.
*/
override def getFilePartitions(
sparkSession: SparkSession,
partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition] = {
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L
/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
// Copy to a new Array.
val newPartition = FilePartition(partitions.size, currentFiles.toArray)
partitions += newPartition
}
currentFiles.clear()
currentSize = 0
}
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// Assign files to partitions using "Next Fit Decreasing"
partitionedFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
closePartition()
partitions.toSeq
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter
import org.apache.hudi.Spark3RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -29,9 +30,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, Spark3ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.execution.datasources.{Spark3ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.internal.SQLConf
@@ -94,4 +94,14 @@ class Spark3Adapter extends SparkAdapter {
override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
parser.parseMultipartIdentifier(sqlText)
}
/**
* Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`.
*/
override def getFilePartitions(
sparkSession: SparkSession,
partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition] = {
FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes)
}
}