1
0

[HUDI-2811] Support Spark 3.2 (#4270)

This commit is contained in:
Yann Byron
2021-12-28 16:12:44 +08:00
committed by GitHub
parent 32505d5adb
commit 05942e018c
36 changed files with 596 additions and 167 deletions

View File

@@ -578,14 +578,10 @@ case class HoodieFileIndex(
}.mkString("/")
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap
val partitionValues = sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
typeInference = false, Set(new Path(basePath)), partitionDataTypes,
DateTimeUtils.getTimeZone(timeZoneId))
// Convert partitionValues to InternalRow
partitionValues.map(_.literals.map(_.value))
.map(InternalRow.fromSeq)
.getOrElse(InternalRow.empty)
}
}
PartitionRowPath(partitionRow, partitionPath)

View File

@@ -48,7 +48,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.SparkContext
import java.util.Properties
import scala.collection.JavaConversions._
@@ -463,13 +464,13 @@ object HoodieSparkSqlWriter {
} else {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
}
if (SPARK_VERSION.startsWith("2.")) {
if (HoodieSparkUtils.isSpark2) {
hoodieDF.write.format("org.apache.hudi.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.options(params)
.mode(SaveMode.Append)
.save()
} else if (SPARK_VERSION.startsWith("3.")) {
} else if(HoodieSparkUtils.isSpark3) {
hoodieDF.write.format("org.apache.hudi.spark3.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)

View File

@@ -18,18 +18,30 @@
package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.types.DataType
/**
* This is to be compatible with the type returned by Spark 3.1
* and other spark versions for AvroDeserializer
*/
case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
extends AvroDeserializer(rootAvroType, rootCatalystType) {
case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
private val avroDeserializer = if (HoodieSparkUtils.isSpark3_2) {
// SPARK-34404: As of Spark 3.2, AvroDeserializer no longer has a constructor taking only Schema and DataType arguments.
// So use reflection to obtain the AvroDeserializer instance.
val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType], classOf[String])
constructor.newInstance(rootAvroType, rootCatalystType, "EXCEPTION")
} else {
val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType])
constructor.newInstance(rootAvroType, rootCatalystType)
}
def deserializeData(data: Any): Any = {
super.deserialize(data) match {
case Some(r) => r // spark 3.1 return type is Option, we fetch the data.
avroDeserializer.deserialize(data) match {
case Some(r) => r // As of spark 3.1, this will return data wrapped with Option, so we fetch the data.
case o => o // for other spark version, return the data directly.
}
}

View File

@@ -22,17 +22,37 @@ import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
/**
 * Command node that pairs a `table` plan with a compaction `operation` and an
 * optional `instantTimestamp`. The `table` plan is the node's single child.
 */
case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
  extends Command {

  override def children: Seq[LogicalPlan] = table :: Nil

  // Rebuilds this node around the first entry of `newChildren`.
  // NOTE(review): declared without `override`, presumably so the class also compiles
  // against Spark versions that do not define this hook — confirm.
  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionTable = {
    val replacement = newChildren.head
    copy(table = replacement)
  }
}
case class CompactionPath(path: String, operation: CompactionOperation, instantTimestamp: Option[Long])
extends Command
extends Command {
override def children: Seq[LogicalPlan] = Seq.empty
def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionPath = {
this
}
}
/**
 * Command node that pairs a `table` plan with a `limit` (20 by default).
 * The `table` plan is the node's single child.
 */
case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20)
  extends Command {

  override def children: Seq[LogicalPlan] = table :: Nil

  // Rebuilds this node around the first entry of `newChildren`, keeping `limit` as is.
  // NOTE(review): declared without `override`, presumably so the class also compiles
  // against Spark versions that do not define this hook — confirm.
  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnTable = {
    val replacement = newChildren.head
    copy(table = replacement)
  }
}
case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command
case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command {
override def children: Seq[LogicalPlan] = Seq.empty
def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnPath = {
this
}
}
object CompactionOperation extends Enumeration {
type CompactionOperation = Value

View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.trees
/**
 * Mix-in for tree nodes without children, similar to `LeafLike` in Spark3.2.
 *
 * @tparam T the tree node type; the self-type requires implementers to be a `TreeNode[T]`
 */
trait HoodieLeafLike[T <: TreeNode[T]] { self: TreeNode[T] =>

  /** A leaf always reports an empty child list. */
  override final def children: Seq[T] = Seq.empty

  /** `f` is never applied; the node itself is returned. */
  override final def mapChildren(f: T => T): T = self.asInstanceOf[T]

  /** Ignores `newChildren` and returns this node unchanged. */
  final def withNewChildrenInternal(newChildren: IndexedSeq[T]): T = self.asInstanceOf[T]
}

View File

@@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -282,8 +282,6 @@ object HoodieSqlUtils extends SparkAdapterSupport {
.filterKeys(_.startsWith("hoodie."))
}
def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")
def isEnableHive(sparkSession: SparkSession): Boolean =
"hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)

View File

@@ -17,12 +17,13 @@
package org.apache.spark.sql.hudi.analysis
import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -137,7 +138,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// We can do this because under the normal case, we should not allow to update or set
// the hoodie's meta field in sql statement, it is a system field, cannot set the value
// by user.
if (HoodieSqlUtils.isSpark3) {
if (HoodieSparkUtils.isSpark3) {
val assignmentFieldNames = assignments.map(_.key).map {
case attr: AttributeReference =>
attr.name
@@ -178,11 +179,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
.map { case (targetAttr, sourceAttr) => Assignment(targetAttr, sourceAttr) }
}
} else {
assignments.map(assignment => {
// For Spark3.2, InsertStarAction/UpdateStarAction's assignments will contain the meta fields.
val withoutMetaAttrs = assignments.filterNot{ assignment =>
if (assignment.key.isInstanceOf[Attribute]) {
HoodieSqlUtils.isMetaField(assignment.key.asInstanceOf[Attribute].name)
} else {
false
}
}
withoutMetaAttrs.map { assignment =>
val resolvedKey = resolveExpressionFrom(target)(assignment.key)
val resolvedValue = resolveExpressionFrom(resolvedSource, Some(target))(assignment.value)
Assignment(resolvedKey, resolvedValue)
})
}
}
(resolvedCondition, resolvedAssignments)
}
@@ -242,6 +251,10 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case DeleteAction(condition) =>
val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
DeleteAction(resolvedCondition)
case action: MergeAction =>
// SPARK-34962: use UpdateStarAction as the explicit representation of * in UpdateAction.
// So match and convert this in the Spark 3.2 environment.
UpdateAction(action.condition, Seq.empty)
}
// Resolve the notMatchedActions
val resolvedNotMatchedActions = notMatchedActions.map {
@@ -249,6 +262,10 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
val (resolvedCondition, resolvedAssignments) =
resolveConditionAssignments(condition, assignments)
InsertAction(resolvedCondition, resolvedAssignments)
case action: MergeAction =>
// SPARK-34962: use InsertStarAction as the explicit representation of * in InsertAction.
// So match and convert this in the Spark 3.2 environment.
InsertAction(action.condition, Seq.empty)
}
// Return the resolved MergeIntoTable
MergeIntoTable(target, resolvedSource, resolvedMergeCondition,
@@ -426,9 +443,11 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
if isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
case ShowPartitionsCommand(tableName, specOpt)
if isHoodieTable(tableName, sparkSession) =>
ShowHoodieTablePartitionsCommand(tableName, specOpt)
// SPARK-34238: the definition of ShowPartitionsCommand has been changed in Spark3.2.
// Match on the class type instead of calling the `unapply` method.
case s: ShowPartitionsCommand
if isHoodieTable(s.tableName, sparkSession) =>
ShowHoodieTablePartitionsCommand(s.tableName, s.spec)
// Rewrite TruncateTableCommand to TruncateHoodieTableCommand
case TruncateTableCommand(tableName, partitionSpec)
if isHoodieTable(tableName, sparkSession) =>

View File

@@ -31,7 +31,7 @@ import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
@@ -44,7 +44,7 @@ import scala.util.control.NonFatal
case class AlterHoodieTableAddColumnsCommand(
tableId: TableIdentifier,
colsToAdd: Seq[StructField])
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
if (colsToAdd.nonEmpty) {
@@ -74,7 +74,7 @@ case class AlterHoodieTableAddColumnsCommand(
}
private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
newSqlSchema: StructType): Unit = {
newSqlDataSchema: StructType): Unit = {
try {
sparkSession.catalog.uncacheTable(tableId.quotedString)
} catch {
@@ -84,12 +84,11 @@ case class AlterHoodieTableAddColumnsCommand(
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
SchemaUtils.checkColumnNameDuplication(
newSqlSchema.map(_.name),
newSqlDataSchema.map(_.name),
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))
sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema)
sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlDataSchema)
}
}

