diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 5b87fee14..d5a287233 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -117,7 +117,7 @@ object AvroConversionUtils { def buildAvroRecordBySchema(record: IndexedRecord, requiredSchema: Schema, - requiredPos: List[Int], + requiredPos: Seq[Int], recordBuilder: GenericRecordBuilder): GenericRecord = { val requiredFields = requiredSchema.getFields.asScala assert(requiredFields.length == requiredPos.length) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 3e5402565..dc61a5107 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -38,11 +38,11 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal} import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex} -import org.apache.spark.sql.functions._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, SparkSession} +import scala.collection.JavaConverters._ import scala.collection.JavaConverters.asScalaBufferConverter object HoodieSparkUtils extends SparkAdapterSupport { @@ -293,4 +293,30 @@ object HoodieSparkUtils extends SparkAdapterSupport { s"${tableSchema.fieldNames.mkString(",")}") AttributeReference(columnName, field.get.dataType, field.get.nullable)() } + + def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = { + // First get the required avro-schema, then convert the avro-schema to spark schema. + val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap + // Here have to create a new Schema.Field object + // to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used". + val requiredFields = requiredColumns.map(c => name2Fields(c)) + .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList + val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc, + tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava) + val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) + (requiredAvroSchema, requiredStructSchema) + } + + def toAttribute(tableSchema: StructType): Seq[AttributeReference] = { + tableSchema.map { field => + AttributeReference(field.name, field.dataType, field.nullable, field.metadata)() + } + } + + def collectFieldIndexes(projectedSchema: StructType, originalSchema: StructType): Seq[Int] = { + val nameToIndex = originalSchema.fields.zipWithIndex.map{ case (field, index) => + field.name -> index + }.toMap + projectedSchema.map(field => nameToIndex(field.name)) + } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 79c858e06..1264b9a1e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.client.utils.SparkRowSerDe + import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.Expression @@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SparkParsePartitionUtil} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{Row, SparkSession} @@ -92,4 +93,10 @@ trait SparkAdapter extends Serializable { * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called. */ def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] + + /** + * Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`. + */ + def getFilePartitions(sparkSession: SparkSession, partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala new file mode 100644 index 000000000..7c3f99c9e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.TableSchemaResolver + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.execution.datasources.{PartitionedFile, _} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} +import org.apache.spark.sql.types.{BooleanType, StructType} + +import scala.util.Try + +/** + * The implement of [[BaseRelation]], which is used to respond to query that only touches the base files(Parquet), + * like query COW tables in Snapshot-Query and Read_Optimized mode and MOR tables in Read_Optimized mode. + */ +class BaseFileOnlyViewRelation( + sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType] + ) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { + + private val fileIndex = HoodieFileIndex(sparkSession, + metaClient, + userSchema, + optParams, + FileStatusCache.getOrCreate(sqlContext.sparkSession) + ) + + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") + + val filterExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) + .getOrElse(Literal(true, BooleanType)) + val (partitionFilters, dataFilters) = { + val splited = filters.map { filter => + HoodieDataSourceHelper.splitPartitionAndDataPredicates( + sparkSession, filterExpressions, partitionColumns) + } + (splited.flatMap(_._1), splited.flatMap(_._2)) + } + + val partitionFiles = fileIndex.listFiles(partitionFilters, dataFilters).flatMap { partition => + partition.files.flatMap { file => + HoodieDataSourceHelper.splitFiles( + sparkSession = sparkSession, + file = file, + partitionValues = partition.values + ) + } + } + val emptyPartitionFiles = partitionFiles.map{ f => + PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length) + } + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val filePartitions = sparkAdapter.getFilePartitions(sparkSession, emptyPartitionFiles, maxSplitBytes) + + val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = sparkSession, + dataSchema = tableStructSchema, + partitionSchema = StructType(Nil), + requiredSchema = tableStructSchema, + filters = filters, + options = optParams, + hadoopConf = sparkSession.sessionState.newHadoopConf() + ) + + new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema, + requiredSchemaParquetReader, filePartitions) + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 4c9e585c3..3003345ea 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -108,6 +108,7 @@ class DefaultSource extends RelationProvider val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent val tableType = metaClient.getTableType val queryType = parameters(QUERY_TYPE.key) + val userSchema = if (schema == null) Option.empty[StructType] else Some(schema) log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { @@ -117,20 +118,20 @@ class DefaultSource extends RelationProvider case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => - getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath, + getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, userSchema, tablePath, readPaths, metaClient) case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new IncrementalRelation(sqlContext, parameters, schema, metaClient) + new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => - new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient) + new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient) case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient) + new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient) case (_, _, true) => - new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters) + new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters) case (_, _, _) => throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," + @@ -185,7 +186,7 @@ class DefaultSource extends RelationProvider private def getBaseFileOnlyView(useHoodieFileIndex: Boolean, sqlContext: SQLContext, optParams: Map[String, String], - schema: StructType, + schema: Option[StructType], tablePath: String, extraReadPaths: Seq[String], metaClient: HoodieTableMetaClient): BaseRelation = { @@ -196,17 +197,7 @@ class DefaultSource extends RelationProvider } if (useHoodieFileIndex) { - val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, - if (schema == null) Option.empty[StructType] else Some(schema), - optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) - - HadoopFsRelation( - fileIndex, - fileIndex.partitionSchema, - fileIndex.dataSchema, - bucketSpec = None, - fileFormat = tableFileFormat, - optParams)(sqlContext.sparkSession) + new BaseFileOnlyViewRelation(sqlContext, metaClient, optParams, schema) } else { // this is just effectively RO view only, where `path` can contain a mix of // non-hoodie/hoodie path files. set the path filter up @@ -215,7 +206,7 @@ class DefaultSource extends RelationProvider classOf[HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]) - val specifySchema = if (schema == null) { + val specifySchema = if (schema.isEmpty) { // Load the schema from the commit meta data. // Here we should specify the schema to the latest commit schema since // the table schema evolution. @@ -228,7 +219,7 @@ class DefaultSource extends RelationProvider // with tableSchemaResolver, return None here. } } else { - Some(schema) + schema } // simply return as a regular relation DataSource.apply( diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala new file mode 100644 index 000000000..74aa8a3d9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.avro.Schema + +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{SQLContext, SparkSession} +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.sources.{BaseRelation, PrunedFilteredScan} +import org.apache.spark.sql.types.StructType + +import scala.util.Try + +/** + * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. + */ +abstract class HoodieBaseRelation( + val sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType]) + extends BaseRelation with PrunedFilteredScan with Logging{ + + protected val sparkSession: SparkSession = sqlContext.sparkSession + + protected val tableAvroSchema: Schema = { + val schemaUtil = new TableSchemaResolver(metaClient) + Try (schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema.get)) + } + + protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + + protected val partitionColumns: Array[String] = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty) + + override def schema: StructType = userSchema.getOrElse(tableStructSchema) + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala index a522db6af..ea997c86a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala @@ -24,12 +24,13 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.ColumnarBatch + +import org.apache.hudi.HoodieDataSourceHelper._ class HoodieBootstrapRDD(@transient spark: SparkSession, - dataReadFunction: PartitionedFile => Iterator[Any], - skeletonReadFunction: PartitionedFile => Iterator[Any], - regularReadFunction: PartitionedFile => Iterator[Any], + dataReadFunction: PartitionedFile => Iterator[InternalRow], + skeletonReadFunction: PartitionedFile => Iterator[InternalRow], + regularReadFunction: PartitionedFile => Iterator[InternalRow], dataSchema: StructType, skeletonSchema: StructType, requiredColumns: Array[String], @@ -56,18 +57,18 @@ class HoodieBootstrapRDD(@transient spark: SparkSession, // It is a bootstrap split. Check both skeleton and data files. if (dataSchema.isEmpty) { // No data column to fetch, hence fetch only from skeleton file - partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction) + partitionedFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get) } else if (skeletonSchema.isEmpty) { // No metadata column to fetch, hence fetch only from data file - partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction) + partitionedFileIterator = dataReadFunction(bootstrapPartition.split.dataFile) } else { // Fetch from both data and skeleton file, and merge - val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction) - val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction) + val dataFileIterator = dataReadFunction(bootstrapPartition.split.dataFile) + val skeletonFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get) partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator) } } else { - partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction) + partitionedFileIterator = regularReadFunction(bootstrapPartition.split.dataFile) } partitionedFileIterator } @@ -101,19 +102,6 @@ class HoodieBootstrapRDD(@transient spark: SparkSession, mergedRow } - def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any]) - : Iterator[InternalRow] = { - val fileIterator = readFileFunction(partitionedFile) - - import scala.collection.JavaConverters._ - - val rows = fileIterator.flatMap(_ match { - case r: InternalRow => Seq(r) - case b: ColumnarBatch => b.rowIterator().asScala - }) - rows - } - override protected def getPartitions: Array[Partition] = { tableState.files.zipWithIndex.map(file => { if (file._1.skeletonFile.isDefined) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala index b1ab83a94..e9ce0f1ae 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala @@ -52,7 +52,7 @@ import scala.collection.JavaConverters._ * @param optParams DataSource options passed by the user */ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, - val userSchema: StructType, + val userSchema: Option[StructType], val globPaths: Option[Seq[Path]], val metaClient: HoodieTableMetaClient, val optParams: Map[String, String]) extends BaseRelation @@ -107,37 +107,35 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, }) // Prepare readers for reading data file and skeleton files - val dataReadFunction = new ParquetFileFormat() - .buildReaderWithPartitionValues( - sparkSession = _sqlContext.sparkSession, - dataSchema = dataSchema, - partitionSchema = StructType(Seq.empty), - requiredSchema = requiredDataSchema, - filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() , - options = Map.empty, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() - ) + val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = _sqlContext.sparkSession, + dataSchema = dataSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = requiredDataSchema, + filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() , + options = optParams, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + ) - val skeletonReadFunction = new ParquetFileFormat() - .buildReaderWithPartitionValues( - sparkSession = _sqlContext.sparkSession, - dataSchema = skeletonSchema, - partitionSchema = StructType(Seq.empty), - requiredSchema = requiredSkeletonSchema, - filters = if (requiredDataSchema.isEmpty) filters else Seq(), - options = Map.empty, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() - ) + val skeletonReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = _sqlContext.sparkSession, + dataSchema = skeletonSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = requiredSkeletonSchema, + filters = if (requiredDataSchema.isEmpty) filters else Seq(), + options = optParams, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + ) - val regularReadFunction = new ParquetFileFormat() - .buildReaderWithPartitionValues( - sparkSession = _sqlContext.sparkSession, - dataSchema = fullSchema, - partitionSchema = StructType(Seq.empty), - requiredSchema = requiredColsSchema, - filters = filters, - options = Map.empty, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()) + val regularReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = _sqlContext.sparkSession, + dataSchema = fullSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = requiredColsSchema, + filters = filters, + options = optParams, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + ) val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction, regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala new file mode 100644 index 000000000..a04f54621 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +object HoodieDataSourceHelper extends PredicateHelper { + + /** + * Partition the given condition into two sequence of conjunctive predicates: + * - predicates that can be evaluated using metadata only. + * - other predicates. + */ + def splitPartitionAndDataPredicates( + spark: SparkSession, + condition: Expression, + partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = { + splitConjunctivePredicates(condition).partition( + isPredicateMetadataOnly(spark, _, partitionColumns)) + } + + /** + * Check if condition can be evaluated using only metadata. In Delta, this means the condition + * only references partition columns and involves no subquery. + */ + def isPredicateMetadataOnly( + spark: SparkSession, + condition: Expression, + partitionColumns: Seq[String]): Boolean = { + isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) && + !SubqueryExpression.hasSubquery(condition) + } + + /** + * Does the predicate only contains partition columns? + */ + def isPredicatePartitionColumnsOnly( + spark: SparkSession, + condition: Expression, + partitionColumns: Seq[String]): Boolean = { + val nameEquality = spark.sessionState.analyzer.resolver + condition.references.forall { r => + partitionColumns.exists(nameEquality(r.name, _)) + } + } + + /** + * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]] + * to deal with [[ColumnarBatch]] when enable parquet vectorized reader if necessary. + */ + def buildHoodieParquetReader( + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + + val readParquetFile: PartitionedFile => Iterator[Any] = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sparkSession, + dataSchema = dataSchema, + partitionSchema = partitionSchema, + requiredSchema = requiredSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf + ) + + file: PartitionedFile => { + val iter = readParquetFile(file) + val rows = iter.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala + }) + rows + } + } + + /** + * Extract the required schema from [[InternalRow]] + */ + def extractRequiredSchema( + iter: Iterator[InternalRow], + requiredSchema: StructType, + requiredFieldPos: Seq[Int]): Iterator[InternalRow] = { + val unsafeProjection = UnsafeProjection.create(requiredSchema) + val rows = iter.map { row => + unsafeProjection(createInternalRowWithSchema(row, requiredSchema, requiredFieldPos)) + } + rows + } + + /** + * Convert [[InternalRow]] to [[SpecificInternalRow]]. + */ + def createInternalRowWithSchema( + row: InternalRow, + schema: StructType, + positions: Seq[Int]): InternalRow = { + val rowToReturn = new SpecificInternalRow(schema) + var curIndex = 0 + schema.zip(positions).foreach { case (field, pos) => + val curField = if (row.isNullAt(pos)) { + null + } else { + row.get(pos, field.dataType) + } + rowToReturn.update(curIndex, curField) + curIndex += 1 + } + rowToReturn + } + + + def splitFiles( + sparkSession: SparkSession, + file: FileStatus, + partitionValues: InternalRow): Seq[PartitionedFile] = { + val filePath = file.getPath + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + (0L until file.getLen by maxSplitBytes).map { offset => + val remaining = file.getLen - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + PartitionedFile(partitionValues, filePath.toUri.toString, offset, size) + } + } + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala new file mode 100644 index 000000000..9f2d7d9e0 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException} +import org.apache.spark.sql.types.StructType + +/** + * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]]. + * + * This class will extract the fields needed according to [[requiredColumns]] and + * return iterator of [[org.apache.spark.sql.Row]] directly. + */ +class HoodieFileScanRDD( + @transient private val sparkSession: SparkSession, + requiredColumns: Array[String], + schema: StructType, + readFunction: PartitionedFile => Iterator[InternalRow], + @transient val filePartitions: Seq[FilePartition]) + extends RDD[Row](sparkSession.sparkContext, Nil) { + + private val requiredSchema = { + val nameToStructField = schema.map(field => (field.name, field)).toMap + StructType(requiredColumns.map(nameToStructField)) + } + + private val requiredFieldPos = HoodieSparkUtils.collectFieldIndexes(requiredSchema, schema) + + override def compute(split: Partition, context: TaskContext): Iterator[Row] = { + val iterator = new Iterator[Object] with AutoCloseable { + + private[this] val files = split.asInstanceOf[FilePartition].files.toIterator + private[this] var currentFile: PartitionedFile = null + private[this] var currentIterator: Iterator[Object] = null + + override def hasNext: Boolean = { + (currentIterator != null && currentIterator.hasNext) || nextIterator() + } + + def next(): Object = { + currentIterator.next() + } + + /** Advances to the next file. Returns true if a new non-empty iterator is available. */ + private def nextIterator(): Boolean = { + if (files.hasNext) { + currentFile = files.next() + + logInfo(s"Reading File $currentFile") + currentIterator = readFunction(currentFile) + + try { + hasNext + } catch { + case e: SchemaColumnConvertNotSupportedException => + val message = "Parquet column cannot be converted in " + + s"file ${currentFile.filePath}. Column: ${e.getColumn}, " + + s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" + throw new QueryExecutionException(message, e) + + case e => throw e + } + } else { + currentFile = null + false + } + } + + override def close(): Unit = {} + } + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener[Unit](_ => iterator.close()) + + // extract required columns from row + val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema( + iterator.asInstanceOf[Iterator[InternalRow]], + requiredSchema, + requiredFieldPos) + + // convert InternalRow to Row and return + val converter = CatalystTypeConverters.createToScalaConverter(requiredSchema) + iterAfterExtract.map(converter(_).asInstanceOf[Row]) + } + + override protected def getPartitions: Array[Partition] = filePartitions.toArray + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 3257ecf1f..4c3d30bd4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -20,19 +20,22 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} + import org.apache.hadoop.conf.Configuration + +import org.apache.hudi.HoodieDataSourceHelper._ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner import org.apache.hudi.config.HoodiePayloadConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} import java.io.Closeable @@ -46,8 +49,8 @@ case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSp class HoodieMergeOnReadRDD(@transient sc: SparkContext, @transient config: Configuration, - fullSchemaFileReader: PartitionedFile => Iterator[Any], - requiredSchemaFileReader: PartitionedFile => Iterator[Any], + fullSchemaFileReader: PartitionedFile => Iterator[InternalRow], + requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow], tableState: HoodieMergeOnReadTableState) extends RDD[InternalRow](sc, Nil) { @@ -61,26 +64,33 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } else { new Properties() } + + private val requiredSchema = tableState.requiredStructSchema + + private val requiredFieldPosition = HoodieSparkUtils.collectFieldIndexes(requiredSchema, + tableState.tableStructSchema + ) + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] val iter = mergeOnReadPartition.split match { case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => - val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader) - extractRequiredSchema(rows) + val rows = requiredSchemaFileReader(dataFileOnlySplit.dataFile.get) + extractRequiredSchema(rows, requiredSchema, requiredFieldPosition) case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => logFileIterator(logFileOnlySplit, getConfig) case skipMergeSplit if skipMergeSplit.mergeType .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => skipMergeFileIterator( skipMergeSplit, - read(skipMergeSplit.dataFile.get, requiredSchemaFileReader), + requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig ) case payloadCombineSplit if payloadCombineSplit.mergeType .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => payloadCombineFileIterator( payloadCombineSplit, - read(payloadCombineSplit.dataFile.get, fullSchemaFileReader), + fullSchemaFileReader(payloadCombineSplit.dataFile.get), getConfig ) case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + @@ -112,36 +122,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } } - private def read(partitionedFile: PartitionedFile, - readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { - val fileIterator = readFileFunction(partitionedFile) - val rows = fileIterator.flatMap(_ match { - case r: InternalRow => Seq(r) - case b: ColumnarBatch => b.rowIterator().asScala - }) - rows - } - - private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = { - val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) - val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) - val requiredFieldPosition = tableState.requiredStructSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList - val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) - val rows = iter.map { row => - unsafeProjection(createRowWithRequiredSchema(row, requiredFieldPosition)) - } - rows - } - private def logFileIterator(split: HoodieMergeOnReadFileSplit, config: Configuration): Iterator[InternalRow] = new Iterator[InternalRow] with Closeable { private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) - private val requiredFieldPosition = - tableState.requiredStructSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) @@ -189,9 +174,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, new Iterator[InternalRow] with Closeable { private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) - private val requiredFieldPosition = - tableState.requiredStructSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) @@ -205,7 +187,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, override def hasNext: Boolean = { if (baseFileIterator.hasNext) { val curRow = baseFileIterator.next() - recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition)) + recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema, requiredFieldPosition)) true } else { if (logRecordsKeyIterator.hasNext) { @@ -247,9 +229,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, new Iterator[InternalRow] with Closeable { private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) - private val requiredFieldPosition = - tableState.requiredStructSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false) private val requiredDeserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) @@ -289,7 +268,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } } else { // No merge needed, load current row with required schema - recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition)) + recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema, requiredFieldPosition)) true } } else { @@ -339,22 +318,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, historyAvroRecord, tableAvroSchema, payloadProps) } } - - private def createRowWithRequiredSchema(row: InternalRow, requiredFieldPosition: Seq[Int]): InternalRow = { - val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema) - val posIterator = requiredFieldPosition.iterator - var curIndex = 0 - tableState.requiredStructSchema.foreach( - f => { - val curPos = posIterator.next() - val curField = if (row.isNullAt(curPos)) null else row.get(curPos, f.dataType) - rowToReturn.update(curIndex, curField) - curIndex = curIndex + 1 - } - ) - rowToReturn - } - } private object HoodieMergeOnReadRDD { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 14df0cb38..395e1b3a2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -48,7 +48,7 @@ import scala.collection.mutable */ class IncrementalRelation(val sqlContext: SQLContext, val optParams: Map[String, String], - val userSchema: StructType, + val userSchema: Option[StructType], val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan { private val log = LogManager.getLogger(classOf[IncrementalRelation]) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 01b480a11..db9ebeff4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -25,9 +25,12 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForC import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getCommitMetadata import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getWritePartitionPaths import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hadoop.mapred.JobConf + import org.apache.log4j.LogManager + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -43,15 +46,15 @@ import scala.collection.JavaConversions._ * Relation, that implements the Hoodie incremental view for Merge On Read table. * */ -class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, +class MergeOnReadIncrementalRelation(sqlContext: SQLContext, val optParams: Map[String, String], - val userSchema: StructType, + val userSchema: Option[StructType], val metaClient: HoodieTableMetaClient) - extends BaseRelation with PrunedFilteredScan { + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { - private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation]) private val conf = sqlContext.sparkContext.hadoopConfiguration private val jobConf = new JobConf(conf) + private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants() if (commitTimeline.empty()) { throw new HoodieException("No instants to incrementally pull") @@ -72,13 +75,13 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, private val commitsTimelineToReturn = commitTimeline.findInstantsInRange( optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key), optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp)) - log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}") + logDebug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}") private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList - private val schemaResolver = new TableSchemaResolver(metaClient) - private val tableAvroSchema = schemaResolver.getTableAvroSchema - private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) + private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex() + private val preCombineField = { val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField if (preCombineFieldFromTableConfig != null) { @@ -89,7 +92,6 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key) } } - override def schema: StructType = tableStructSchema override def needConversion: Boolean = false @@ -108,8 +110,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, if (fileIndex.isEmpty) { sqlContext.sparkContext.emptyRDD[Row] } else { - log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}") - log.debug(s"buildScan filters = ${filters.mkString(",")}") + logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}") + logDebug(s"buildScan filters = ${filters.mkString(",")}") // config to ensure the push down filter for parquet will be applied. sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true") sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true") @@ -121,7 +123,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter } val (requiredAvroSchema, requiredStructSchema) = - MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns) + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns) val hoodieTableState = HoodieMergeOnReadTableState( tableStructSchema, @@ -132,7 +134,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, preCombineField, Option.empty ) - val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), @@ -141,7 +143,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, options = optParams, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) - val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + + val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), @@ -173,7 +176,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, ).toList val latestCommit = fsView.getLastInstant.get.getTimestamp if (log.isDebugEnabled) { - fileGroups.foreach(f => log.debug(s"current file group id: " + + fileGroups.foreach(f => logDebug(s"current file group id: " + s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get.toString}")) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 2829b4bc1..6ff49823a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -18,14 +18,15 @@ package org.apache.hudi -import org.apache.avro.Schema import org.apache.hudi.common.model.HoodieLogFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf + import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters @@ -55,32 +56,22 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType, preCombineField: Option[String], recordKeyFieldOpt: Option[String]) -class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, - val optParams: Map[String, String], - val userSchema: StructType, +class MergeOnReadSnapshotRelation(sqlContext: SQLContext, + optParams: Map[String, String], + val userSchema: Option[StructType], val globPaths: Option[Seq[Path]], val metaClient: HoodieTableMetaClient) - extends BaseRelation with PrunedFilteredScan with Logging { + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { private val conf = sqlContext.sparkContext.hadoopConfiguration private val jobConf = new JobConf(conf) - // use schema from latest metadata, if not present, read schema from the data file - private val schemaResolver = new TableSchemaResolver(metaClient) - private lazy val tableAvroSchema = { - try { - schemaResolver.getTableAvroSchema - } catch { - case _: Throwable => // If there is no commit in the table, we cann't get the schema - // with schemaUtil, use the userSchema instead. - SchemaConverters.toAvroType(userSchema) - } - } - private lazy val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) private val mergeType = optParams.getOrElse( DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_MERGE.defaultValue) + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) + private val preCombineField = { val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField if (preCombineFieldFromTableConfig != null) { @@ -95,7 +86,6 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, if (!metaClient.getTableConfig.populateMetaFields()) { recordKeyFieldOpt = Option(metaClient.getTableConfig.getRecordKeyFieldProp) } - override def schema: StructType = tableStructSchema override def needConversion: Boolean = false @@ -107,7 +97,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, log.debug(s" buildScan filters = ${filters.mkString(",")}") val (requiredAvroSchema, requiredStructSchema) = - MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns) + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns) val fileIndex = buildFileIndex(filters) val hoodieTableState = HoodieMergeOnReadTableState( tableStructSchema, @@ -118,7 +108,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, preCombineField, recordKeyFieldOpt ) - val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), @@ -127,7 +117,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, options = optParams, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) - val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + + val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), @@ -252,14 +243,4 @@ object MergeOnReadSnapshotRelation { path.toUri.toString } - def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = { - // First get the required avro-schema, then convert the avro-schema to spark schema. - val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap - val requiredFields = requiredColumns.map(c => name2Fields(c)) - .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList - val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc, - tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava) - val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) - (requiredAvroSchema, requiredStructSchema) - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala index ffe9b6498..4e46233c3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala @@ -161,12 +161,12 @@ class HoodieStreamSource( val rdd = tableType match { case HoodieTableType.COPY_ON_WRITE => val serDe = sparkAdapter.createSparkRowSerDe(RowEncoder(schema)) - new IncrementalRelation(sqlContext, incParams, schema, metaClient) + new IncrementalRelation(sqlContext, incParams, Some(schema), metaClient) .buildScan() .map(serDe.serializeRow) case HoodieTableType.MERGE_ON_READ => val requiredColumns = schema.fields.map(_.name) - new MergeOnReadIncrementalRelation(sqlContext, incParams, schema, metaClient) + new MergeOnReadIncrementalRelation(sqlContext, incParams, Some(schema), metaClient) .buildScan(requiredColumns, Array.empty[Filter]) .asInstanceOf[RDD[InternalRow]] case _ => throw new IllegalArgumentException(s"UnSupport tableType: $tableType") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index ad974286a..28cd59cb0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -18,18 +18,24 @@ package org.apache.hudi +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path + import org.apache.hudi.testutils.DataSourceTestUtils -import org.apache.spark.sql.types.StructType + +import org.apache.spark.sql.types.{StructType, TimestampType} import org.apache.spark.sql.{Row, SparkSession} + import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir import java.io.File import java.nio.file.Paths + import scala.collection.JavaConverters class TestHoodieSparkUtils { @@ -232,6 +238,29 @@ class TestHoodieSparkUtils { spark.stop() } + @Test + def testGetRequiredSchema(): Unit = { + val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," + + "\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," + + "{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}" + + val tableAvroSchema = new Schema.Parser().parse(avroSchemaString) + + val (requiredAvroSchema, requiredStructSchema) = + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, Array("ts")) + + assertEquals("timestamp-millis", + requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName) + assertEquals(TimestampType, requiredStructSchema.fields(0).dataType) + } + def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] = JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala deleted file mode 100644 index 80a883a00..000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi - -import org.apache.avro.Schema -import org.apache.spark.sql.types.TimestampType -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test - -class TestMergeOnReadSnapshotRelation { - - @Test - def testGetRequiredSchema(): Unit = { - val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," + - "\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + - "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," + - "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," + - "{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}" - - val tableAvroSchema = new Schema.Parser().parse(avroSchemaString) - - val (requiredAvroSchema, requiredStructSchema) = - MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, Array("ts")) - - assertEquals("timestamp-millis", - requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName) - assertEquals(TimestampType, requiredStructSchema.fields(0).dataType) - } -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index ad6d40e00..e998f7cfa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -833,4 +833,48 @@ class TestCOWDataSource extends HoodieClientTestBase { assertEquals(inputDF2.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","), readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(",")) } + + @Test + def testHoodieBaseFileOnlyViewRelation(): Unit = { + val _spark = spark + import _spark.implicits._ + + val df = Seq((1, "z3", 30, "v1", "2018-09-23"), (2, "z3", 35, "v1", "2018-09-24")) + .toDF("id", "name", "age", "ts", "data_date") + + df.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id") + .option("hoodie.insert.shuffle.parallelism", "4") + .option("hoodie.upsert.shuffle.parallelism", "4") + .option("hoodie.bulkinsert.shuffle.parallelism", "2") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "data_date") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts") + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.TimestampBasedKeyGenerator") + .option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, "DATE_STRING") + .option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy/MM/dd") + .option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT+8:00") + .option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd") + .mode(org.apache.spark.sql.SaveMode.Append) + .save(basePath) + + val res = spark.read.format("hudi").load(basePath) + + assert(res.count() == 2) + + // data_date is the partition field. Persist to the parquet file using the origin values, and read it. + assertTrue( + res.select("data_date").map(_.get(0).toString).collect().sorted.sameElements( + Array("2018-09-23", "2018-09-24") + ) + ) + assertTrue( + res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.sameElements( + Array("2018/09/23", "2018/09/24") + ) + ) + } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index bf1cd2448..94d1c80fc 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark2RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe + import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} @@ -26,12 +27,14 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.execution.datasources.{Spark2ParsePartitionUtil, SparkParsePartitionUtil} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{Row, SparkSession} +import scala.collection.mutable.ArrayBuffer + /** * The adapter for spark2. */ @@ -86,4 +89,44 @@ class Spark2Adapter extends SparkAdapter { override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2") } + + /** + * Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`. + * + * This is a copy of org.apache.spark.sql.execution.datasources.FilePartition#getFilePartitions from Spark 3.2. + * And this will be called only in Spark 2. + */ + override def getFilePartitions( + sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] = { + + val partitions = new ArrayBuffer[FilePartition] + val currentFiles = new ArrayBuffer[PartitionedFile] + var currentSize = 0L + + /** Close the current partition and move to the next. */ + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + // Copy to a new Array. + val newPartition = FilePartition(partitions.size, currentFiles.toArray) + partitions += newPartition + } + currentFiles.clear() + currentSize = 0 + } + + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + // Assign files to partitions using "Next Fit Decreasing" + partitionedFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { + closePartition() + } + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file + } + closePartition() + partitions.toSeq + } } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala index 61fcc9634..f446715a6 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.spark3.internal.ReflectUtil + import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -29,9 +30,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ -import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.{LogicalRelation, Spark3ParsePartitionUtil, SparkParsePartitionUtil} +import org.apache.spark.sql.execution.datasources.{Spark3ParsePartitionUtil, SparkParsePartitionUtil} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.internal.SQLConf @@ -94,4 +94,14 @@ class Spark3Adapter extends SparkAdapter { override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { parser.parseMultipartIdentifier(sqlText) } + + /** + * Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`. + */ + override def getFilePartitions( + sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] = { + FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes) + } }