
[HUDI-3896] Porting Nested Schema Pruning optimization for Hudi's custom Relations (#5428)

Currently, all Hudi relations carry a performance gap relative to Spark's HadoopFsRelation.
The cause is the SchemaPruning optimization rule (which prunes nested schemas): it is
unfortunately predicated on the use of HadoopFsRelation, meaning it is not applied
whenever any other relation is used.

This change ports the rule to Hudi's relations (MOR, Incremental, etc.)
by leveraging the HoodieSparkSessionExtensions mechanism
to inject a modified version of the original SchemaPruning rule,
adapted to work w/ Hudi's custom relations (a minimal usage sketch follows the change list below).

- Added customOptimizerRules to HoodieAnalysis
- Added NestedSchemaPruning Spark Optimizer rule
- Handled Spark Optimizer's pruned data schema (to effectively prune nested schemas)
- Enable HoodieClientTestHarness to inject HoodieSparkSessionExtensions
- Injecting Spark Session extensions for TestMORDataSource, TestCOWDataSource
- Disabled fallback to HadoopFsRelation
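
A minimal sketch, assuming a local Spark application, of how the extensions are typically wired up so that the injected optimizer rules (including the ported nested-schema-pruning rule) actually apply; the table path and nested column below are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Registering HoodieSparkSessionExtension lets HoodieAnalysis.customOptimizerRules (added in
// this change) inject the NestedSchemaPruning rule on Spark 3.1+.
val spark = SparkSession.builder()
  .appName("hudi-nested-schema-pruning-sketch")
  .master("local[*]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

// Selecting only a nested sub-field: with the rule injected, the relation's data schema can be
// pruned down to the requested nested columns instead of materializing the full struct.
spark.read.format("hudi")
  .load("/tmp/hudi_table")   // hypothetical table path
  .select("driver.name")     // hypothetical nested column
  .show()
```
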
Author: Alexey Kudinkin
Date: 2022-07-21 02:36:06 -07:00
Committed by: GitHub
Parent: 2394c62973
Commit: de37774e12
42 changed files with 1220 additions and 500 deletions

View File

@@ -44,25 +44,24 @@ import org.apache.hudi.avro.HoodieAvroUtils
import scala.collection.JavaConverters._
object HoodieSparkUtils extends SparkAdapterSupport {
private[hudi] trait SparkVersionsSupport {
def getSparkVersion: String
def isSpark2: Boolean = SPARK_VERSION.startsWith("2.")
def isSpark2: Boolean = getSparkVersion.startsWith("2.")
def isSpark3: Boolean = getSparkVersion.startsWith("3.")
def isSpark3_0: Boolean = getSparkVersion.startsWith("3.0")
def isSpark3_1: Boolean = getSparkVersion.startsWith("3.1")
def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")
def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")
def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"
def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3"
def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"
def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1"
}
def isSpark3_0: Boolean = SPARK_VERSION.startsWith("3.0")
object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1")
def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2"
def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"
override def getSparkVersion: String = SPARK_VERSION
def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
@@ -268,15 +267,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
case StringStartsWith(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"$value%")
sparkAdapter.createLike(leftExp, rightExp)
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case StringEndsWith(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"%$value")
sparkAdapter.createLike(leftExp, rightExp)
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case StringContains(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"%$value%")
sparkAdapter.createLike(leftExp, rightExp)
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case _ => null
}
)
@@ -318,38 +317,4 @@ object HoodieSparkUtils extends SparkAdapterSupport {
s"${tableSchema.fieldNames.mkString(",")}")
AttributeReference(columnName, field.get.dataType, field.get.nullable)()
}
def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String], internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): (Schema, StructType, InternalSchema) = {
if (internalSchema.isEmptySchema || requiredColumns.isEmpty) {
// First get the required avro-schema, then convert the avro-schema to spark schema.
val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
// Here have to create a new Schema.Field object
// to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used".
val requiredFields = requiredColumns.map(c => name2Fields(c))
.map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema, internalSchema)
} else {
// now we support nested project
val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava)
val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
}
}
def toAttribute(tableSchema: StructType): Seq[AttributeReference] = {
tableSchema.map { field =>
AttributeReference(field.name, field.dataType, field.nullable, field.metadata)()
}
}
def collectFieldIndexes(projectedSchema: StructType, originalSchema: StructType): Seq[Int] = {
val nameToIndex = originalSchema.fields.zipWithIndex.map{ case (field, index) =>
field.name -> index
}.toMap
projectedSchema.map(field => nameToIndex(field.name))
}
}
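
A hedged usage sketch of the refactored version checkers: every check now delegates to getSparkVersion, so production code keeps using the HoodieSparkUtils singleton (backed by the runtime SPARK_VERSION) while tests can stub the version by mixing in the trait. The wrapper object below is hypothetical, and instantiating the trait directly only works from code inside the org.apache.hudi package since it is declared private[hudi]:

```scala
package org.apache.hudi

// Hypothetical illustration of the SparkVersionsSupport refactor (not part of this diff).
object SparkVersionCheckSketch {
  def main(args: Array[String]): Unit = {
    // Stub the version string to exercise the checkers deterministically (same pattern as the
    // new TestHoodieSparkUtils test further below).
    val onSpark312 = new SparkVersionsSupport {
      override def getSparkVersion: String = "3.1.2"
    }
    assert(onSpark312.isSpark3_1 && onSpark312.gteqSpark3_1 && !onSpark312.gteqSpark3_2)

    // Production code keeps using the singleton, which reports the runtime Spark version,
    // e.g. to decide whether the nested-schema-pruning rule can be injected (Spark 3.1+ only).
    val canInjectNestedSchemaPruning = HoodieSparkUtils.gteqSpark3_1
    println(s"NestedSchemaPruning injectable: $canInjectNestedSchemaPruning")
  }
}
```
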

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
trait HoodieCatalystPlansUtils {
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan
/**
* Convert a AliasIdentifier to TableIdentifier.
*/
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier
/**
* Convert a UnresolvedRelation to TableIdentifier.
*/
def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier
/**
* Create Join logical plan.
*/
def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join
/**
* Test if the logical plan is a Insert Into LogicalPlan.
*/
def isInsertInto(plan: LogicalPlan): Boolean
/**
* Get the member of the Insert Into LogicalPlan.
*/
def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
/**
* if the logical plan is a TimeTravelRelation LogicalPlan.
*/
def isRelationTimeTravel(plan: LogicalPlan): Boolean
/**
* Get the member of the TimeTravelRelation LogicalPlan.
*/
def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])]
/**
* Create a Insert Into LogicalPlan.
*/
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan
/**
* Create Like expression.
*/
def createLike(left: Expression, right: Expression): Expression
}

View File

@@ -21,20 +21,18 @@ package org.apache.spark.sql.hudi
import org.apache.avro.Schema
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession}
import java.util.Locale
@@ -45,9 +43,15 @@ trait SparkAdapter extends Serializable {
/**
* Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating
* on Catalyst Expressions
* on Catalyst [[Expression]]s
*/
def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils
def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils
/**
* Creates instance of [[HoodieCatalystPlansUtils]] providing for common utils operating
* on Catalyst [[LogicalPlan]]s
*/
def getCatalystPlanUtils: HoodieCatalystPlansUtils
/**
* Creates instance of [[HoodieAvroSerializer]] providing for ability to serialize
@@ -71,48 +75,6 @@ trait SparkAdapter extends Serializable {
*/
def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe
/**
* Convert a AliasIdentifier to TableIdentifier.
*/
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier
/**
* Convert a UnresolvedRelation to TableIdentifier.
*/
def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier
/**
* Create Join logical plan.
*/
def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join
/**
* Test if the logical plan is a Insert Into LogicalPlan.
*/
def isInsertInto(plan: LogicalPlan): Boolean
/**
* Get the member of the Insert Into LogicalPlan.
*/
def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
/**
* if the logical plan is a TimeTravelRelation LogicalPlan.
*/
def isRelationTimeTravel(plan: LogicalPlan): Boolean
/**
* Get the member of the TimeTravelRelation LogicalPlan.
*/
def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])]
/**
* Create a Insert Into LogicalPlan.
*/
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan
/**
* Create the hoodie's extended spark sql parser.
*/
@@ -123,11 +85,6 @@ trait SparkAdapter extends Serializable {
*/
def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil
/**
* Create Like expression.
*/
def createLike(left: Expression, right: Expression): Expression
/**
* ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
*/
@@ -143,7 +100,7 @@ trait SparkAdapter extends Serializable {
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
case relation: UnresolvedRelation =>
isHoodieTable(toTableIdentifier(relation), spark)
isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
case _=> false
}
}
@@ -177,6 +134,8 @@ trait SparkAdapter extends Serializable {
/**
* Create instance of [[InterpretedPredicate]]
*
* TODO move to HoodieCatalystExpressionUtils
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate
}

View File

@@ -199,7 +199,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
if (sparkSessionExtensionsInjector.isPresent()) {
// In case we need to inject extensions into Spark Session, we have
// to stop any session that might still be active and since Spark will try
// to stop any session that might still be active, since Spark will try
// to re-use it
HoodieConversionUtils.toJavaOption(SparkSession.getActiveSession())
.ifPresent(SparkSession::stop);

View File

@@ -33,6 +33,7 @@ public class HoodieExampleSparkUtils {
Map<String, String> additionalConfigs = new HashMap<>();
additionalConfigs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
additionalConfigs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
return additionalConfigs;
}

View File

@@ -20,14 +20,12 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -59,10 +57,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
// For more details please check HUDI-4161
// NOTE: This override has to mirror semantic of whenever this Relation is converted into [[HadoopFsRelation]],
// which is currently done for all cases, except when Schema Evolution is enabled
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
val enableSchemaOnRead = !internalSchema.isEmptySchema
!enableSchemaOnRead
}
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
internalSchemaOpt.isEmpty
override lazy val mandatoryFields: Seq[String] =
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
@@ -88,7 +84,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
)
new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
@@ -124,16 +120,6 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
val (tableFileFormat, formatClassName) =
metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
case HoodieFileFormat.PARQUET =>
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
(parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
}
if (globPaths.isEmpty) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
@@ -157,27 +143,46 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = tableFileFormat,
fileFormat = fileFormat,
optParams)(sparkSession)
} else {
val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
// NOTE: Spark is able to infer partitioning values from partition path only when Hive-style partitioning
// scheme is used. Therefore, we fallback to reading the table as non-partitioned (specifying
// partitionColumns = Seq.empty) whenever Hive-style partitioning is not involved
val partitionColumns: Seq[String] = if (tableConfig.getHiveStylePartitioningEnable.toBoolean) {
this.partitionColumns
} else {
Seq.empty
}
DataSource.apply(
sparkSession = sparkSession,
paths = extraReadPaths,
// Here we should specify the schema to the latest commit schema since
// the table schema evolution.
userSpecifiedSchema = userSchema.orElse(Some(tableStructSchema)),
className = formatClassName,
// Since we're reading the table as just collection of files we have to make sure
// we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc.
// while keeping previous versions of the files around as well.
//
// We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that
className = fileFormatClassName,
options = optParams ++ Map(
"mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName
)
// Since we're reading the table as just collection of files we have to make sure
// we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc.
// while keeping previous versions of the files around as well.
//
// We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that
"mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName,
// We have to override [[EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH]] setting, since
// the relation might have this setting overridden
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key -> shouldExtractPartitionValuesFromPartitionPath.toString,
// NOTE: We have to specify table's base-path explicitly, since we're requesting Spark to read it as a
// list of globbed paths which complicates partitioning discovery for Spark.
// Please check [[PartitioningAwareFileIndex#basePaths]] comment for more details.
PartitioningAwareFileIndex.BASE_PATH_PARAM -> metaClient.getBasePathV2.toString
),
partitionColumns = partitionColumns
)
.resolveRelation()
.asInstanceOf[HadoopFsRelation]

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
@@ -127,6 +128,7 @@ class DefaultSource extends RelationProvider
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)

View File

@@ -23,10 +23,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
@@ -34,18 +35,21 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.SerializableWritable
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{FileFormat, FileStatusCache, PartitionDirectory, PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -60,7 +64,7 @@ import scala.util.{Failure, Success, Try}
trait HoodieFileSplit {}
case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema)
case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)
case class HoodieTableState(tablePath: String,
latestCommitTimestamp: String,
@@ -80,7 +84,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
extends BaseRelation
with FileRelation
with PrunedFilteredScan
with Logging {
with Logging
with SparkAdapterSupport {
type FileSplit <: HoodieFileSplit
@@ -128,9 +133,24 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* NOTE: Initialization of the following members is coupled on purpose to minimize amount of I/O
* required to fetch table's Avro and Internal schemas
*/
protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
None
} else {
Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
case Failure(e) =>
logWarning("Failed to fetch internal-schema from the table", e)
None
}
}
val avroSchema = internalSchemaOpt.map { is =>
AvroInternalSchemaConverter.convert(is, "schema")
} orElse {
schemaSpec.map(convertToAvroSchema)
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
@@ -139,25 +159,20 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
}
val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) {
InternalSchema.getEmptyInternalSchema
} else {
Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
case Success(internalSchemaOpt) =>
toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema)
case Failure(e) =>
logWarning("Failed to fetch internal-schema from the table", e)
InternalSchema.getEmptyInternalSchema
}
}
(avroSchema, internalSchema)
(avroSchema, internalSchemaOpt)
}
protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
/**
* Data schema optimized (externally) by Spark's Optimizer.
*
* Please check scala-doc for [[updatePrunedDataSchema]] more details
*/
protected var optimizerPrunedDataSchema: Option[StructType] = None
/**
* Controls whether partition values (ie values of partition columns) should be
* <ol>
@@ -189,6 +204,16 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
}
lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
case HoodieFileFormat.PARQUET =>
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
(parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
}
/**
* NOTE: PLEASE READ THIS CAREFULLY
*
@@ -206,19 +231,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* meaning that regardless of whether this columns are being requested by the query they will be fetched
* regardless so that relation is able to combine records properly (if necessary)
*
* @VisibleForTesting
* @VisibleInTests
*/
val mandatoryFields: Seq[String]
protected def mandatoryRootFields: Seq[String] =
mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
protected def timeline: HoodieTimeline =
// NOTE: We're including compaction here since it's not considering a "commit" operation
metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
protected val validCommits = timeline.getInstants.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
protected def latestInstant: Option[HoodieInstant] =
toScalaOption(timeline.lastInstant())
@@ -228,9 +248,38 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
/**
* Returns true in case table supports Schema on Read (Schema Evolution)
*/
def hasSchemaOnRead: Boolean = !internalSchema.isEmptySchema
def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
override def schema: StructType = tableStructSchema
/**
* Data schema is determined as the actual schema of the Table's Data Files (for ex, parquet/orc/etc);
*
* In cases when partition values are not persisted w/in the data files, data-schema is defined as
* <pre>table's schema - partition columns</pre>
*
* Check scala-doc for [[shouldExtractPartitionValuesFromPartitionPath]] for more details
*/
def dataSchema: StructType =
if (shouldExtractPartitionValuesFromPartitionPath) {
prunePartitionColumns(tableStructSchema)
} else {
tableStructSchema
}
/**
* Determines whether relation's schema could be pruned by Spark's Optimizer
*/
def canPruneRelationSchema: Boolean =
(fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) &&
// NOTE: Some relations might be disabling sophisticated schema pruning techniques (for ex, nested schema pruning)
// TODO(HUDI-XXX) internal schema doesn't support nested schema pruning currently
!hasSchemaOnRead
override def schema: StructType = {
// NOTE: Optimizer could prune the schema (applying for ex, [[NestedSchemaPruning]] rule) setting new updated
// schema in-place (via [[setPrunedDataSchema]] method), therefore we have to make sure that we pick
// pruned data schema (if present) over the standard table's one
optimizerPrunedDataSchema.getOrElse(tableStructSchema)
}
/**
* This method controls whether relation will be producing
@@ -263,22 +312,24 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
//
// (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
// PROJECTION
val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns)
// NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro
// schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
// could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
// w/ more than 2 types are involved)
val sourceSchema = optimizerPrunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema)
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)
val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
val fileSplits = collectFileSplits(partitionFilters, dataFilters)
val tableAvroSchemaStr =
if (internalSchema.isEmptySchema) tableAvroSchema.toString
else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
val tableAvroSchemaStr = tableAvroSchema.toString
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchema)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))
// Since schema requested by the caller might contain partition columns, we might need to
// prune it, removing all partition columns from it in case these columns are not persisted
@@ -289,19 +340,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// the partition path, and omitted from the data file, back into fetched rows;
// Note that, by default, partition columns are not omitted therefore specifying
// partition schema for reader is not required
val (partitionSchema, dataSchema, prunedRequiredSchema) =
val (partitionSchema, dataSchema, requiredDataSchema) =
tryPrunePartitionColumns(tableSchema, requiredSchema)
if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, prunedRequiredSchema, filters)
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, filters)
// NOTE: In case when partition columns have been pruned from the required schema, we have to project
// the rows from the pruned schema back into the one expected by the caller
val projectedRDD = if (prunedRequiredSchema.structTypeSchema != requiredSchema.structTypeSchema) {
val projectedRDD = if (requiredDataSchema.structTypeSchema != requiredSchema.structTypeSchema) {
rdd.mapPartitions { it =>
val fullPrunedSchema = StructType(prunedRequiredSchema.structTypeSchema.fields ++ partitionSchema.fields)
val fullPrunedSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema)
it.map(unsafeProjection)
}
@@ -408,11 +459,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
!SubqueryExpression.hasSubquery(condition)
}
protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
// For a nested field in mandatory columns, we should first get the root-level field, and then
// check for any missing column, as the requestedColumns should only contain root-level fields
// We should only append root-level field as well
val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
val missing = mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
.filter(rootField => !requestedColumns.contains(rootField))
requestedColumns ++ missing
}
@@ -476,6 +528,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
}
/**
* Hook for Spark's Optimizer to update expected relation schema after pruning
*
* NOTE: Only limited number of optimizations in respect to schema pruning could be performed
* internally w/in the relation itself w/o consideration for how the relation output is used.
* Therefore more advanced optimizations (like [[NestedSchemaPruning]]) have to be carried out
* by Spark's Optimizer holistically evaluating Spark's [[LogicalPlan]]
*/
def updatePrunedDataSchema(prunedSchema: StructType): this.type = {
optimizerPrunedDataSchema = Some(prunedSchema)
this
}
/**
* Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]]
* over [[InternalRow]]
@@ -521,6 +586,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
}
protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = {
val internalSchema = internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema)
val querySchemaString = SerDeHelper.toJson(internalSchema)
if (!isNullOrEmpty(querySchemaString)) {
val validCommits = timeline.getInstants.iterator.asScala.map(_.getFileName).mkString(",")
conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
}
conf
}
private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
if (shouldExtractPartitionValuesFromPartitionPath) {
@@ -544,15 +622,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}
object HoodieBaseRelation extends SparkAdapterSupport {
private def generateUnsafeProjection(from: StructType, to: StructType) =
sparkAdapter.createCatalystExpressionUtils().generateUnsafeProjection(from, to)
sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to)
def convertToAvroSchema(structSchema: StructType): Schema =
sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
@@ -560,16 +638,50 @@ object HoodieBaseRelation extends SparkAdapterSupport {
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
/**
* Projects provided schema by picking only required (projected) top-level columns from it
*
* @param tableSchema schema to project (either of [[InternalSchema]] or Avro's [[Schema]])
* @param requiredColumns required top-level columns to be projected
*/
def projectSchema(tableSchema: Either[Schema, InternalSchema], requiredColumns: Array[String]): (Schema, StructType, InternalSchema) = {
tableSchema match {
case Right(internalSchema) =>
checkState(!internalSchema.isEmptySchema)
// TODO extend pruning to leverage optimizer pruned schema
val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava)
val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
case Left(avroSchema) =>
val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> f).toMap
val requiredFields = requiredColumns.map { col =>
val f = fieldMap(col)
// We have to create a new [[Schema.Field]] since Avro schemas can't share field
// instances (and will throw "org.apache.avro.AvroRuntimeException: Field already used")
new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())
}.toList
val requiredAvroSchema = Schema.createRecord(avroSchema.getName, avroSchema.getDoc,
avroSchema.getNamespace, avroSchema.isError, requiredFields.asJava)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema, InternalSchema.getEmptyInternalSchema)
}
}
private def createHFileReader(spark: SparkSession,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf))
val hadoopConfBroadcast =
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
partitionedFile => {
val hadoopConf = hadoopConfBroadcast.value.value
val hadoopConf = hadoopConfBroadcast.value.get()
val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath),
new CacheConfig(hadoopConf))
@@ -581,7 +693,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
reader.getRecordIterator(requiredAvroSchema).asScala
.map(record => {
avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get
avroToRowConverter.apply(record).get
})
}
}

View File

@@ -82,23 +82,4 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size)
}
}
/**
* Set internalSchema evolution parameters to configuration.
* spark will broadcast them to each executor, we use those parameters to do schema evolution.
*
* @param conf hadoop conf.
* @param internalSchema internalschema for query.
* @param tablePath hoodie table base path.
* @param validCommits valid commits, using given validCommits to validate all legal history Schema files, and return the latest one.
*/
def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {
val querySchemaString = SerDeHelper.toJson(internalSchema)
if (!isNullOrEmpty(querySchemaString)) {
conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
}
conf
}
}

View File

@@ -166,10 +166,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// be stored in non-columnar formats like Avro, HFile, etc)
private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
// TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe
private var logScanner =
private var logScanner = {
val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
maxCompactionMemoryInBytes, config, tableSchema.internalSchema)
maxCompactionMemoryInBytes, config, internalSchema)
}
private val logRecords = logScanner.getRecords.asScala

View File

@@ -78,7 +78,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
)
val requiredSchemaParquetReader = createBaseFileReader(
@@ -90,7 +90,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
)
val hoodieTableState = getTableState

View File

@@ -76,7 +76,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
)
val requiredSchemaParquetReader = createBaseFileReader(
@@ -88,7 +88,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
)
val tableState = getTableState

View File

@@ -56,6 +56,8 @@ private[sql] object SchemaConverters {
toSqlTypeHelper(avroSchema, Set.empty)
}
private val unionFieldMemberPrefix = "member"
private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
avroSchema.getType match {
case INT => avroSchema.getLogicalType match {
@@ -134,7 +136,7 @@ private[sql] object SchemaConverters {
case (s, i) =>
val schemaType = toSqlTypeHelper(s, existingRecordNames)
// All fields are nullable because only one of them is set at a time
StructField(s"member$i", schemaType.dataType, nullable = true)
StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true)
}
SchemaType(StructType(fields.toSeq), nullable = false)
@@ -187,23 +189,46 @@ private[sql] object SchemaConverters {
.values(toAvroType(vt, valueContainsNull, recordName, nameSpace))
case st: StructType =>
val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
st.foreach { f =>
val fieldAvroType =
toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
if (canBeUnion(st)) {
val nonNullUnionFieldTypes = st.map(f => toAvroType(f.dataType, nullable = false, f.name, childNameSpace))
val unionFieldTypes = if (nullable) {
nullSchema +: nonNullUnionFieldTypes
} else {
nonNullUnionFieldTypes
}
Schema.createUnion(unionFieldTypes:_*)
} else {
val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
st.foreach { f =>
val fieldAvroType =
toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
}
fieldsAssembler.endRecord()
}
fieldsAssembler.endRecord()
// This should never happen.
case other => throw new IncompatibleSchemaException(s"Unexpected type $other.")
}
if (nullable && catalystType != NullType) {
if (nullable && catalystType != NullType && schema.getType != Schema.Type.UNION) {
Schema.createUnion(schema, nullSchema)
} else {
schema
}
}
private def canBeUnion(st: StructType): Boolean = {
// We use a heuristic to determine whether a [[StructType]] could potentially have been produced
// by converting Avro union to Catalyst's [[StructType]]:
// - It has to have at least 1 field
// - All fields have to be of the following format "memberN" (where N is sequentially increasing integer)
// - All fields are nullable
st.fields.length > 0 &&
st.forall { f =>
f.name.matches(s"$unionFieldMemberPrefix\\d+") && f.nullable
}
}
}
private[avro] class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
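
To make the union heuristic above concrete, an illustrative sketch (not part of the diff) of the Catalyst struct shapes it distinguishes; the value names are made up:

```scala
import org.apache.spark.sql.types._

// A struct shaped like a converted Avro union: all fields are nullable and named "member<N>",
// so the round-trip back to Avro can re-create a union of the member types (plus "null" when
// the column itself is nullable) instead of a record.
val unionLikeStruct = StructType(Seq(
  StructField("member0", LongType, nullable = true),
  StructField("member1", StringType, nullable = true)))

// A regular record-shaped struct does not match the "member<N>" naming and is converted
// to an Avro record as before.
val recordLikeStruct = StructType(Seq(
  StructField("ts", LongType, nullable = true),
  StructField("name", StringType, nullable = true)))
```
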

View File

@@ -384,7 +384,7 @@ private object ColumnStatsExpressionUtils {
* Returns only [[AttributeReference]] contained as a sub-expression
*/
object AllowedTransformationExpression extends SparkAdapterSupport {
val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.createCatalystExpressionUtils()
val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils()
def unapply(expr: Expression): Option[AttributeReference] = {
// First step, we check that expression

View File

@@ -53,7 +53,7 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
def getTableIdentifier(table: LogicalPlan): TableIdentifier = {
table match {
case SubqueryAlias(name, _) => sparkAdapter.toTableIdentifier(name)
case SubqueryAlias(name, _) => sparkAdapter.getCatalystPlanUtils.toTableIdentifier(name)
case _ => throw new IllegalArgumentException(s"Illegal table: $table")
}
}

View File

@@ -324,12 +324,14 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>

View File

@@ -28,21 +28,20 @@ import org.apache.spark.sql.parser.HoodieCommonSqlParser
class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit)
with SparkAdapterSupport {
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser { (session, parser) =>
new HoodieCommonSqlParser(session, parser)
}
HoodieAnalysis.customResolutionRules.foreach { ruleBuilder =>
extensions.injectResolutionRule { session =>
ruleBuilder(session)
}
HoodieAnalysis.customOptimizerRules.foreach { ruleBuilder =>
extensions.injectOptimizerRule(ruleBuilder(_))
}
HoodieAnalysis.customPostHocResolutionRules.foreach { rule =>
extensions.injectPostHocResolutionRule { session =>
rule(session)
}
HoodieAnalysis.customResolutionRules.foreach { ruleBuilder =>
extensions.injectResolutionRule(ruleBuilder(_))
}
HoodieAnalysis.customPostHocResolutionRules.foreach { ruleBuilder =>
extensions.injectPostHocResolutionRule(ruleBuilder(_))
}
}
}

View File

@@ -33,7 +33,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
case SubqueryAlias(tableId, _) => tableId
case plan => throw new IllegalArgumentException(s"Illegal plan $plan in target")
}
sparkAdapter.toTableIdentifier(aliaId)
sparkAdapter.getCatalystPlanUtils.toTableIdentifier(aliaId)
}
/**

View File

@@ -44,6 +44,16 @@ import scala.collection.mutable.ListBuffer
object HoodieAnalysis {
type RuleBuilder = SparkSession => Rule[LogicalPlan]
def customOptimizerRules: Seq[RuleBuilder] =
if (HoodieSparkUtils.gteqSpark3_1) {
val nestedSchemaPruningClass = "org.apache.spark.sql.execution.datasources.NestedSchemaPruning"
val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
Seq(_ => nestedSchemaPruningRule)
} else {
Seq.empty
}
def customResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer(
// Default rules
@@ -130,8 +140,8 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
DeleteHoodieTableCommand(d)
// Convert to InsertIntoHoodieTableCommand
case l if sparkAdapter.isInsertInto(l) =>
val (table, partition, query, overwrite, _) = sparkAdapter.getInsertIntoChildren(l).get
case l if sparkAdapter.getCatalystPlanUtils.isInsertInto(l) =>
val (table, partition, query, overwrite, _) = sparkAdapter.getCatalystPlanUtils.getInsertIntoChildren(l).get
table match {
case relation: LogicalRelation if sparkAdapter.isHoodieTable(relation, sparkSession) =>
new InsertIntoHoodieTableCommand(relation, query, partition, overwrite)
@@ -420,9 +430,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// Append the meta field to the insert query to walk through the validate for the
// number of insert fields with the number of the target table fields.
case l if sparkAdapter.isInsertInto(l) =>
case l if sparkAdapter.getCatalystPlanUtils.isInsertInto(l) =>
val (table, partition, query, overwrite, ifPartitionNotExists) =
sparkAdapter.getInsertIntoChildren(l).get
sparkAdapter.getCatalystPlanUtils.getInsertIntoChildren(l).get
if (sparkAdapter.isHoodieTable(table, sparkSession) && query.resolved &&
!containUnResolvedStar(query) &&
@@ -439,21 +449,21 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
val withMetaFieldProjects = metaFields ++ query.output
Project(withMetaFieldProjects, query)
}
sparkAdapter.createInsertInto(table, partition, newQuery, overwrite, ifPartitionNotExists)
sparkAdapter.getCatalystPlanUtils.createInsertInto(table, partition, newQuery, overwrite, ifPartitionNotExists)
} else {
l
}
case l if sparkAdapter.isRelationTimeTravel(l) =>
case l if sparkAdapter.getCatalystPlanUtils.isRelationTimeTravel(l) =>
val (plan: UnresolvedRelation, timestamp, version) =
sparkAdapter.getRelationTimeTravel(l).get
sparkAdapter.getCatalystPlanUtils.getRelationTimeTravel(l).get
if (timestamp.isEmpty && version.nonEmpty) {
throw new AnalysisException(
"version expression is not supported for time travel")
}
val tableIdentifier = sparkAdapter.toTableIdentifier(plan)
val tableIdentifier = sparkAdapter.getCatalystPlanUtils.toTableIdentifier(plan)
if (sparkAdapter.isHoodieTable(tableIdentifier, sparkSession)) {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val table = hoodieCatalogTable.table
@@ -525,7 +535,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// Fake a project for the expression based on the source plan.
val fakeProject = if (right.isDefined) {
Project(Seq(Alias(expression, "_c0")()),
sparkAdapter.createJoin(left, right.get, Inner))
sparkAdapter.getCatalystPlanUtils.createJoin(left, right.get, Inner))
} else {
Project(Seq(Alias(expression, "_c0")()),
left)

View File

@@ -41,7 +41,7 @@ class RunClusteringProcedure extends BaseProcedure
with Logging
with SparkAdapterSupport {
private val exprUtils = sparkAdapter.createCatalystExpressionUtils()
private val exprUtils = sparkAdapter.getCatalystExpressionUtils()
/**
* OPTIMIZE table_name|table_path [WHERE predicate]

View File

@@ -57,7 +57,7 @@ case class IndexRow(fileName: String,
class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport {
val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.createCatalystExpressionUtils()
val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils()
var spark: SparkSession = _

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import scala.collection.JavaConverters.asScalaBufferConverter
class TestHoodieRelations {
@Test
def testPruningSchema(): Unit = {
val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," +
"\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," +
"{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," +
"{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," +
"{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
val tableStructSchema = convertAvroSchemaToStructType(tableAvroSchema)
val (requiredAvroSchema, requiredStructSchema, _) =
HoodieBaseRelation.projectSchema(Left(tableAvroSchema), Array("ts"))
assertEquals(Seq(tableAvroSchema.getField("ts")), requiredAvroSchema.getFields.asScala)
assertEquals(
Seq(tableStructSchema.fields.apply(tableStructSchema.fieldIndex("ts"))),
requiredStructSchema.fields.toSeq
)
}
}

View File

@@ -18,17 +18,17 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.io.File
import java.nio.file.Paths
@@ -36,6 +36,60 @@ import scala.collection.JavaConverters
class TestHoodieSparkUtils {
@ParameterizedTest
@ValueSource(strings = Array("2.4.4", "3.1.0", "3.2.0", "3.3.0"))
def testSparkVersionCheckers(sparkVersion: String): Unit = {
val vsMock = new SparkVersionsSupport {
override def getSparkVersion: String = sparkVersion
}
sparkVersion match {
case "2.4.4" =>
assertTrue(vsMock.isSpark2)
assertFalse(vsMock.isSpark3)
assertFalse(vsMock.isSpark3_1)
assertFalse(vsMock.isSpark3_0)
assertFalse(vsMock.isSpark3_2)
assertFalse(vsMock.gteqSpark3_1)
assertFalse(vsMock.gteqSpark3_1_3)
assertFalse(vsMock.gteqSpark3_2)
case "3.1.0" =>
assertTrue(vsMock.isSpark3)
assertTrue(vsMock.isSpark3_1)
assertTrue(vsMock.gteqSpark3_1)
assertFalse(vsMock.isSpark2)
assertFalse(vsMock.isSpark3_0)
assertFalse(vsMock.isSpark3_2)
assertFalse(vsMock.gteqSpark3_1_3)
assertFalse(vsMock.gteqSpark3_2)
case "3.2.0" =>
assertTrue(vsMock.isSpark3)
assertTrue(vsMock.isSpark3_2)
assertTrue(vsMock.gteqSpark3_1)
assertTrue(vsMock.gteqSpark3_1_3)
assertTrue(vsMock.gteqSpark3_2)
assertFalse(vsMock.isSpark2)
assertFalse(vsMock.isSpark3_0)
assertFalse(vsMock.isSpark3_1)
case "3.3.0" =>
assertTrue(vsMock.isSpark3)
assertTrue(vsMock.gteqSpark3_1)
assertTrue(vsMock.gteqSpark3_1_3)
assertTrue(vsMock.gteqSpark3_2)
assertFalse(vsMock.isSpark3_2)
assertFalse(vsMock.isSpark2)
assertFalse(vsMock.isSpark3_0)
assertFalse(vsMock.isSpark3_1)
}
}
@Test
def testGlobPaths(@TempDir tempDir: File): Unit = {
val folders: Seq[Path] = Seq(
@@ -204,29 +258,6 @@ class TestHoodieSparkUtils {
spark.stop()
}
@Test
def testGetRequiredSchema(): Unit = {
val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," +
"\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," +
"{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," +
"{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," +
"{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
val (requiredAvroSchema, requiredStructSchema, _) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, Array("ts"))
assertEquals("timestamp-millis",
requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName)
assertEquals(TimestampType, requiredStructSchema.fields(0).dataType)
}
def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
}

View File

@@ -19,27 +19,31 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.BooleanType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import scala.collection.JavaConversions._
import java.util.function.Consumer
import scala.collection.JavaConverters._
/**
@@ -76,11 +80,17 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
cleanupFileSystem()
}
override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] =
toJavaOption(
Some(
JFunction.toJava((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver)))
)
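// Editor's note (hypothetical sketch, not the actual harness code): HoodieClientTestHarness consumes an
// injector like the one above while building the SparkSession. Conceptually the wiring looks as follows,
// where `injector` stands in for the org.apache.hudi.common.util.Option returned by this method:
//
//   import org.apache.spark.sql.SparkSession
//   val builder = SparkSession.builder().master("local[2]").appName("hoodie-test")
//   val spark =
//     (if (injector.isPresent) builder.withExtensions(exts => injector.get().accept(exts)) else builder)
//       .getOrCreate()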
@Test def testCount() {
// First Operation:
// Producing parquet files to three default partitions.
// SNAPSHOT view on MOR table with parquet files only.
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).asScala
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
@@ -98,7 +108,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// Second Operation:
// Upsert the update to the default partitions with duplicate records. Produced a log file for each parquet.
// SNAPSHOT view should read the log files only with the latest commit time.
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).asScala
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
@@ -173,7 +183,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// Third Operation:
// Upsert another update to the default partitions with 50 duplicate records. Produced the second log file for each parquet.
// SNAPSHOT view should read the latest log files.
val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList
val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).asScala
val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
inputDF3.write.format("org.apache.hudi")
.options(commonOpts)
@@ -213,7 +223,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
val partitionPaths = new Array[String](1)
partitionPaths.update(0, "2020/01/10")
val newDataGen = new HoodieTestDataGenerator(partitionPaths)
val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).toList
val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).asScala
val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2))
inputDF4.write.format("org.apache.hudi")
.options(commonOpts)
@@ -238,7 +248,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// Upsert records to the new partition. Produced a newer version of parquet file.
// SNAPSHOT view should read the latest log files from the default partition
// and the latest parquet from the new partition.
val records5 = recordsToStrings(newDataGen.generateUniqueUpdates("005", 50)).toList
val records5 = recordsToStrings(newDataGen.generateUniqueUpdates("005", 50)).asScala
val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
inputDF5.write.format("org.apache.hudi")
.options(commonOpts)
@@ -252,7 +262,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// Sixth Operation:
// Insert 2 records and trigger compaction.
val records6 = recordsToStrings(newDataGen.generateInserts("006", 2)).toList
val records6 = recordsToStrings(newDataGen.generateInserts("006", 2)).asScala
val inputDF6: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records6, 2))
inputDF6.write.format("org.apache.hudi")
.options(commonOpts)
@@ -279,7 +289,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// First Operation:
// Producing parquet files to three default partitions.
// SNAPSHOT view on MOR table with parquet files only.
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).asScala
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
@@ -297,7 +307,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// Second Operation:
// Upsert 50 delete records
// Snapshot view should only read 50 records
val records2 = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 50)).toList
val records2 = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 50)).asScala
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
@@ -330,7 +340,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// Third Operation:
// Upsert 50 delete records to delete the rest
// Snapshot view should read 0 records
val records3 = recordsToStrings(dataGen.generateUniqueDeleteRecords("003", 50)).toList
val records3 = recordsToStrings(dataGen.generateUniqueDeleteRecords("003", 50)).asScala
val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
inputDF3.write.format("org.apache.hudi")
.options(commonOpts)
@@ -447,7 +457,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// Vectorized Reader will only be triggered with an AtomicType schema,
// i.e. one containing no nulls, UDTs, arrays, structs, or maps.
val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
val records1 = recordsToStrings(dataGen.generateInsertsAsPerSchema("001", 100, schema)).toList
val records1 = recordsToStrings(dataGen.generateInsertsAsPerSchema("001", 100, schema)).asScala
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
@@ -462,7 +472,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
assertEquals(100, hudiSnapshotDF1.count())
val records2 = recordsToStrings(dataGen.generateUniqueUpdatesAsPerSchema("002", 50, schema))
.toList
.asScala
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
@@ -488,7 +498,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
@Test def testNoPrecombine() {
// Insert Operation
val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val records = recordsToStrings(dataGen.generateInserts("000", 100)).asScala
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
val commonOptsNoPreCombine = Map(
@@ -594,7 +604,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
val N = 20
// Test query with partition pruning when URL_ENCODE_PARTITIONING is enabled
val records1 = dataGen.generateInsertsContainsAllPartitions("000", N)
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala, 2))
inputDF1.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
@@ -624,7 +634,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
// Second write with Append mode
val records2 = dataGen.generateInsertsContainsAllPartitions("000", N + 1)
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala, 2))
inputDF2.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
@@ -646,8 +656,8 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
def testMORPartitionPrune(partitionEncode: Boolean, hiveStylePartition: Boolean): Unit = {
val partitions = Array("2021/03/01", "2021/03/02", "2021/03/03", "2021/03/04", "2021/03/05")
val newDataGen = new HoodieTestDataGenerator(partitions)
val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100)
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100).asScala
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1.asJava).asScala, 2))
val partitionCounts = partitions.map(p => p -> records1.count(r => r.getPartitionPath == p)).toMap
@@ -676,7 +686,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
.load(basePath)
.filter("partition != '2021/03/01'")
.count()
assertEquals(records1.size() - partitionCounts("2021/03/01"), count3)
assertEquals(records1.size - partitionCounts("2021/03/01"), count3)
val count4 = spark.read.format("hudi")
.load(basePath)
@@ -688,7 +698,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
.load(basePath)
.filter("partition like '%2021/03/%'")
.count()
assertEquals(records1.size(), count5)
assertEquals(records1.size, count5)
val count6 = spark.read.format("hudi")
.load(basePath)
@@ -708,7 +718,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
def testReadPathsForMergeOnReadTable(): Unit = {
// Paths only baseFiles
val records1 = dataGen.generateInserts("001", 100)
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
@@ -722,7 +732,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
.map(_.getPath.toString)
.mkString(",")
val records2 = dataGen.generateUniqueDeleteRecords("002", 100)
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Append)
@@ -754,7 +764,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
def testReadPathsForOnlyLogFiles(): Unit = {
initMetaClient(HoodieTableType.MERGE_ON_READ)
val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala, 2))
inputDF1.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
@@ -772,7 +782,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
.mkString(",")
val records2 = dataGen.generateInsertsContainsAllPartitions("000", 20)
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala, 2))
inputDF2.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
@@ -798,7 +808,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
def testReadLogOnlyMergeOnReadTable(): Unit = {
initMetaClient(HoodieTableType.MERGE_ON_READ)
val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
val inputDF = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
val inputDF = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala, 2))
inputDF.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
@@ -817,7 +827,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
@Test
def testTempFilesCleanForClustering(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).asScala
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
@@ -835,7 +845,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
@Test
def testClusteringOnNullableColumn(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).asScala
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
.withColumn("cluster_id", when(expr("end_lon < 0.2 "), lit(null).cast("string"))
.otherwise(col("_row_key")))
@@ -860,7 +870,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
val numRecords = 100
val numRecordsToDelete = 2
val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).toList
val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).asScala
val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
inputDF0.write.format("org.apache.hudi")
.options(commonOpts)
@@ -911,7 +921,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
)
val dataGen1 = new HoodieTestDataGenerator(Array("2022-01-01"))
val records1 = recordsToStrings(dataGen1.generateInserts("001", 50)).toList
val records1 = recordsToStrings(dataGen1.generateInserts("001", 50)).asScala
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(options)
@@ -924,7 +934,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
val commit1Time = metaClient.getActiveTimeline.lastInstant().get().getTimestamp
val dataGen2 = new HoodieTestDataGenerator(Array("2022-01-02"))
val records2 = recordsToStrings(dataGen2.generateInserts("002", 60)).toList
val records2 = recordsToStrings(dataGen2.generateInserts("002", 60)).asScala
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(options)
@@ -932,7 +942,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
.save(basePath)
val commit2Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val records3 = recordsToStrings(dataGen2.generateUniqueUpdates("003", 20)).toList
val records3 = recordsToStrings(dataGen2.generateUniqueUpdates("003", 20)).asScala
val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
inputDF3.write.format("org.apache.hudi")
.options(options)

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.functional
import org.apache.avro.Schema
import org.apache.hudi.HoodieBaseRelation.projectSchema
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
@@ -333,7 +334,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
}
val readColumns = targetColumns ++ relation.mandatoryFields
val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
val (_, projectedStructType, _) = projectSchema(Left(tableState.schema), readColumns)
val row: InternalRow = rows.take(1).head

View File

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro
import org.apache.avro.generic.GenericData
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, IntWrapper}
import org.apache.spark.sql.avro.SchemaConverters.SchemaType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
class TestAvroSerDe extends SparkAdapterSupport {
@Test
def testAvroUnionSerDe(): Unit = {
val originalAvroRecord = {
val minValue = new GenericData.Record(IntWrapper.SCHEMA$)
minValue.put("value", 9)
val maxValue = new GenericData.Record(IntWrapper.SCHEMA$)
maxValue.put("value", 10)
val record = new GenericData.Record(HoodieMetadataColumnStats.SCHEMA$)
record.put("fileName", "9388c460-4ace-4274-9a0b-d44606af60af-0_2-25-35_20220520154514641.parquet")
record.put("columnName", "c8")
record.put("minValue", minValue)
record.put("maxValue", maxValue)
record.put("valueCount", 10L)
record.put("nullCount", 0L)
record.put("totalSize", 94L)
record.put("totalUncompressedSize", 54L)
record.put("isDeleted", false)
record
}
val avroSchema = HoodieMetadataColumnStats.SCHEMA$
val SchemaType(catalystSchema, _) = SchemaConverters.toSqlType(avroSchema)
val deserializer = sparkAdapter.createAvroDeserializer(avroSchema, catalystSchema)
val serializer = sparkAdapter.createAvroSerializer(catalystSchema, avroSchema, nullable = false)
val row = deserializer.deserialize(originalAvroRecord).get
val deserializedAvroRecord = serializer.serialize(row)
assertEquals(originalAvroRecord, deserializedAvroRecord)
}
}

View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro
import org.apache.hudi.avro.model.HoodieMetadataColumnStats
import org.apache.spark.sql.avro.SchemaConverters.SchemaType
import org.junit.Test
import org.junit.jupiter.api.Assertions.assertEquals
class TestSchemaConverters {
@Test
def testAvroUnionConversion(): Unit = {
val originalAvroSchema = HoodieMetadataColumnStats.SCHEMA$
val SchemaType(convertedStructType, _) = SchemaConverters.toSqlType(originalAvroSchema)
val convertedAvroSchema = SchemaConverters.toAvroType(convertedStructType)
// NOTE: Here we're validating that converting Avro -> Catalyst and Catalyst -> Avro are inverse
// transformations, but since it's not an easy endeavor to match Avro schemas directly, we match
// the derived Catalyst schemas instead
assertEquals(convertedStructType, SchemaConverters.toSqlType(convertedAvroSchema).dataType)
}
}

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with SparkAdapterSupport {
private def explain(plan: LogicalPlan): String = {
val explainCommand = sparkAdapter.getCatalystPlanUtils.createExplainCommand(plan, extended = true)
executePlan(explainCommand)
.executeCollect()
.mkString("\n")
}
private def executePlan(plan: LogicalPlan): SparkPlan =
spark.sessionState.executePlan(plan).executedPlan
test("Test NestedSchemaPruning optimization (COW/MOR)") {
withTempDir { tmp =>
// NOTE: These tests are only relevant for Spark >= 3.1
// TODO extract tests into a separate spark-version-specific module
if (HoodieSparkUtils.gteqSpark3_1) {
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql(
s"""
|CREATE TABLE $tableName (
| id int,
| item STRUCT<name: string, price: double>,
| ts long
|) USING HUDI TBLPROPERTIES (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.populate.meta.fields = 'false'
|)
|LOCATION '$tablePath'
""".stripMargin)
spark.sql(
s"""
|INSERT INTO $tableName
|SELECT 1 AS id, named_struct('name', 'a1', 'price', 10) AS item, 123456 AS ts
""".stripMargin)
val selectDF = spark.sql(s"SELECT id, item.name FROM $tableName")
val expectedSchema = StructType(Seq(
StructField("id", IntegerType),
StructField("item" , StructType(Seq(StructField("name", StringType))))
))
spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
val expectedReadSchemaClause = "ReadSchema: struct<id:int,item:struct<name:string>>"
val hint =
s"""
|The following is expected to be present in the plan (where ReadSchema has properly pruned nested structs, which
|is an optimization performed by NestedSchemaPruning rule):
|
|== Physical Plan ==
|*(1) Project [id#45, item#46.name AS name#55]
|+- FileScan parquet default.h0[id#45,item#46] Batched: false, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/private/var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/spark-7137..., PartitionFilters: [], PushedFilters: [], $expectedReadSchemaClause
|]
|""".stripMargin
val executedPlan = executePlan(selectDF.logicalPlan)
// NOTE: Unfortunately, we can't use pattern-matching to extract required fields, due to a need to maintain
// compatibility w/ Spark 2.4
executedPlan match {
// COW
case ProjectExec(_, fileScan: FileSourceScanExec) =>
val tableIdentifier = fileScan.tableIdentifier
val requiredSchema = fileScan.requiredSchema
assertEquals(tableName, tableIdentifier.get.table)
assertEquals(expectedSchema, requiredSchema, hint)
// MOR
case ProjectExec(_, dataScan: RowDataSourceScanExec) =>
// NOTE: This is temporary solution to assert for Spark 2.4, until it's deprecated
val explainedPlan = explain(selectDF.queryExecution.logical)
assertTrue(explainedPlan.contains(expectedReadSchemaClause))
// TODO replace with the assertions below once Spark 2.4 support is deprecated
//val tableIdentifier = dataScan.tableIdentifier
//val requiredSchema = dataScan.requiredSchema
//
//assertEquals(tableName, tableIdentifier.get.table)
//assertEquals(expectedSchema, requiredSchema, hint)
}
}
}
}
}
}
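// Editor's note (illustrative only; assumes a Hudi table named `h0` with the same `item STRUCT<name, price>`
// column): the same pruning can be checked manually from a shell by inspecting the ReadSchema clause of the
// physical plan, which is exactly what the hint above describes:
//
//   spark.sql("SELECT id, item.name FROM h0").explain(true)
//   // With NestedSchemaPruning applied, ReadSchema should show struct<id:int,item:struct<name:string>>
//   // rather than the full struct<name:string,price:double> for `item`.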

View File

@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
import org.apache.spark.sql.execution.command.ExplainCommand
object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
ExplainCommand(plan, extended = extended)
override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = {
TableIdentifier(aliasId.identifier, aliasId.database)
}
override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = {
relation.tableIdentifier
}
override def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join = {
Join(left, right, joinType, None)
}
override def isInsertInto(plan: LogicalPlan): Boolean = {
plan.isInstanceOf[InsertIntoTable]
}
override def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) =>
Some((table, partition, query, overwrite, ifPartitionNotExists))
case _=> None
}
}
override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = {
InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists)
}
override def createLike(left: Expression, right: Expression): Expression = {
Like(left, right)
}
override def isRelationTimeTravel(plan: LogicalPlan): Boolean = {
false
}
override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark2")
}
}

View File

@@ -21,22 +21,17 @@ package org.apache.spark.sql.adapter
import org.apache.avro.Schema
import org.apache.hudi.Spark2RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark2_4AvroDeserializer, HoodieSpark2_4AvroSerializer, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark2CatalystExpressionUtils, Row, SparkSession}
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark2CatalystExpressionUtils, HoodieSpark2CatalystPlanUtils, Row, SparkSession}
import scala.collection.mutable.ArrayBuffer
@@ -45,7 +40,9 @@ import scala.collection.mutable.ArrayBuffer
*/
class Spark2Adapter extends SparkAdapter {
override def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils
override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils
override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark2CatalystPlanUtils
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
new HoodieSpark2_4AvroSerializer(rootCatalystType, rootAvroType, nullable)
@@ -59,36 +56,6 @@ class Spark2Adapter extends SparkAdapter {
new Spark2RowSerDe(encoder)
}
override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = {
TableIdentifier(aliasId.identifier, aliasId.database)
}
override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = {
relation.tableIdentifier
}
override def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join = {
Join(left, right, joinType, None)
}
override def isInsertInto(plan: LogicalPlan): Boolean = {
plan.isInstanceOf[InsertIntoTable]
}
override def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) =>
Some((table, partition, query, overwrite, ifPartitionNotExists))
case _=> None
}
}
override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = {
InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists)
}
override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
Some(
(spark: SparkSession, delegate: ParserInterface) => new HoodieSpark2ExtendedSqlParser(spark, delegate)
@@ -97,10 +64,6 @@ class Spark2Adapter extends SparkAdapter {
override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = new Spark2ParsePartitionUtil
override def createLike(left: Expression, right: Expression): Expression = {
Like(left, right)
}
override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2")
}
@@ -145,20 +108,6 @@ class Spark2Adapter extends SparkAdapter {
partitions.toSeq
}
/**
* if the logical plan is a TimeTravelRelation LogicalPlan.
*/
override def isRelationTimeTravel(plan: LogicalPlan): Boolean = {
false
}
/**
* Get the member of the TimeTravelRelation LogicalPlan.
*/
override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark2")
}
override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
}

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode)
override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = {
aliasId match {
case AliasIdentifier(name, Seq(database)) =>
TableIdentifier(name, Some(database))
case AliasIdentifier(name, Seq(_, database)) =>
TableIdentifier(name, Some(database))
case AliasIdentifier(name, Seq()) =>
TableIdentifier(name, None)
case _ => throw new IllegalArgumentException(s"Cannot cast $aliasId to TableIdentifier")
}
}
override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = {
relation.multipartIdentifier.asTableIdentifier
}
override def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join = {
Join(left, right, joinType, None, JoinHint.NONE)
}
override def isInsertInto(plan: LogicalPlan): Boolean = {
plan.isInstanceOf[InsertIntoStatement]
}
override def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case insert: InsertIntoStatement =>
Some((insert.table, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
case _ =>
None
}
}
override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = {
ReflectUtil.createInsertInto(table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists)
}
override def createLike(left: Expression, right: Expression): Expression = {
new Like(left, right)
}
}

View File

@@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
import scala.collection.mutable
object HoodieSpark3SqlUtils {
def convertTransforms(partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
val identityCols = new mutable.ArrayBuffer[String]
var bucketSpec = Option.empty[BucketSpec]
partitions.map {
case IdentityTransform(FieldReference(Seq(col))) =>
identityCols += col
case BucketTransform(numBuckets, FieldReference(Seq(col))) =>
bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil))
case _ =>
throw new HoodieException(s"Partitioning by expressions is not supported.")
}
(identityCols, bucketSpec)
}
}

View File

@@ -19,24 +19,20 @@ package org.apache.spark.sql.adapter
import org.apache.hudi.Spark3RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like, Predicate}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.{HoodieCatalystPlansUtils, HoodieSpark3CatalystPlanUtils, Row, SparkSession}
import scala.util.control.NonFatal
@@ -51,53 +47,10 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters
override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = {
aliasId match {
case AliasIdentifier(name, Seq(database)) =>
TableIdentifier(name, Some(database))
case AliasIdentifier(name, Seq(_, database)) =>
TableIdentifier(name, Some(database))
case AliasIdentifier(name, Seq()) =>
TableIdentifier(name, None)
case _=> throw new IllegalArgumentException(s"Cannot cast $aliasId to TableIdentifier")
}
}
override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = {
relation.multipartIdentifier.asTableIdentifier
}
override def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join = {
Join(left, right, joinType, None, JoinHint.NONE)
}
override def isInsertInto(plan: LogicalPlan): Boolean = {
plan.isInstanceOf[InsertIntoStatement]
}
override def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case insert: InsertIntoStatement =>
Some((insert.table, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
case _ =>
None
}
}
override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = {
ReflectUtil.createInsertInto(table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists)
}
override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = {
new Spark3ParsePartitionUtil(conf)
}
override def createLike(left: Expression, right: Expression): Expression = {
new Like(left, right)
}
override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
parser.parseMultipartIdentifier(sqlText)
}
@@ -117,7 +70,7 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
case relation: UnresolvedRelation =>
try {
isHoodieTable(toTableIdentifier(relation), spark)
isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
} catch {
case NonFatal(e) =>
logWarning("Failed to determine whether the table is a hoodie table", e)
@@ -128,19 +81,6 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
}
}
/**
* if the logical plan is a TimeTravelRelation LogicalPlan.
*/
override def isRelationTimeTravel(plan: LogicalPlan): Boolean = {
false
}
/**
* Get the member of the TimeTravelRelation LogicalPlan.
*/
override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark3.1.x")
}
override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
// since Spark 3.2.1 supports DataSource V2, we need a new SqlParser to handle DDL statements
if (SPARK_VERSION.startsWith("3.1")) {

View File

@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import org.apache.hudi.HoodieBaseRelation
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, NamedExpression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.util.SchemaUtils.restoreOriginalOutputNames
/**
* Prunes unnecessary physical columns given a [[PhysicalOperation]] over a data source relation.
* By "physical column", we mean a column as defined in the data source format like Parquet format
* or ORC format. For example, in Spark SQL, a root-level Parquet column corresponds to a SQL
* column, and a nested Parquet column corresponds to a [[StructField]].
*
* NOTE: This class is borrowed from Spark 3.2.1, with modifications adapting it to handle [[HoodieBaseRelation]],
* instead of [[HadoopFsRelation]]
*/
class NestedSchemaPruning extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalyst.expressions.SchemaPruning._
override def apply(plan: LogicalPlan): LogicalPlan =
if (conf.nestedSchemaPruningEnabled) {
apply0(plan)
} else {
plan
}
private def apply0(plan: LogicalPlan): LogicalPlan =
plan transformDown {
case op @ PhysicalOperation(projects, filters,
// NOTE: This is modified to accommodate for Hudi's custom relations, given that original
// [[NestedSchemaPruning]] rule is tightly coupled w/ [[HadoopFsRelation]]
// TODO generalize to any file-based relation
l @ LogicalRelation(relation: HoodieBaseRelation, _, _, _))
if relation.canPruneRelationSchema =>
prunePhysicalColumns(l.output, projects, filters, relation.dataSchema,
prunedDataSchema => {
val prunedRelation =
relation.updatePrunedDataSchema(prunedSchema = prunedDataSchema)
buildPrunedRelation(l, prunedRelation)
}).getOrElse(op)
}
/**
* This method returns optional logical plan. `None` is returned if no nested field is required or
* all nested fields are required.
*/
private def prunePhysicalColumns(output: Seq[AttributeReference],
projects: Seq[NamedExpression],
filters: Seq[Expression],
dataSchema: StructType,
outputRelationBuilder: StructType => LogicalRelation): Option[LogicalPlan] = {
val (normalizedProjects, normalizedFilters) =
normalizeAttributeRefNames(output, projects, filters)
val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)
// If requestedRootFields includes a nested field, continue. Otherwise,
// return op
if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) {
val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields)
// If the data schema is different from the pruned data schema, continue. Otherwise,
// return op. We effect this comparison by counting the number of "leaf" fields in
// each schema, assuming the fields in prunedDataSchema are a subset of the fields
// in dataSchema.
if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
val prunedRelation = outputRelationBuilder(prunedDataSchema)
val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
Some(buildNewProjection(projects, normalizedProjects, normalizedFilters,
prunedRelation, projectionOverSchema))
} else {
None
}
} else {
None
}
}
/**
* Normalizes the names of the attribute references in the given projects and filters to reflect
* the names in the given logical relation. This makes it possible to compare attributes and
* fields by name. Returns a tuple with the normalized projects and filters, respectively.
*/
private def normalizeAttributeRefNames(output: Seq[AttributeReference],
projects: Seq[NamedExpression],
filters: Seq[Expression]): (Seq[NamedExpression], Seq[Expression]) = {
val normalizedAttNameMap = output.map(att => (att.exprId, att.name)).toMap
val normalizedProjects = projects.map(_.transform {
case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) =>
att.withName(normalizedAttNameMap(att.exprId))
}).map { case expr: NamedExpression => expr }
val normalizedFilters = filters.map(_.transform {
case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) =>
att.withName(normalizedAttNameMap(att.exprId))
})
(normalizedProjects, normalizedFilters)
}
/**
* Builds the new output [[Project]] Spark SQL operator that has the `leafNode`.
*/
private def buildNewProjection(projects: Seq[NamedExpression],
normalizedProjects: Seq[NamedExpression],
filters: Seq[Expression],
prunedRelation: LogicalRelation,
projectionOverSchema: ProjectionOverSchema): Project = {
// Construct a new target for our projection by rewriting and
// including the original filters where available
val projectionChild =
if (filters.nonEmpty) {
val projectedFilters = filters.map(_.transformDown {
case projectionOverSchema(expr) => expr
})
val newFilterCondition = projectedFilters.reduce(And)
Filter(newFilterCondition, prunedRelation)
} else {
prunedRelation
}
// Construct the new projections of our Project by
// rewriting the original projections
val newProjects = normalizedProjects.map(_.transformDown {
case projectionOverSchema(expr) => expr
}).map { case expr: NamedExpression => expr }
if (log.isDebugEnabled) {
logDebug(s"New projects:\n${newProjects.map(_.treeString).mkString("\n")}")
}
Project(restoreOriginalOutputNames(newProjects, projects.map(_.name)), projectionChild)
}
/**
* Builds a pruned logical relation from the output of the output relation and the schema of the
* pruned base relation.
*/
private def buildPrunedRelation(outputRelation: LogicalRelation,
prunedBaseRelation: BaseRelation): LogicalRelation = {
val prunedOutput = getPrunedOutput(outputRelation.output, prunedBaseRelation.schema)
outputRelation.copy(relation = prunedBaseRelation, output = prunedOutput)
}
// Prune the given output to make it consistent with `requiredSchema`.
private def getPrunedOutput(output: Seq[AttributeReference],
requiredSchema: StructType): Seq[AttributeReference] = {
// We need to replace the expression ids of the pruned relation output attributes
// with the expression ids of the original relation output attributes so that
// references to the original relation's output are not broken
val outputIdMap = output.map(att => (att.name, att.exprId)).toMap
requiredSchema
.toAttributes
.map {
case att if outputIdMap.contains(att.name) =>
att.withExprId(outputIdMap(att.name))
case att => att
}
}
/**
* Counts the "leaf" fields of the given dataType. Informally, this is the
* number of fields of non-complex data type in the tree representation of
* [[DataType]].
*/
private def countLeaves(dataType: DataType): Int = {
dataType match {
case array: ArrayType => countLeaves(array.elementType)
case map: MapType => countLeaves(map.keyType) + countLeaves(map.valueType)
case struct: StructType =>
struct.map(field => countLeaves(field.dataType)).sum
case _ => 1
}
}
}
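// Editor's note (illustrative registration only; in this PR the rule is actually wired up through
// HoodieSparkSessionExtensions / HoodieAnalysis' customOptimizerRules): a rule like this is typically
// injected via Spark's extension mechanism, e.g.:
//
//   import org.apache.spark.sql.SparkSession
//   import org.apache.spark.sql.execution.datasources.NestedSchemaPruning
//
//   val spark = SparkSession.builder()
//     .master("local[2]")
//     .appName("nested-schema-pruning-example")
//     .withExtensions(extensions => extensions.injectOptimizerRule(_ => new NestedSchemaPruning))
//     .getOrCreate()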

View File

@@ -21,7 +21,7 @@ package org.apache.spark.sql
import HoodieSparkTypeUtils.isCastPreservingOrdering
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
object HoodieSpark3_1CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
object HoodieSpark31CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
expr match {

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
override def isRelationTimeTravel(plan: LogicalPlan): Boolean = false
override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
throw new IllegalStateException(s"Should not call getRelationTimeTravel for Spark <= 3.2.x")
}
}

View File

@@ -19,21 +19,20 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession}
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils}
/**
* Implementation of [[SparkAdapter]] for Spark 3.1.x
*/
class Spark3_1Adapter extends BaseSpark3Adapter {
override def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark3_1CatalystExpressionUtils
def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark31CatalystPlanUtils
override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
new HoodieSpark3_1AvroSerializer(rootCatalystType, rootAvroType, nullable)

View File

@@ -325,6 +325,7 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>

View File

@@ -20,7 +20,7 @@ package org.apache.spark.sql
import HoodieSparkTypeUtils.isCastPreservingOrdering
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
object HoodieSpark3_2CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
object HoodieSpark32CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
expr match {

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
override def isRelationTimeTravel(plan: LogicalPlan): Boolean = {
plan.isInstanceOf[TimeTravelRelation]
}
override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
plan match {
case timeTravel: TimeTravelRelation =>
Some((timeTravel.table, timeTravel.timestamp, timeTravel.version))
case _ =>
None
}
}
}
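
For illustration, the query shape that produces a TimeTravelRelation node (and hence a non-empty result from getRelationTimeTravel above) looks roughly like the following. This is a sketch assuming Spark 3.2+ with Hudi's session extensions installed; the table name and instant are placeholders.

import org.apache.spark.sql.SparkSession

object TimeTravelQuerySketch {
  def main(args: Array[String]): Unit = {
    // The extensions install Hudi's extended SQL parser on Spark 3.2, which parses the
    // TIMESTAMP AS OF clause into a TimeTravelRelation logical plan node.
    val spark = SparkSession.builder()
      .appName("hudi-time-travel-sketch")
      .master("local[*]")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtensions")
      .getOrCreate()

    val asOf = spark.sql("SELECT * FROM hudi_trips TIMESTAMP AS OF '2022-04-20 00:00:00'")
    asOf.show()
  }
}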

View File

@@ -19,47 +19,26 @@ package org.apache.spark.sql.adapter
import org.apache.avro.Schema
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32HoodieParquetFileFormat}
import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_2CatalystExpressionUtils, SparkSession}
import org.apache.spark.sql._
/**
* Implementation of [[SparkAdapter]] for Spark 3.2.x branch
*/
class Spark3_2Adapter extends BaseSpark3Adapter {
def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark32CatalystPlanUtils
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
new HoodieSpark3_2AvroSerializer(rootCatalystType, rootAvroType, nullable)
override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer =
new HoodieSpark3_2AvroDeserializer(rootAvroType, rootCatalystType)
override def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark3_2CatalystExpressionUtils
/**
* Returns whether the logical plan is a TimeTravelRelation LogicalPlan.
*/
override def isRelationTimeTravel(plan: LogicalPlan): Boolean = {
plan.isInstanceOf[TimeTravelRelation]
}
/**
* Get the members (table, timestamp, version) of the TimeTravelRelation LogicalPlan.
*/
override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
plan match {
case timeTravel: TimeTravelRelation =>
Some((timeTravel.table, timeTravel.timestamp, timeTravel.version))
case _ =>
None
}
}
override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils
override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
Some(

View File

@@ -23,17 +23,17 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, HoodieCatalogTable}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.hudi.analysis.HoodieV1OrV2Table
import org.apache.spark.sql.hudi.catalog.HoodieCatalog.convertTransforms
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.types.{StructField, StructType}
@@ -42,6 +42,7 @@ import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}
import java.net.URI
import java.util
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
import scala.collection.mutable
class HoodieCatalog extends DelegatingCatalogExtension
with StagingTableCatalog
@@ -343,3 +344,24 @@ class HoodieCatalog extends DelegatingCatalogExtension
})
}
}
object HoodieCatalog {
def convertTransforms(partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
val identityCols = new mutable.ArrayBuffer[String]
var bucketSpec = Option.empty[BucketSpec]
partitions.map {
case IdentityTransform(FieldReference(Seq(col))) =>
identityCols += col
case BucketTransform(numBuckets, FieldReference(Seq(col))) =>
bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil))
case _ =>
throw new HoodieException("Partitioning by expressions is not supported.")
}
(identityCols, bucketSpec)
}
}
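
A minimal, self-contained sketch of what convertTransforms does, using simplified stand-ins (the *Sketch names are illustrative, not Spark's real Transform/FieldReference/BucketSpec types): identity transforms become plain partition columns, a bucket transform becomes the bucket spec, and, in the real method, any other transform is rejected with a HoodieException.

object ConvertTransformsSketch {
  sealed trait TransformSketch
  case class IdentitySketch(col: String) extends TransformSketch
  case class BucketSketch(numBuckets: Int, col: String) extends TransformSketch
  case class BucketSpecSketch(numBuckets: Int, cols: Seq[String])

  def convertTransformsSketch(partitions: Seq[TransformSketch]): (Seq[String], Option[BucketSpecSketch]) = {
    val identityCols = scala.collection.mutable.ArrayBuffer.empty[String]
    var bucketSpec = Option.empty[BucketSpecSketch]
    partitions.foreach {
      case IdentitySketch(col)           => identityCols += col
      case BucketSketch(numBuckets, col) => bucketSpec = Some(BucketSpecSketch(numBuckets, Seq(col)))
    }
    (identityCols.toSeq, bucketSpec)
  }

  // convertTransformsSketch(Seq(IdentitySketch("dt"), BucketSketch(8, "id")))
  //   returns (Seq("dt"), Some(BucketSpecSketch(8, Seq("id"))))
}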