View File

@@ -27,7 +27,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{StructField, StructType}
import scala.util.control.NonFatal
@@ -39,7 +39,7 @@ case class AlterHoodieTableChangeColumnCommand(
tableIdentifier: TableIdentifier,
columnName: String,
newColumn: StructField)
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)

View File

@@ -25,11 +25,12 @@ import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
@@ -39,7 +40,7 @@ case class AlterHoodieTableDropPartitionCommand(
ifExists : Boolean,
purge : Boolean,
retainData : Boolean)
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"

View File

@@ -24,12 +24,12 @@ import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation
import org.apache.spark.sql.catalyst.plans.logical.{CompactionOperation, LogicalPlan}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.types.StringType
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
case class CompactionHoodiePathCommand(path: String,
operation: CompactionOperation, instantTimestamp: Option[Long] = None)
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)

View File

@@ -21,13 +21,13 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.StringType
case class CompactionHoodieTableCommand(table: CatalogTable,
operation: CompactionOperation, instantTimestamp: Option[Long])
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val basePath = getTableLocation(table, sparkSession)

View File

@@ -22,14 +22,14 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.CompactionUtils
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types.{IntegerType, StringType}
import scala.collection.JavaConverters.asScalaIteratorConverter
case class CompactionShowHoodiePathCommand(path: String, limit: Int)
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(path.toString)

