1
0

[HUDI-3902] Fallback to HadoopFsRelation in cases non-involving Schema Evolution (#5352)

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
This commit is contained in:
Alexey Kudinkin
2022-04-19 10:40:20 -07:00
committed by GitHub
parent 9af7b09aec
commit 81bf771e56
8 changed files with 156 additions and 39 deletions

View File

@@ -20,13 +20,15 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -104,4 +106,48 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
.map(HoodieBaseFileSplit.apply)
}
/**
* NOTE: We have to fallback to [[HadoopFsRelation]] to make sure that all of the Spark optimizations could be
* equally applied to Hudi tables, since some of those are predicated on the usage of [[HadoopFsRelation]],
* and won't be applicable in case of us using our own custom relations (one of such optimizations is [[SchemaPruning]]
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
}
if (globPaths.isEmpty) {
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
bucketSpec = None,
fileFormat = tableFileFormat,
optParams)(sparkSession)
} else {
val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
DataSource.apply(
sparkSession = sparkSession,
paths = extraReadPaths,
userSpecifiedSchema = userSchema,
className = formatClassName,
// Since we're reading the table as just collection of files we have to make sure
// we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc.
// while keeping previous versions of the files around as well.
//
// We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that
options = optParams ++ Map(
"mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName
),
partitionColumns = partitionColumns
)
.resolveRelation()
.asInstanceOf[HadoopFsRelation]
}
}
}

View File

@@ -21,11 +21,13 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
@@ -108,7 +110,7 @@ class DefaultSource extends RelationProvider
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths)
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
@@ -141,7 +143,7 @@ class DefaultSource extends RelationProvider
*
* TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
* That is the only case where Spark seems to actually need a relation to be returned here
* [[DataSource.writeAndRead()]]
* [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]]
*
* @param sqlContext Spark SQL Context
* @param mode Mode for saving the DataFrame at the destination
@@ -206,4 +208,32 @@ class DefaultSource extends RelationProvider
parameters: Map[String, String]): Source = {
new HoodieStreamSource(sqlContext, metadataPath, schema, parameters)
}
private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
optParams: Map[String, String]) = {
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
val enableSchemaOnRead: Boolean = !tryFetchInternalSchema(metaClient).isEmptySchema
// NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of
// [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
// vanilla Spark, since some of the Spark optimizations are predicated on the using of [[HadoopFsRelation]].
//
// You can check out HUDI-3896 for more details
if (enableSchemaOnRead) {
baseRelation
} else {
baseRelation.toHadoopFsRelation
}
}
private def tryFetchInternalSchema(metaClient: HoodieTableMetaClient) =
try {
new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata
.orElse(InternalSchema.getEmptyInternalSchema)
} catch {
case _: Exception => InternalSchema.getEmptyInternalSchema
}
}

View File

@@ -19,12 +19,10 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.getPartitionPath
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
@@ -38,13 +36,13 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.TaskContext
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -54,7 +52,6 @@ import org.apache.spark.unsafe.types.UTF8String
import java.io.Closeable
import java.net.URI
import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.control.NonFatal
@@ -78,7 +75,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String],
userSchema: Option[StructType])
extends BaseRelation with PrunedFilteredScan with Logging with SparkAdapterSupport {
extends BaseRelation
with FileRelation
with PrunedFilteredScan
with SparkAdapterSupport
with Logging {
type FileSplit <: HoodieFileSplit
@@ -198,6 +199,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
override final def needConversion: Boolean = false
override def inputFiles: Array[String] = fileIndex.allFiles.map(_.getPath.toUri.toString).toArray
/**
* NOTE: DO NOT OVERRIDE THIS METHOD
*/
@@ -255,6 +258,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
sparkSession.sparkContext.emptyRDD
}
/**
* Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
*