From b431246710f37def64b28ea26923ef5602404d24 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Sat, 12 Feb 2022 02:48:44 +0800 Subject: [PATCH] [HUDI-3338] Custom relation instead of HadoopFsRelation (#4709) Currently, HadoopFsRelation will use the value of the real partition path as the value of the partition field. However, different from the normal table, Hudi will persist the partition value in the parquet file. And in some cases, it's different between the value of the real partition path and the value of the partition field. So here we implement BaseFileOnlyViewRelation which lets Hudi manage its own relation. --- .../org/apache/hudi/AvroConversionUtils.scala | 2 +- .../org/apache/hudi/HoodieSparkUtils.scala | 28 +++- .../apache/spark/sql/hudi/SparkAdapter.scala | 9 +- .../hudi/BaseFileOnlyViewRelation.scala | 95 +++++++++++ .../scala/org/apache/hudi/DefaultSource.scala | 29 ++-- .../org/apache/hudi/HoodieBaseRelation.scala | 55 ++++++ .../org/apache/hudi/HoodieBootstrapRDD.scala | 32 ++-- .../apache/hudi/HoodieBootstrapRelation.scala | 58 ++++--- .../apache/hudi/HoodieDataSourceHelper.scala | 158 ++++++++++++++++++ .../org/apache/hudi/HoodieFileScanRDD.scala | 109 ++++++++++++ .../apache/hudi/HoodieMergeOnReadRDD.scala | 77 +++------ .../org/apache/hudi/IncrementalRelation.scala | 2 +- .../hudi/MergeOnReadIncrementalRelation.scala | 33 ++-- .../hudi/MergeOnReadSnapshotRelation.scala | 43 ++--- .../hudi/streaming/HoodieStreamSource.scala | 4 +- .../apache/hudi/TestHoodieSparkUtils.scala | 31 +++- .../TestMergeOnReadSnapshotRelation.scala | 50 ------ .../hudi/functional/TestCOWDataSource.scala | 44 +++++ .../spark/sql/adapter/Spark2Adapter.scala | 45 ++++- .../spark/sql/adapter/Spark3Adapter.scala | 16 +- 20 files changed, 685 insertions(+), 235 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala delete mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 5b87fee14..d5a287233 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -117,7 +117,7 @@ object AvroConversionUtils { def buildAvroRecordBySchema(record: IndexedRecord, requiredSchema: Schema, - requiredPos: List[Int], + requiredPos: Seq[Int], recordBuilder: GenericRecordBuilder): GenericRecord = { val requiredFields = requiredSchema.getFields.asScala assert(requiredFields.length == requiredPos.length) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 3e5402565..dc61a5107 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -38,11 +38,11 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal} import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex} -import org.apache.spark.sql.functions._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, SparkSession} +import scala.collection.JavaConverters._ import scala.collection.JavaConverters.asScalaBufferConverter object HoodieSparkUtils extends SparkAdapterSupport { @@ -293,4 +293,30 @@ object HoodieSparkUtils extends SparkAdapterSupport { s"${tableSchema.fieldNames.mkString(",")}") AttributeReference(columnName, field.get.dataType, field.get.nullable)() } + + def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = { + // First get the required avro-schema, then convert the avro-schema to spark schema. + val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap + // Here have to create a new Schema.Field object + // to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used". + val requiredFields = requiredColumns.map(c => name2Fields(c)) + .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList + val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc, + tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava) + val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) + (requiredAvroSchema, requiredStructSchema) + } + + def toAttribute(tableSchema: StructType): Seq[AttributeReference] = { + tableSchema.map { field => + AttributeReference(field.name, field.dataType, field.nullable, field.metadata)() + } + } + + def collectFieldIndexes(projectedSchema: StructType, originalSchema: StructType): Seq[Int] = { + val nameToIndex = originalSchema.fields.zipWithIndex.map{ case (field, index) => + field.name -> index + }.toMap + projectedSchema.map(field => nameToIndex(field.name)) + } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 79c858e06..1264b9a1e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.client.utils.SparkRowSerDe + import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.Expression @@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SparkParsePartitionUtil} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{Row, SparkSession} @@ -92,4 +93,10 @@ trait SparkAdapter extends Serializable { * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called. */ def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] + + /** + * Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`. + */ + def getFilePartitions(sparkSession: SparkSession, partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala new file mode 100644 index 000000000..7c3f99c9e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.TableSchemaResolver + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.execution.datasources.{PartitionedFile, _} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} +import org.apache.spark.sql.types.{BooleanType, StructType} + +import scala.util.Try + +/** + * The implement of [[BaseRelation]], which is used to respond to query that only touches the base files(Parquet), + * like query COW tables in Snapshot-Query and Read_Optimized mode and MOR tables in Read_Optimized mode. + */ +class BaseFileOnlyViewRelation( + sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType] + ) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { + + private val fileIndex = HoodieFileIndex(sparkSession, + metaClient, + userSchema, + optParams, + FileStatusCache.getOrCreate(sqlContext.sparkSession) + ) + + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") + + val filterExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) + .getOrElse(Literal(true, BooleanType)) + val (partitionFilters, dataFilters) = { + val splited = filters.map { filter => + HoodieDataSourceHelper.splitPartitionAndDataPredicates( + sparkSession, filterExpressions, partitionColumns) + } + (splited.flatMap(_._1), splited.flatMap(_._2)) + } + + val partitionFiles = fileIndex.listFiles(partitionFilters, dataFilters).flatMap { partition => + partition.files.flatMap { file => + HoodieDataSourceHelper.splitFiles( + sparkSession = sparkSession, + file = file, + partitionValues = partition.values + ) + } + } + val emptyPartitionFiles = partitionFiles.map{ f => + PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length) + } + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val filePartitions = sparkAdapter.getFilePartitions(sparkSession, emptyPartitionFiles, maxSplitBytes) + + val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = sparkSession, + dataSchema = tableStructSchema, + partitionSchema = StructType(Nil), + requiredSchema = tableStructSchema, + filters = filters, + options = optParams, + hadoopConf = sparkSession.sessionState.newHadoopConf() + ) + + new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema, + requiredSchemaParquetReader, filePartitions) + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 4c9e585c3..3003345ea 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -108,6 +108,7 @@ class DefaultSource extends RelationProvider val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent val tableType = metaClient.getTableType val queryType = parameters(QUERY_TYPE.key) + val userSchema = if (schema == null) Option.empty[StructType] else Some(schema) log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { @@ -117,20 +118,20 @@ class DefaultSource extends RelationProvider case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => - getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath, + getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, userSchema, tablePath, readPaths, metaClient) case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new IncrementalRelation(sqlContext, parameters, schema, metaClient) + new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => - new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient) + new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient) case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient) + new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient) case (_, _, true) => - new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters) + new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters) case (_, _, _) => throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," + @@ -185,7 +186,7 @@ class DefaultSource extends RelationProvider private def getBaseFileOnlyView(useHoodieFileIndex: Boolean, sqlContext: SQLContext, optParams: Map[String, String], - schema: StructType, + schema: Option[StructType], tablePath: String, extraReadPaths: Seq[String], metaClient: HoodieTableMetaClient): BaseRelation = { @@ -196,17 +197,7 @@ class DefaultSource extends RelationProvider } if (useHoodieFileIndex) { - val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, - if (schema == null) Option.empty[StructType] else Some(schema), - optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) - - HadoopFsRelation( - fileIndex, - fileIndex.partitionSchema, - fileIndex.dataSchema, - bucketSpec = None, - fileFormat = tableFileFormat, - optParams)(sqlContext.sparkSession) + new BaseFileOnlyViewRelation(sqlContext, metaClient, optParams, schema) } else { // this is just effectively RO view only, where `path` can contain a mix of // non-hoodie/hoodie path files. set the path filter up @@ -215,7 +206,7 @@ class DefaultSource extends RelationProvider classOf[HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]) - val specifySchema = if (schema == null) { + val specifySchema = if (schema.isEmpty) { // Load the schema from the commit meta data. // Here we should specify the schema to the latest commit schema since // the table schema evolution. @@ -228,7 +219,7 @@ class DefaultSource extends RelationProvider // with tableSchemaResolver, return None here. } } else { - Some(schema) + schema } // simply return as a regular relation DataSource.apply( diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala new file mode 100644 index 000000000..74aa8a3d9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.avro.Schema + +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{SQLContext, SparkSession} +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.sources.{BaseRelation, PrunedFilteredScan} +import org.apache.spark.sql.types.StructType + +import scala.util.Try + +/** + * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. + */ +abstract class HoodieBaseRelation( + val sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType]) + extends BaseRelation with PrunedFilteredScan with Logging{ + + protected val sparkSession: SparkSession = sqlContext.sparkSession + + protected val tableAvroSchema: Schema = { + val schemaUtil = new TableSchemaResolver(metaClient) + Try (schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema.get)) + } + + protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + + protected val partitionColumns: Array[String] = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty) + + override def schema: StructType = userSchema.getOrElse(tableStructSchema) + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala index a522db6af..ea997c86a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala @@ -24,12 +24,13 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.ColumnarBatch + +import org.apache.hudi.HoodieDataSourceHelper._ class HoodieBootstrapRDD(@transient spark: SparkSession, - dataReadFunction: PartitionedFile => Iterator[Any], - skeletonReadFunction: PartitionedFile => Iterator[Any], - regularReadFunction: PartitionedFile => Iterator[Any], + dataReadFunction: PartitionedFile => Iterator[InternalRow], + skeletonReadFunction: PartitionedFile => Iterator[InternalRow], + regularReadFunction: PartitionedFile => Iterator[InternalRow], dataSchema: StructType, skeletonSchema: StructType, requiredColumns: Array[String], @@ -56,18 +57,18 @@ class HoodieBootstrapRDD(@transient spark: SparkSession, // It is a bootstrap split. Check both skeleton and data files. if (dataSchema.isEmpty) { // No data column to fetch, hence fetch only from skeleton file - partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction) + partitionedFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get) } else if (skeletonSchema.isEmpty) { // No metadata column to fetch, hence fetch only from data file - partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction) + partitionedFileIterator = dataReadFunction(bootstrapPartition.split.dataFile) } else { // Fetch from both data and skeleton file, and merge - val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction) - val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction) + val dataFileIterator = dataReadFunction(bootstrapPartition.split.dataFile) + val skeletonFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get) partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator) } } else { - partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction) + partitionedFileIterator = regularReadFunction(bootstrapPartition.split.dataFile) } partitionedFileIterator } @@ -101,19 +102,6 @@ class HoodieBootstrapRDD(@transient spark: SparkSession, mergedRow } - def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any]) - : Iterator[InternalRow] = { - val fileIterator = readFileFunction(partitionedFile) - - import scala.collection.JavaConverters._ - - val rows = fileIterator.flatMap(_ match { - case r: InternalRow => Seq(r) - case b: ColumnarBatch => b.rowIterator().asScala - }) - rows - } - override protected def getPartitions: Array[Partition] = { tableState.files.zipWithIndex.map(file => { if (file._1.skeletonFile.isDefined) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala index b1ab83a94..e9ce0f1ae 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala @@ -52,7 +52,7 @@ import scala.collection.JavaConverters._ * @param optParams DataSource options passed by the user */ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, - val userSchema: StructType, + val userSchema: Option[StructType], val globPaths: Option[Seq[Path]], val metaClient: HoodieTableMetaClient, val optParams: Map[String, String]) extends BaseRelation @@ -107,37 +107,35 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, }) // Prepare readers for reading data file and skeleton files - val dataReadFunction = new ParquetFileFormat() - .buildReaderWithPartitionValues( - sparkSession = _sqlContext.sparkSession, - dataSchema = dataSchema, - partitionSchema = StructType(Seq.empty), - requiredSchema = requiredDataSchema, - filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() , - options = Map.empty, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() - ) + val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = _sqlContext.sparkSession, + dataSchema = dataSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = requiredDataSchema, + filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() , + options = optParams, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + ) - val skeletonReadFunction = new ParquetFileFormat() - .buildReaderWithPartitionValues( - sparkSession = _sqlContext.sparkSession, - dataSchema = skeletonSchema, - partitionSchema = StructType(Seq.empty), - requiredSchema = requiredSkeletonSchema, - filters = if (requiredDataSchema.isEmpty) filters else Seq(), - options = Map.empty, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() - ) + val skeletonReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = _sqlContext.sparkSession, + dataSchema = skeletonSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = requiredSkeletonSchema, + filters = if (requiredDataSchema.isEmpty) filters else Seq(), + options = optParams, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + ) - val regularReadFunction = new ParquetFileFormat() - .buildReaderWithPartitionValues( - sparkSession = _sqlContext.sparkSession, - dataSchema = fullSchema, - partitionSchema = StructType(Seq.empty), - requiredSchema = requiredColsSchema, - filters = filters, - options = Map.empty, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()) + val regularReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = _sqlContext.sparkSession, + dataSchema = fullSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = requiredColsSchema, + filters = filters, + options = optParams, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + ) val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction, regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala new file mode 100644 index 000000000..a04f54621 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +object HoodieDataSourceHelper extends PredicateHelper { + + /** + * Partition the given condition into two sequence of conjunctive predicates: + * - predicates that can be evaluated using metadata only. + * - other predicates. + */ + def splitPartitionAndDataPredicates( + spark: SparkSession, + condition: Expression, + partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = { + splitConjunctivePredicates(condition).partition( + isPredicateMetadataOnly(spark, _, partitionColumns)) + } + + /** + * Check if condition can be evaluated using only metadata. In Delta, this means the condition + * only references partition columns and involves no subquery. + */ + def isPredicateMetadataOnly( + spark: SparkSession, + condition: Expression, + partitionColumns: Seq[String]): Boolean = { + isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) && + !SubqueryExpression.hasSubquery(condition) + } + + /** + * Does the predicate only contains partition columns? + */ + def isPredicatePartitionColumnsOnly( + spark: SparkSession, + condition: Expression, + partitionColumns: Seq[String]): Boolean = { + val nameEquality = spark.sessionState.analyzer.resolver + condition.references.forall { r => + partitionColumns.exists(nameEquality(r.name, _)) + } + } + + /** + * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]] + * to deal with [[ColumnarBatch]] when enable parquet vectorized reader if necessary. + */ + def buildHoodieParquetReader( + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + + val readParquetFile: PartitionedFile => Iterator[Any] = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sparkSession, + dataSchema = dataSchema, + partitionSchema = partitionSchema, + requiredSchema = requiredSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf + ) + + file: PartitionedFile => { + val iter = readParquetFile(file) + val rows = iter.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala + }) + rows + } + } + + /** + * Extract the required schema from [[InternalRow]] + */ + def extractRequiredSchema( + iter: Iterator[InternalRow], + requiredSchema: StructType, + requiredFieldPos: Seq[Int]): Iterator[InternalRow] = { + val unsafeProjection = UnsafeProjection.create(requiredSchema) + val rows = iter.map { row => + unsafeProjection(createInternalRowWithSchema(row, requiredSchema, requiredFieldPos)) + } + rows + } + + /** + * Convert [[InternalRow]] to [[SpecificInternalRow]]. + */ + def createInternalRowWithSchema( + row: InternalRow, + schema: StructType, + positions: Seq[Int]): InternalRow = { + val rowToReturn = new SpecificInternalRow(schema) + var curIndex = 0 + schema.zip(positions).foreach { case (field, pos) => + val curField = if (row.isNullAt(pos)) { + null + } else { + row.get(pos, field.dataType) + } + rowToReturn.update(curIndex, curField) + curIndex += 1 + } + rowToReturn + } + + + def splitFiles( + sparkSession: SparkSession, + file: FileStatus, + partitionValues: InternalRow): Seq[PartitionedFile] = { + val filePath = file.getPath + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + (0L until file.getLen by maxSplitBytes).map { offset => + val remaining = file.getLen - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + PartitionedFile(partitionValues, filePath.toUri.toString, offset, size) + } + } + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala new file mode 100644 index 000000000..9f2d7d9e0 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException} +import org.apache.spark.sql.types.StructType + +/** + * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]]. + * + * This class will extract the fields needed according to [[requiredColumns]] and + * return iterator of [[org.apache.spark.sql.Row]] directly. + */ +class HoodieFileScanRDD( + @transient private val sparkSession: SparkSession, + requiredColumns: Array[String], + schema: StructType, + readFunction: PartitionedFile => Iterator[InternalRow], + @transient val filePartitions: Seq[FilePartition]) + extends RDD[Row](sparkSession.sparkContext, Nil) { + + private val requiredSchema = { + val nameToStructField = schema.map(field => (field.name, field)).toMap + StructType(requiredColumns.map(nameToStructField)) + } + + private val requiredFieldPos = HoodieSparkUtils.collectFieldIndexes(requiredSchema, schema) + + override def compute(split: Partition, context: TaskContext): Iterator[Row] = { + val iterator = new Iterator[Object] with AutoCloseable { + + private[this] val files = split.asInstanceOf[FilePartition].files.toIterator + private[this] var currentFile: PartitionedFile = null + private[this] var currentIterator: Iterator[Object] = null + + override def hasNext: Boolean = { + (currentIterator != null && currentIterator.hasNext) || nextIterator() + } + + def next(): Object = { + currentIterator.next() + } + + /** Advances to the next file. Returns true if a new non-empty iterator is available. */ + private def nextIterator(): Boolean = { + if (files.hasNext) { + currentFile = files.next() + + logInfo(s"Reading File $currentFile") + currentIterator = readFunction(currentFile) + + try { + hasNext + } catch { + case e: SchemaColumnConvertNotSupportedException => + val message = "Parquet column cannot be converted in " + + s"file ${currentFile.filePath}. Column: ${e.getColumn}, " + + s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" + throw new QueryExecutionException(message, e) + + case e => throw e + } + } else { + currentFile = null + false + } + } + + override def close(): Unit = {} + } + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener[Unit](_ => iterator.close()) + + // extract required columns from row + val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema( + iterator.asInstanceOf[Iterator[InternalRow]], + requiredSchema, + requiredFieldPos) + + // convert InternalRow to Row and return + val converter = CatalystTypeConverters.createToScalaConverter(requiredSchema) + iterAfterExtract.map(converter(_).asInstanceOf[Row]) + } + + override protected def getPartitions: Array[Partition] = filePartitions.toArray + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 3257ecf1f..4c3d30bd4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -20,19 +20,22 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} + import org.apache.hadoop.conf.Configuration + +import org.apache.hudi.HoodieDataSourceHelper._ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner import org.apache.hudi.config.HoodiePayloadConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} import java.io.Closeable @@ -46,8 +49,8 @@ case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSp class HoodieMergeOnReadRDD(@transient sc: SparkContext, @transient config: Configuration, - fullSchemaFileReader: PartitionedFile => Iterator[Any], - requiredSchemaFileReader: PartitionedFile => Iterator[Any], + fullSchemaFileReader: PartitionedFile => Iterator[InternalRow], + requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow], tableState: HoodieMergeOnReadTableState) extends RDD[InternalRow](sc, Nil) { @@ -61,26 +64,33 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } else { new Properties() } + + private val requiredSchema = tableState.requiredStructSchema + + private val requiredFieldPosition = HoodieSparkUtils.collectFieldIndexes(requiredSchema, + tableState.tableStructSchema + ) + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] val iter = mergeOnReadPartition.split match { case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => - val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader) - extractRequiredSchema(rows) + val rows = requiredSchemaFileReader(dataFileOnlySplit.dataFile.get) + extractRequiredSchema(rows, requiredSchema, requiredFieldPosition) case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => logFileIterator(logFileOnlySplit, getConfig) case skipMergeSplit if skipMergeSplit.mergeType .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => skipMergeFileIterator( skipMergeSplit, - read(skipMergeSplit.dataFile.get, requiredSchemaFileReader), + requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig ) case payloadCombineSplit if payloadCombineSplit.mergeType .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => payloadCombineFileIterator( payloadCombineSplit, - read(payloadCombineSplit.dataFile.get, fullSchemaFileReader), + fullSchemaFileReader(payloadCombineSplit.dataFile.get), getConfig ) case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + @@ -112,36 +122,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } } - private def read(partitionedFile: PartitionedFile, - readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { - val fileIterator = readFileFunction(partitionedFile) - val rows = fileIterator.flatMap(_ match { - case r: InternalRow => Seq(r) - case b: ColumnarBatch => b.rowIterator().asScala - }) - rows - } - - private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = { - val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) - val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) - val requiredFieldPosition = tableState.requiredStructSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList - val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) - val rows = iter.map { row => - unsafeProjection(createRowWithRequiredSchema(row, requiredFieldPosition)) - } - rows - } - private def logFileIterator(split: HoodieMergeOnReadFileSplit, config: Configuration): Iterator[InternalRow] = new Iterator[InternalRow] with Closeable { private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) - private val requiredFieldPosition = - tableState.requiredStructSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) @@ -189,9 +174,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, new Iterator[InternalRow] with Closeable { private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) - private val requiredFieldPosition = - tableState.requiredStructSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) @@ -205,7 +187,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, override def hasNext: Boolean = { if (baseFileIterator.hasNext) { val curRow = baseFileIterator.next() - recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition)) + recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema, requiredFieldPosition)) true } else { if (logRecordsKeyIterator.hasNext) { @@ -247,9 +229,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, new Iterator[InternalRow] with Closeable { private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) - private val requiredFieldPosition = - tableState.requiredStructSchema - .map(f => tableAvroSchema.getField(f.name).pos()).toList private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false) private val requiredDeserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) @@ -289,7 +268,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } } else { // No merge needed, load current row with required schema - recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition)) + recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema, requiredFieldPosition)) true } } else { @@ -339,22 +318,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, historyAvroRecord, tableAvroSchema, payloadProps) } } - - private def createRowWithRequiredSchema(row: InternalRow, requiredFieldPosition: Seq[Int]): InternalRow = { - val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema) - val posIterator = requiredFieldPosition.iterator - var curIndex = 0 - tableState.requiredStructSchema.foreach( - f => { - val curPos = posIterator.next() - val curField = if (row.isNullAt(curPos)) null else row.get(curPos, f.dataType) - rowToReturn.update(curIndex, curField) - curIndex = curIndex + 1 - } - ) - rowToReturn - } - } private object HoodieMergeOnReadRDD { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 14df0cb38..395e1b3a2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -48,7 +48,7 @@ import scala.collection.mutable */ class IncrementalRelation(val sqlContext: SQLContext, val optParams: Map[String, String], - val userSchema: StructType, + val userSchema: Option[StructType], val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan { private val log = LogManager.getLogger(classOf[IncrementalRelation]) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 01b480a11..db9ebeff4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -25,9 +25,12 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForC import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getCommitMetadata import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getWritePartitionPaths import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hadoop.mapred.JobConf + import org.apache.log4j.LogManager + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -43,15 +46,15 @@ import scala.collection.JavaConversions._ * Relation, that implements the Hoodie incremental view for Merge On Read table. * */ -class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, +class MergeOnReadIncrementalRelation(sqlContext: SQLContext, val optParams: Map[String, String], - val userSchema: StructType, + val userSchema: Option[StructType], val metaClient: HoodieTableMetaClient) - extends BaseRelation with PrunedFilteredScan { + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { - private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation]) private val conf = sqlContext.sparkContext.hadoopConfiguration private val jobConf = new JobConf(conf) + private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants() if (commitTimeline.empty()) { throw new HoodieException("No instants to incrementally pull") @@ -72,13 +75,13 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, private val commitsTimelineToReturn = commitTimeline.findInstantsInRange( optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key), optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp)) - log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}") + logDebug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}") private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList - private val schemaResolver = new TableSchemaResolver(metaClient) - private val tableAvroSchema = schemaResolver.getTableAvroSchema - private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) + private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex() + private val preCombineField = { val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField if (preCombineFieldFromTableConfig != null) { @@ -89,7 +92,6 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key) } } - override def schema: StructType = tableStructSchema override def needConversion: Boolean = false @@ -108,8 +110,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, if (fileIndex.isEmpty) { sqlContext.sparkContext.emptyRDD[Row] } else { - log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}") - log.debug(s"buildScan filters = ${filters.mkString(",")}") + logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}") + logDebug(s"buildScan filters = ${filters.mkString(",")}") // config to ensure the push down filter for parquet will be applied. sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true") sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true") @@ -121,7 +123,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter } val (requiredAvroSchema, requiredStructSchema) = - MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns) + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns) val hoodieTableState = HoodieMergeOnReadTableState( tableStructSchema, @@ -132,7 +134,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, preCombineField, Option.empty ) - val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), @@ -141,7 +143,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, options = optParams, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) - val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + + val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), @@ -173,7 +176,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, ).toList val latestCommit = fsView.getLastInstant.get.getTimestamp if (log.isDebugEnabled) { - fileGroups.foreach(f => log.debug(s"current file group id: " + + fileGroups.foreach(f => logDebug(s"current file group id: " + s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get.toString}")) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 2829b4bc1..6ff49823a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -18,14 +18,15 @@ package org.apache.hudi -import org.apache.avro.Schema import org.apache.hudi.common.model.HoodieLogFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf + import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters @@ -55,32 +56,22 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType, preCombineField: Option[String], recordKeyFieldOpt: Option[String]) -class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, - val optParams: Map[String, String], - val userSchema: StructType, +class MergeOnReadSnapshotRelation(sqlContext: SQLContext, + optParams: Map[String, String], + val userSchema: Option[StructType], val globPaths: Option[Seq[Path]], val metaClient: HoodieTableMetaClient) - extends BaseRelation with PrunedFilteredScan with Logging { + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { private val conf = sqlContext.sparkContext.hadoopConfiguration private val jobConf = new JobConf(conf) - // use schema from latest metadata, if not present, read schema from the data file - private val schemaResolver = new TableSchemaResolver(metaClient) - private lazy val tableAvroSchema = { - try { - schemaResolver.getTableAvroSchema - } catch { - case _: Throwable => // If there is no commit in the table, we cann't get the schema - // with schemaUtil, use the userSchema instead. - SchemaConverters.toAvroType(userSchema) - } - } - private lazy val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) private val mergeType = optParams.getOrElse( DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_MERGE.defaultValue) + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) + private val preCombineField = { val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField if (preCombineFieldFromTableConfig != null) { @@ -95,7 +86,6 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, if (!metaClient.getTableConfig.populateMetaFields()) { recordKeyFieldOpt = Option(metaClient.getTableConfig.getRecordKeyFieldProp) } - override def schema: StructType = tableStructSchema override def needConversion: Boolean = false @@ -107,7 +97,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, log.debug(s" buildScan filters = ${filters.mkString(",")}") val (requiredAvroSchema, requiredStructSchema) = - MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns) + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns) val fileIndex = buildFileIndex(filters) val hoodieTableState = HoodieMergeOnReadTableState( tableStructSchema, @@ -118,7 +108,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, preCombineField, recordKeyFieldOpt ) - val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), @@ -127,7 +117,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, options = optParams, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) - val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + + val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sqlContext.sparkSession, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), @@ -252,14 +243,4 @@ object MergeOnReadSnapshotRelation { path.toUri.toString } - def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = { - // First get the required avro-schema, then convert the avro-schema to spark schema. - val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap - val requiredFields = requiredColumns.map(c => name2Fields(c)) - .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList - val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc, - tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava) - val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) - (requiredAvroSchema, requiredStructSchema) - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala index ffe9b6498..4e46233c3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala @@ -161,12 +161,12 @@ class HoodieStreamSource( val rdd = tableType match { case HoodieTableType.COPY_ON_WRITE => val serDe = sparkAdapter.createSparkRowSerDe(RowEncoder(schema)) - new IncrementalRelation(sqlContext, incParams, schema, metaClient) + new IncrementalRelation(sqlContext, incParams, Some(schema), metaClient) .buildScan() .map(serDe.serializeRow) case HoodieTableType.MERGE_ON_READ => val requiredColumns = schema.fields.map(_.name) - new MergeOnReadIncrementalRelation(sqlContext, incParams, schema, metaClient) + new MergeOnReadIncrementalRelation(sqlContext, incParams, Some(schema), metaClient) .buildScan(requiredColumns, Array.empty[Filter]) .asInstanceOf[RDD[InternalRow]] case _ => throw new IllegalArgumentException(s"UnSupport tableType: $tableType") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index ad974286a..28cd59cb0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -18,18 +18,24 @@ package org.apache.hudi +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path + import org.apache.hudi.testutils.DataSourceTestUtils -import org.apache.spark.sql.types.StructType + +import org.apache.spark.sql.types.{StructType, TimestampType} import org.apache.spark.sql.{Row, SparkSession} + import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir import java.io.File import java.nio.file.Paths + import scala.collection.JavaConverters class TestHoodieSparkUtils { @@ -232,6 +238,29 @@ class TestHoodieSparkUtils { spark.stop() } + @Test + def testGetRequiredSchema(): Unit = { + val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," + + "\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," + + "{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}" + + val tableAvroSchema = new Schema.Parser().parse(avroSchemaString) + + val (requiredAvroSchema, requiredStructSchema) = + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, Array("ts")) + + assertEquals("timestamp-millis", + requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName) + assertEquals(TimestampType, requiredStructSchema.fields(0).dataType) + } + def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] = JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala deleted file mode 100644 index 80a883a00..000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi - -import org.apache.avro.Schema -import org.apache.spark.sql.types.TimestampType -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test - -class TestMergeOnReadSnapshotRelation { - - @Test - def testGetRequiredSchema(): Unit = { - val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," + - "\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + - "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," + - "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," + - "{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}" - - val tableAvroSchema = new Schema.Parser().parse(avroSchemaString) - - val (requiredAvroSchema, requiredStructSchema) = - MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, Array("ts")) - - assertEquals("timestamp-millis", - requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName) - assertEquals(TimestampType, requiredStructSchema.fields(0).dataType) - } -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index ad6d40e00..e998f7cfa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -833,4 +833,48 @@ class TestCOWDataSource extends HoodieClientTestBase { assertEquals(inputDF2.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","), readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(",")) } + + @Test + def testHoodieBaseFileOnlyViewRelation(): Unit = { + val _spark = spark + import _spark.implicits._ + + val df = Seq((1, "z3", 30, "v1", "2018-09-23"), (2, "z3", 35, "v1", "2018-09-24")) + .toDF("id", "name", "age", "ts", "data_date") + + df.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id") + .option("hoodie.insert.shuffle.parallelism", "4") + .option("hoodie.upsert.shuffle.parallelism", "4") + .option("hoodie.bulkinsert.shuffle.parallelism", "2") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "data_date") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts") + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.TimestampBasedKeyGenerator") + .option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, "DATE_STRING") + .option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy/MM/dd") + .option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT+8:00") + .option(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd") + .mode(org.apache.spark.sql.SaveMode.Append) + .save(basePath) + + val res = spark.read.format("hudi").load(basePath) + + assert(res.count() == 2) + + // data_date is the partition field. Persist to the parquet file using the origin values, and read it. + assertTrue( + res.select("data_date").map(_.get(0).toString).collect().sorted.sameElements( + Array("2018-09-23", "2018-09-24") + ) + ) + assertTrue( + res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.sameElements( + Array("2018/09/23", "2018/09/24") + ) + ) + } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index bf1cd2448..94d1c80fc 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark2RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe + import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} @@ -26,12 +27,14 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.execution.datasources.{Spark2ParsePartitionUtil, SparkParsePartitionUtil} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{Row, SparkSession} +import scala.collection.mutable.ArrayBuffer + /** * The adapter for spark2. */ @@ -86,4 +89,44 @@ class Spark2Adapter extends SparkAdapter { override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2") } + + /** + * Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`. + * + * This is a copy of org.apache.spark.sql.execution.datasources.FilePartition#getFilePartitions from Spark 3.2. + * And this will be called only in Spark 2. + */ + override def getFilePartitions( + sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] = { + + val partitions = new ArrayBuffer[FilePartition] + val currentFiles = new ArrayBuffer[PartitionedFile] + var currentSize = 0L + + /** Close the current partition and move to the next. */ + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + // Copy to a new Array. + val newPartition = FilePartition(partitions.size, currentFiles.toArray) + partitions += newPartition + } + currentFiles.clear() + currentSize = 0 + } + + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + // Assign files to partitions using "Next Fit Decreasing" + partitionedFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { + closePartition() + } + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file + } + closePartition() + partitions.toSeq + } } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala index 61fcc9634..f446715a6 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.spark3.internal.ReflectUtil + import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -29,9 +30,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ -import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.{LogicalRelation, Spark3ParsePartitionUtil, SparkParsePartitionUtil} +import org.apache.spark.sql.execution.datasources.{Spark3ParsePartitionUtil, SparkParsePartitionUtil} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.internal.SQLConf @@ -94,4 +94,14 @@ class Spark3Adapter extends SparkAdapter { override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { parser.parseMultipartIdentifier(sqlText) } + + /** + * Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`. + */ + override def getFilePartitions( + sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] = { + FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes) + } }