View File

@@ -20,12 +20,12 @@ package org.apache.spark.sql.hudi.command
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{IntegerType, StringType}
case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val basePath = getTableLocation(table, sparkSession)

View File

@@ -41,6 +41,10 @@ case class CreateHoodieTableAsSelectCommand(
mode: SaveMode,
query: LogicalPlan) extends DataWritingCommand {
// Returns this command as-is; `newChild` is not used.
// NOTE(review): any rewritten child passed in here is discarded, so `query` keeps its
// original plan — confirm this is intended rather than `copy(query = newChild)`.
def withNewChildInternal(newChild: LogicalPlan): CreateHoodieTableAsSelectCommand = this
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
@@ -46,7 +46,7 @@ import scala.util.control.NonFatal
* Command for create hoodie table.
*/
case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
extends RunnableCommand with SparkAdapterSupport {
extends HoodieLeafRunnableCommand with SparkAdapterSupport {
override def run(sparkSession: SparkSession): Seq[Row] = {
val tableIsExists = sparkSession.sessionState.catalog.tableExists(table.identifier)
@@ -198,7 +198,7 @@ object CreateHoodieTableCommand {
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
properties.put(DATASOURCE_SCHEMA_PREFIX + "numParts", parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

View File

@@ -25,12 +25,11 @@ import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.types.StructType
case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends RunnableCommand
case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends HoodieLeafRunnableCommand
with SparkAdapterSupport {
private val table = deleteTable.table

View File

@@ -18,25 +18,25 @@
package org.apache.spark.sql.hudi.command
import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils.isEnableHive
import scala.util.control.NonFatal
case class DropHoodieTableCommand(
tableIdentifier: TableIdentifier,
ifExists: Boolean,
isView: Boolean,
purge: Boolean) extends RunnableCommand
with SparkAdapterSupport {
tableIdentifier: TableIdentifier,
ifExists: Boolean,
isView: Boolean,
purge: Boolean)
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.HoodieLeafLike
import org.apache.spark.sql.execution.command.RunnableCommand
/**
 * Similar to `LeafRunnableCommand` in Spark3.2. `HoodieLeafRunnableCommand` mixes in
 * `HoodieLeafLike`, so subclasses of `RunnableCommand` do not each have to define
 * the `withNewChildrenInternal` method themselves.
 */
trait HoodieLeafRunnableCommand extends RunnableCommand with HoodieLeafLike[LogicalPlan]

View File

@@ -36,7 +36,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.internal.SQLConf
@@ -54,7 +53,7 @@ case class InsertIntoHoodieTableCommand(
query: LogicalPlan,
partition: Map[String, Option[String]],
overwrite: Boolean)
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")

View File

@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -60,7 +59,7 @@ import java.util.Base64
* ExpressionPayload#getInsertValue.
*
*/
case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends RunnableCommand
case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends HoodieLeafRunnableCommand
with SparkAdapterSupport {
private var sparkSession: SparkSession = _
@@ -203,7 +202,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
sourceExpression match {
case attr: AttributeReference if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true
case Cast(attr: AttributeReference, _, _) if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true
// SPARK-35857: the definition of Cast has been changed in Spark3.2.
// Match on the class type instead of calling the `unapply` method.
case cast: Cast =>
cast.child match {
case attr: AttributeReference if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true
case _ => false
}
case _=> false
}
}

View File

@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types.StringType
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StringType
case class ShowHoodieTablePartitionsCommand(
tableIdentifier: TableIdentifier,
specOpt: Option[TablePartitionSpec])
extends RunnableCommand {
extends HoodieLeafRunnableCommand {
override val output: Seq[Attribute] = {
AttributeReference("partition", StringType, nullable = false)() :: Nil

View File

@@ -28,15 +28,14 @@ import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan, UpdateTable}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}
import scala.collection.JavaConverters._
case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCommand
case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeafRunnableCommand
with SparkAdapterSupport {
private val table = updateTable.table

View File

@@ -19,6 +19,7 @@ package org.apache.hudi
import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpressions
import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpression
import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType}
import org.junit.jupiter.api.Assertions.assertEquals
@@ -68,22 +69,36 @@ class TestConvertFilterToCatalystExpression {
}
private def checkConvertFilter(filter: Filter, expectExpression: String): Unit = {
// [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute,
// AttributeReference and Alias don't quote qualified names properly
val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.isSpark3_2) {
expectExpression.replace("`", "")
} else {
expectExpression
}
val exp = convertToCatalystExpression(filter, tableSchema)
if (expectExpression == null) {
if (removeQuotesIfNeed == null) {
assertEquals(exp.isEmpty, true)
} else {
assertEquals(exp.isDefined, true)
assertEquals(expectExpression, exp.get.sql)
assertEquals(removeQuotesIfNeed, exp.get.sql)
}
}
private def checkConvertFilters(filters: Array[Filter], expectExpression: String): Unit = {
// [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute,
// AttributeReference and Alias don't quote qualified names properly
val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.isSpark3_2) {
expectExpression.replace("`", "")
} else {
expectExpression
}
val exp = convertToCatalystExpressions(filters, tableSchema)
if (expectExpression == null) {
if (removeQuotesIfNeed == null) {
assertEquals(exp.isEmpty, true)
} else {
assertEquals(exp.isDefined, true)
assertEquals(expectExpression, exp.get.sql)
assertEquals(removeQuotesIfNeed, exp.get.sql)
}
}

View File

@@ -293,28 +293,26 @@ class TestHoodieSparkSqlWriter {
*/
@Test
def testDisableAndEnableMetaFields(): Unit = {
try {
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
//create a new table
val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
.updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true")
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
//create a new table
val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
.updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true")
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val inserts = DataSourceTestUtils.generateRandomRows(1000)
val df = spark.createDataFrame(sc.parallelize(inserts), structType)
try {
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
fail("Should have thrown exception")
} catch {
case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict"))
case e: Exception => fail(e);
}
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val inserts = DataSourceTestUtils.generateRandomRows(1000)
val df = spark.createDataFrame(sc.parallelize(inserts), structType)
try {
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
fail("Should have thrown exception")
} catch {
case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict"))
case e: Exception => fail(e);
}
}
@@ -711,51 +709,49 @@ class TestHoodieSparkSqlWriter {
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test")
try {
val df = spark.range(0, 1000).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", lit(1))
.withColumn("p", lit(2))
val df = spark.range(0, 1000).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", lit(1))
.withColumn("p", lit(2))
df.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert")
.option("hoodie.insert.shuffle.parallelism", "4")
.mode(SaveMode.Overwrite).save(tempBasePath)
df.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert")
.option("hoodie.insert.shuffle.parallelism", "4")
.mode(SaveMode.Overwrite).save(tempBasePath)
df.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
.option("hoodie.insert.shuffle.parallelism", "4")
.mode(SaveMode.Append).save(tempBasePath)
df.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
.option("hoodie.insert.shuffle.parallelism", "4")
.mode(SaveMode.Append).save(tempBasePath)
val currentCommits = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val incrementalKeyIdNum = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
.option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0))
.load(tempBasePath).select("keyid").orderBy("keyid").count
assert(incrementalKeyIdNum == 1000)
val currentCommits = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val incrementalKeyIdNum = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
.option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0))
.load(tempBasePath).select("keyid").orderBy("keyid").count
assert(incrementalKeyIdNum == 1000)
df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
spark.emptyDataFrame.write.format("hudi")
.options(options)
.option(HoodieBootstrapConfig.BASE_PATH.key, baseBootStrapPath)
.option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getCanonicalName)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(HoodieBootstrapConfig.PARALLELISM_VALUE.key, "4")
.mode(SaveMode.Overwrite).save(tempBasePath)
df.write.format("hudi").options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
.option("hoodie.insert.shuffle.parallelism", "4").mode(SaveMode.Append).save(tempBasePath)
val currentCommitsBootstrap = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val incrementalKeyIdNumBootstrap = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
.option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0))
.load(tempBasePath).select("keyid").orderBy("keyid").count
assert(incrementalKeyIdNumBootstrap == 1000)
}
df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
spark.emptyDataFrame.write.format("hudi")
.options(options)
.option(HoodieBootstrapConfig.BASE_PATH.key, baseBootStrapPath)
.option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getCanonicalName)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(HoodieBootstrapConfig.PARALLELISM_VALUE.key, "4")
.mode(SaveMode.Overwrite).save(tempBasePath)
df.write.format("hudi").options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
.option("hoodie.insert.shuffle.parallelism", "4").mode(SaveMode.Append).save(tempBasePath)
val currentCommitsBootstrap = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val incrementalKeyIdNumBootstrap = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
.option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0))
.load(tempBasePath).select("keyid").orderBy("keyid").count
assert(incrementalKeyIdNumBootstrap == 1000)
}
}

View File

@@ -61,14 +61,18 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
}
/**
 * Registers a test through `super.test`, wrapping the body in try/finally so that
 * the catalog cleanup runs whether the body returns normally or throws.
 *
 * The cleanup walks every database returned by `catalog.listDatabases()` and drops
 * every table returned by `catalog.listTables(db)` for that database.
 *
 * NOTE(review): this span carries two renderings of the same body -- a compact
 * `try super.test(...)(try testFun finally {` form followed by an expanded
 * `super.test(...)( try { testFun } finally { ... } )` form. It looks like a
 * before/after pair from a change view; confirm which one the real file keeps.
 *
 * @param testName name forwarded unchanged to `super.test`
 * @param testTags tags forwarded unchanged to `super.test`
 * @param testFun  by-name test body, evaluated inside the try block
 * @param pos      implicit source position (presumably consumed by `super.test` -- confirm)
 */
override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: source.Position): Unit = {
try super.test(testName, testTags: _*)(try testFun finally {
val catalog = spark.sessionState.catalog
catalog.listDatabases().foreach{db =>
catalog.listTables(db).foreach {table =>
// NOTE(review): the two boolean flags are presumably ignoreIfNotExists and purge --
// confirm against the SessionCatalog.dropTable signature.
catalog.dropTable(table, true, true)
super.test(testName, testTags: _*)(
try {
testFun
} finally {
// Cleanup runs in `finally`, so tables are dropped even when testFun throws.
val catalog = spark.sessionState.catalog
catalog.listDatabases().foreach{db =>
catalog.listTables(db).foreach {table =>
catalog.dropTable(table, true, true)
}
}
}
})
)
}
protected def generateTableName: String = {

View File

@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.Row
@@ -352,7 +353,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
| when not matched and flag = '1' then insert *
|""".stripMargin
if (HoodieSqlUtils.isSpark3) {
if (HoodieSparkUtils.isSpark3) {
checkExceptionContain(mergeSql)("Columns aliases are not allowed in MERGE")
} else {
spark.sql(mergeSql)