From 0db1e978c6c26e7bdcbbaa5f048e0e11e6dee073 Mon Sep 17 00:00:00 2001
From: leesf <490081539@qq.com>
Date: Mon, 14 Feb 2022 22:26:58 +0800
Subject: [PATCH] [HUDI-3254] Introduce HoodieCatalog to manage tables for
Spark Datasource V2 (#4611)
---
.github/workflows/bot.yml | 7 +-
.../org/apache/hudi/HoodieSparkUtils.scala | 10 +
.../apache/spark/sql/hudi/SparkAdapter.scala | 40 ++-
.../org/apache/hudi/IncrementalRelation.scala | 4 +-
.../spark/sql/hudi/HoodieSqlCommonUtils.scala | 49 +--
.../AlterHoodieTableAddColumnsCommand.scala | 3 +-
.../spark/sql/hudi/HoodieSqlUtils.scala | 12 +-
.../sql/hudi/analysis/HoodieAnalysis.scala | 77 +++--
.../InsertIntoHoodieTableCommand.scala | 1 -
.../command/MergeIntoHoodieTableCommand.scala | 2 +-
.../command/UpdateHoodieTableCommand.scala | 1 -
.../hudi/TestHoodieSparkSqlWriter.scala | 8 +-
.../spark/sql/hudi/TestHoodieSqlBase.scala | 12 +
.../spark/sql/hudi/TestMergeIntoTable.scala | 14 +-
.../spark/sql/HoodieSpark3SqlUtils.scala | 45 +++
.../spark/sql/adapter/Spark3Adapter.scala | 18 +-
.../org/apache/hudi/Spark3DefaultSource.scala | 24 +-
.../connector/catalog/HoodieIdentifier.scala | 43 +++
.../hudi/analysis/HoodieSpark3Analysis.scala | 206 ++++++++++++
.../sql/hudi/catalog/BasicStagedTable.scala | 57 ++++
.../sql/hudi/catalog/HoodieCatalog.scala | 303 ++++++++++++++++++
.../hudi/catalog/HoodieInternalV2Table.scala | 126 ++++++++
.../sql/hudi/catalog/HoodieStagedTable.scala | 99 ++++++
.../hudi/catalog/ProvidesHoodieConfig.scala | 183 +++++++++++
.../sql/hudi/catalog/TableCreationMode.java | 23 ++
style/scalastyle.xml | 2 +-
26 files changed, 1288 insertions(+), 81 deletions(-)
create mode 100644 hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3SqlUtils.scala
create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/connector/catalog/HoodieIdentifier.scala
create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/BasicStagedTable.scala
create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieStagedTable.scala
create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala
create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/TableCreationMode.java
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index ebf3caccd..20515f7c7 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -18,12 +18,16 @@ jobs:
include:
- scala: "scala-2.11"
spark: "spark2"
+ skipModules: ""
- scala: "scala-2.11"
spark: "spark2,spark-shade-unbundle-avro"
+ skipModules: ""
- scala: "scala-2.12"
spark: "spark3.1.x"
+ skipModules: "!hudi-spark-datasource/hudi-spark3"
- scala: "scala-2.12"
spark: "spark3.1.x,spark-shade-unbundle-avro"
+ skipModules: "!hudi-spark-datasource/hudi-spark3"
- scala: "scala-2.12"
spark: "spark3"
- scala: "scala-2.12"
@@ -40,4 +44,5 @@ jobs:
env:
SCALA_PROFILE: ${{ matrix.scala }}
SPARK_PROFILE: ${{ matrix.spark }}
- run: mvn install -P "$SCALA_PROFILE,$SPARK_PROFILE" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
+ SKIP_MODULES: ${{ matrix.skipModules }}
+ run: mvn install -P "$SCALA_PROFILE,$SPARK_PROFILE" -pl "$SKIP_MODULES" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index dc61a5107..c763c264c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -53,8 +53,18 @@ object HoodieSparkUtils extends SparkAdapterSupport {
def isSpark3_0: Boolean = SPARK_VERSION.startsWith("3.0")
+ def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1")
+
def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
+ def beforeSpark3_2(): Boolean = {
+ if (isSpark2 || isSpark3_0 || isSpark3_1) {
+ true
+ } else {
+ false
+ }
+ }
+
def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 1264b9a1e..20b4d3cc1 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -18,19 +18,22 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.client.utils.SparkRowSerDe
-
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SparkParsePartitionUtil}
+import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Row, SparkSession}
+import java.util.Locale
+
/**
* An interface to adapter the difference between spark2 and spark3
* in some spark related class.
@@ -99,4 +102,35 @@ trait SparkAdapter extends Serializable {
*/
def getFilePartitions(sparkSession: SparkSession, partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition]
+
+ def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
+ tripAlias(table) match {
+ case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+ case relation: UnresolvedRelation =>
+ isHoodieTable(toTableIdentifier(relation), spark)
+ case _=> false
+ }
+ }
+
+ def isHoodieTable(map: java.util.Map[String, String]): Boolean = {
+ map.getOrDefault("provider", "").equals("hudi")
+ }
+
+ def isHoodieTable(table: CatalogTable): Boolean = {
+ table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
+ }
+
+ def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = {
+ val table = spark.sessionState.catalog.getTableMetadata(tableId)
+ isHoodieTable(table)
+ }
+
+ def tripAlias(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case SubqueryAlias(_, relation: LogicalPlan) =>
+ tripAlias(relation)
+ case other =>
+ other
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 395e1b3a2..9247973e7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -195,7 +195,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
if (doFullTableScan) {
val hudiDF = sqlContext.read
- .format("hudi")
+ .format("hudi_v1")
.schema(usedSchema)
.load(basePath)
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam
@@ -208,7 +208,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
- .format("hudi")
+ .format("hudi_v1")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
.load()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index a094e90b1..1e1e9c663 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -32,11 +32,11 @@ import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.StaticSQLConf
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.net.URI
@@ -54,24 +54,6 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
override def get() = new SimpleDateFormat("yyyy-MM-dd")
})
- def isHoodieTable(table: CatalogTable): Boolean = {
- table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
- }
-
- def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = {
- val table = spark.sessionState.catalog.getTableMetadata(tableId)
- isHoodieTable(table)
- }
-
- def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
- tripAlias(table) match {
- case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
- case relation: UnresolvedRelation =>
- isHoodieTable(sparkAdapter.toTableIdentifier(relation), spark)
- case _=> false
- }
- }
-
def getTableIdentifier(table: LogicalPlan): TableIdentifier = {
table match {
case SubqueryAlias(name, _) => sparkAdapter.toTableIdentifier(name)
@@ -200,14 +182,29 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
getTableLocation(table, spark)
}
+ def getTableLocation(properties: Map[String, String], identifier: TableIdentifier, sparkSession: SparkSession): String = {
+ val location: Option[String] = Some(properties.getOrElse("location", ""))
+ val isManaged = location.isEmpty || location.get.isEmpty
+ val uri = if (isManaged) {
+ Some(sparkSession.sessionState.catalog.defaultTablePath(identifier))
+ } else {
+ Some(new Path(location.get).toUri)
+ }
+ getTableLocation(uri, identifier, sparkSession)
+ }
+
def getTableLocation(table: CatalogTable, sparkSession: SparkSession): String = {
val uri = table.storage.locationUri.orElse {
Some(sparkSession.sessionState.catalog.defaultTablePath(table.identifier))
}
+ getTableLocation(uri, table.identifier, sparkSession)
+ }
+
+ def getTableLocation(uri: Option[URI], identifier: TableIdentifier, sparkSession: SparkSession): String = {
val conf = sparkSession.sessionState.newHadoopConf()
uri.map(makePathQualified(_, conf))
.map(removePlaceHolder)
- .getOrElse(throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
+ .getOrElse(throw new IllegalArgumentException(s"Missing location for ${identifier}"))
}
private def removePlaceHolder(path: String): String = {
@@ -316,4 +313,12 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
def columnEqual(field: StructField, other: StructField, resolver: Resolver): Boolean = {
resolver(field.name, other.name) && field.dataType == other.dataType
}
+
+ def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = {
+ child match {
+ case Literal(nul, NullType) => Literal(nul, dataType)
+ case _ => if (child.dataType != dataType)
+ Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index c6c79f431..c4f5cd39f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -57,7 +57,8 @@ case class AlterHoodieTableAddColumnsCommand(
s" table columns is: [${hoodieCatalogTable.tableSchemaWithoutMetaFields.fieldNames.mkString(",")}]")
}
// Get the new schema
- val newSqlSchema = StructType(tableSchema.fields ++ colsToAdd)
+ val rearrangedSchema = hoodieCatalogTable.dataSchema ++ colsToAdd ++ hoodieCatalogTable.partitionSchema
+ val newSqlSchema = StructType(rearrangedSchema)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index a198d0e00..048ca4ec6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{And, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{MergeIntoTable, SubqueryAlias}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, NullType}
object HoodieSqlUtils extends SparkAdapterSupport {
@@ -50,12 +48,4 @@ object HoodieSqlUtils extends SparkAdapterSupport {
case exp => Seq(exp)
}
}
-
- def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = {
- child match {
- case Literal(nul, NullType) => Literal(nul, dataType)
- case _ => if (child.dataType != dataType)
- Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child
- }
- }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index c8fa32891..5b44bc6d8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -17,10 +17,10 @@
package org.apache.spark.sql.hudi.analysis
-import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
@@ -28,10 +28,10 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getTableIdentifier, getTableLocation, isHoodieTable, removeMetaFields, tableExistsInPath}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getTableIdentifier, removeMetaFields}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command._
-import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils, HoodieSqlUtils}
+import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -42,12 +42,39 @@ object HoodieAnalysis {
Seq(
session => HoodieResolveReferences(session),
session => HoodieAnalysis(session)
- )
+ ) ++ extraResolutionRules()
def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
Seq(
session => HoodiePostAnalysisRule(session)
- )
+ ) ++ extraPostHocResolutionRules()
+
+ def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
+ if (!HoodieSparkUtils.beforeSpark3_2()) {
+ val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
+ val spark3Analysis: SparkSession => Rule[LogicalPlan] =
+ session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]]
+
+ val spark3ResolveReferences = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
+ val spark3References: SparkSession => Rule[LogicalPlan] =
+ session => ReflectionUtils.loadClass(spark3ResolveReferences, session).asInstanceOf[Rule[LogicalPlan]]
+
+ Seq(spark3Analysis, spark3References)
+ } else {
+ Seq.empty
+ }
+ }
+
+ def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
+ if (!HoodieSparkUtils.beforeSpark3_2()) {
+ val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
+ val spark3PostHocResolution: SparkSession => Rule[LogicalPlan] =
+ session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]]
+
+ Seq(spark3PostHocResolution)
+ } else {
+ Seq.empty
+ }
}
/**
@@ -61,36 +88,36 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
plan match {
// Convert to MergeIntoHoodieTableCommand
case m @ MergeIntoTable(target, _, _, _, _)
- if m.resolved && isHoodieTable(target, sparkSession) =>
+ if m.resolved && sparkAdapter.isHoodieTable(target, sparkSession) =>
MergeIntoHoodieTableCommand(m)
// Convert to UpdateHoodieTableCommand
case u @ UpdateTable(table, _, _)
- if u.resolved && isHoodieTable(table, sparkSession) =>
+ if u.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
UpdateHoodieTableCommand(u)
// Convert to DeleteHoodieTableCommand
case d @ DeleteFromTable(table, _)
- if d.resolved && isHoodieTable(table, sparkSession) =>
+ if d.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
DeleteHoodieTableCommand(d)
// Convert to InsertIntoHoodieTableCommand
case l if sparkAdapter.isInsertInto(l) =>
val (table, partition, query, overwrite, _) = sparkAdapter.getInsertIntoChildren(l).get
table match {
- case relation: LogicalRelation if isHoodieTable(relation, sparkSession) =>
+ case relation: LogicalRelation if sparkAdapter.isHoodieTable(relation, sparkSession) =>
new InsertIntoHoodieTableCommand(relation, query, partition, overwrite)
case _ =>
l
}
// Convert to CreateHoodieTableAsSelectCommand
case CreateTable(table, mode, Some(query))
- if query.resolved && isHoodieTable(table) =>
+ if query.resolved && sparkAdapter.isHoodieTable(table) =>
CreateHoodieTableAsSelectCommand(table, mode, query)
// Convert to CompactionHoodieTableCommand
case CompactionTable(table, operation, options)
- if table.resolved && isHoodieTable(table, sparkSession) =>
+ if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentifier(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
CompactionHoodieTableCommand(catalogTable, operation, options)
@@ -99,7 +126,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
CompactionHoodiePathCommand(path, operation, options)
// Convert to CompactionShowOnTable
case CompactionShowOnTable(table, limit)
- if isHoodieTable(table, sparkSession) =>
+ if sparkAdapter.isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentifier(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
CompactionShowHoodieTableCommand(catalogTable, limit)
@@ -122,7 +149,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
// Resolve merge into
case mergeInto @ MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)
- if isHoodieTable(target, sparkSession) && target.resolved =>
+ if sparkAdapter.isHoodieTable(target, sparkSession) && target.resolved =>
val resolver = sparkSession.sessionState.conf.resolver
val resolvedSource = analyzer.execute(source)
@@ -277,7 +304,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// Resolve update table
case UpdateTable(table, assignments, condition)
- if isHoodieTable(table, sparkSession) && table.resolved =>
+ if sparkAdapter.isHoodieTable(table, sparkSession) && table.resolved =>
// Resolve condition
val resolvedCondition = condition.map(resolveExpressionFrom(table)(_))
// Resolve assignments
@@ -291,7 +318,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// Resolve Delete Table
case DeleteFromTable(table, condition)
- if isHoodieTable(table, sparkSession) && table.resolved =>
+ if sparkAdapter.isHoodieTable(table, sparkSession) && table.resolved =>
// Resolve condition
val resolvedCondition = condition.map(resolveExpressionFrom(table)(_))
// Return the resolved DeleteTable
@@ -303,7 +330,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
val (table, partition, query, overwrite, ifPartitionNotExists) =
sparkAdapter.getInsertIntoChildren(l).get
- if (isHoodieTable(table, sparkSession) && query.resolved &&
+ if (sparkAdapter.isHoodieTable(table, sparkSession) && query.resolved &&
!containUnResolvedStar(query) &&
!checkAlreadyAppendMetaField(query)) {
val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala.map(
@@ -401,37 +428,37 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
plan match {
// Rewrite the CreateDataSourceTableCommand to CreateHoodieTableCommand
case CreateDataSourceTableCommand(table, ignoreIfExists)
- if isHoodieTable(table) =>
+ if sparkAdapter.isHoodieTable(table) =>
CreateHoodieTableCommand(table, ignoreIfExists)
// Rewrite the DropTableCommand to DropHoodieTableCommand
case DropTableCommand(tableName, ifExists, isView, purge)
- if isHoodieTable(tableName, sparkSession) =>
+ if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
DropHoodieTableCommand(tableName, ifExists, isView, purge)
// Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand
case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
- if isHoodieTable(tableName, sparkSession) =>
+ if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
// Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
// Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
case AlterTableAddColumnsCommand(tableId, colsToAdd)
- if isHoodieTable(tableId, sparkSession) =>
+ if sparkAdapter.isHoodieTable(tableId, sparkSession) =>
AlterHoodieTableAddColumnsCommand(tableId, colsToAdd)
// Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
case AlterTableRenameCommand(oldName, newName, isView)
- if !isView && isHoodieTable(oldName, sparkSession) =>
+ if !isView && sparkAdapter.isHoodieTable(oldName, sparkSession) =>
new AlterHoodieTableRenameCommand(oldName, newName, isView)
// Rewrite the AlterTableChangeColumnCommand to AlterHoodieTableChangeColumnCommand
case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
- if isHoodieTable(tableName, sparkSession) =>
+ if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
// SPARK-34238: the definition of ShowPartitionsCommand has been changed in Spark3.2.
// Match the class type instead of call the `unapply` method.
case s: ShowPartitionsCommand
- if isHoodieTable(s.tableName, sparkSession) =>
+ if sparkAdapter.isHoodieTable(s.tableName, sparkSession) =>
ShowHoodieTablePartitionsCommand(s.tableName, s.spec)
// Rewrite TruncateTableCommand to TruncateHoodieTableCommand
case TruncateTableCommand(tableName, partitionSpec)
- if isHoodieTable(tableName, sparkSession) =>
+ if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
new TruncateHoodieTableCommand(tableName, partitionSpec)
case _ => plan
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 030d3e3c6..b4013d2d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.castIfNeeded
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 2c76ad567..29386dc43 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.{castIfNeeded, getMergeIntoTargetTableId}
+import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 512e9a18b..10c57a7fd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.castIfNeeded
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructField
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 69a0d1e89..edcefb531 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -32,7 +32,7 @@ import org.apache.hudi.functional.TestBootstrap
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{expr, lit}
@@ -94,11 +94,17 @@ class TestHoodieSparkSqlWriter {
* Utility method for initializing the spark context.
*/
def initSparkContext(): Unit = {
+ val sparkConf = new SparkConf()
+ if (!HoodieSparkUtils.beforeSpark3_2()) {
+ sparkConf.set("spark.sql.catalog.spark_catalog",
+ "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+ }
spark = SparkSession.builder()
.appName(hoodieFooTableName)
.master("local[2]")
.withExtensions(new HoodieSparkSessionExtension)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config(sparkConf)
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
index a5b49cc36..a222b91d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
@@ -18,8 +18,10 @@
package org.apache.spark.sql.hudi
import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.fs.FSUtils
import org.apache.log4j.Level
+import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
@@ -49,10 +51,20 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
.config("hoodie.delete.shuffle.parallelism", "4")
.config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
.config("spark.sql.session.timeZone", "CTT")
+ .config(sparkConf())
.getOrCreate()
private var tableId = 0
+ def sparkConf(): SparkConf = {
+ val sparkConf = new SparkConf()
+ if (!HoodieSparkUtils.beforeSpark3_2()) {
+ sparkConf.set("spark.sql.catalog.spark_catalog",
+ "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+ }
+ sparkConf
+ }
+
protected def withTempDir(f: File => Unit): Unit = {
val tempDir = Utils.createTempDir()
try f(tempDir) finally {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index baac82f4b..28dee88e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -87,7 +87,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
| on s0.id = $tableName.id
| when matched then update set
| id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts
- | when not matched and id % 2 = 0 then insert *
+ | when not matched and s0.id % 2 = 0 then insert *
""".stripMargin)
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 30.0, 1002),
@@ -102,9 +102,9 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
| select 1 as id, 'a1' as name, 12 as price, 1003 as ts
| ) s0
| on s0.id = $tableName.id
- | when matched and id != 1 then update set
+ | when matched and s0.id != 1 then update set
| id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
- | when matched and id = 1 then delete
+ | when matched and s0.id = 1 then delete
| when not matched then insert *
""".stripMargin)
val cnt = spark.sql(s"select * from $tableName where id = 1").count()
@@ -178,7 +178,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
| )
| ) s0
| on s0.s_id = t0.id
- | when matched and ts = 1001 then update set id = s0.s_id, name = t0.name, price =
+ | when matched and s0.ts = 1001 then update set id = s0.s_id, name = t0.name, price =
| s0.price, ts = s0.ts
""".stripMargin
)
@@ -233,7 +233,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
| select 1 as id, 'a1' as name, 12 as price, 1001 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
- | when matched and id % 2 = 0 then update set *
+ | when matched and s0.id % 2 = 0 then update set *
""".stripMargin
)
checkAnswer(s"select id,name,price,dt from $tableName")(
@@ -488,7 +488,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
|merge into $targetTable t0
|using $sourceTable s0
|on t0.id = s0.id
- |when matched and cast(_ts as string) > '1000' then update set *
+ |when matched and cast(s0._ts as string) > '1000' then update set *
""".stripMargin)
checkAnswer(s"select id, name, price, _ts from $targetTable")(
Seq(1, "a1", 12, 1001)
@@ -512,7 +512,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
|using $sourceTable s0
|on t0.id = s0.id
|when matched then update set *
- |when not matched and name = 'a2' then insert *
+ |when not matched and s0.name = 'a2' then insert *
""".stripMargin)
checkAnswer(s"select id, name, price, _ts from $targetTable order by id")(
Seq(1, "a1", 12, 1001),
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3SqlUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3SqlUtils.scala
new file mode 100644
index 000000000..c4c6fd682
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3SqlUtils.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
+
+import scala.collection.mutable
+
+object HoodieSpark3SqlUtils {
+ def convertTransforms(partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
+ val identityCols = new mutable.ArrayBuffer[String]
+ var bucketSpec = Option.empty[BucketSpec]
+
+ partitions.map {
+ case IdentityTransform(FieldReference(Seq(col))) =>
+ identityCols += col
+
+
+ case BucketTransform(numBuckets, FieldReference(Seq(col))) =>
+ bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil))
+
+ case _ =>
+ throw new HoodieException(s"Partitioning by expressions is not supported.")
+ }
+
+ (identityCols, bucketSpec)
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
index f446715a6..a1c41e80a 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.adapter
import org.apache.hudi.Spark3RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.spark3.internal.ReflectUtil
-
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -30,11 +29,14 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.execution.datasources.{Spark3ParsePartitionUtil, SparkParsePartitionUtil}
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, Spark3ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.internal.SQLConf
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
/**
* The adapter for spark3.
*/
@@ -104,4 +106,14 @@ class Spark3Adapter extends SparkAdapter {
maxSplitBytes: Long): Seq[FilePartition] = {
FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes)
}
+
+ override def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
+ tripAlias(table) match {
+ case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+ case relation: UnresolvedRelation =>
+ isHoodieTable(toTableIdentifier(relation), spark)
+ case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
+ case _=> false
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala
index b55379087..d94fee1f4 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala
@@ -17,8 +17,30 @@
package org.apache.hudi
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class Spark3DefaultSource extends DefaultSource with DataSourceRegister with TableProvider {
-class Spark3DefaultSource extends DefaultSource with DataSourceRegister {
override def shortName(): String = "hudi"
+
+ def inferSchema: StructType = new StructType()
+
+ override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema
+
+ override def getTable(schema: StructType,
+ partitioning: Array[Transform],
+ properties: java.util.Map[String, String]): Table = {
+ val options = new CaseInsensitiveStringMap(properties)
+ val path = options.get("path")
+ if (path == null) throw new HoodieException("'path' cannot be null, missing 'path' from table properties")
+
+ HoodieInternalV2Table(SparkSession.active, path)
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/connector/catalog/HoodieIdentifier.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/connector/catalog/HoodieIdentifier.scala
new file mode 100644
index 000000000..2649c56e5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/connector/catalog/HoodieIdentifier.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util
+import java.util.Objects
+
+/**
+ * This class is to make scala-2.11 compilable.
+ * Using Identifier.of(namespace, name) to get a IdentifierImpl will throw
+ * compile exception( Static methods in interface require -target:jvm-1.8)
+ */
+case class HoodieIdentifier(namespace: Array[String], name: String) extends Identifier {
+
+ override def equals(o: Any): Boolean = {
+ o match {
+ case that: HoodieIdentifier => util.Arrays.equals(namespace.asInstanceOf[Array[Object]],
+ that.namespace.asInstanceOf[Array[Object]]) && name == that.name
+ case _ => false
+ }
+ }
+
+ override def hashCode: Int = {
+ val nh = namespace.toSeq.hashCode().asInstanceOf[Object]
+ Objects.hash(nh, name)
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
new file mode 100644
index 000000000..0d4f1ce3f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.{DefaultSource, SparkAdapterSupport}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, getTableLocation, removeMetaFields, tableExistsInPath}
+import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table, ProvidesHoodieConfig}
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+/**
+ * Rule for convert the logical plan to command.
+ * @param sparkSession
+ */
+case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
+ with SparkAdapterSupport with ProvidesHoodieConfig {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+ case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
+ val output = dsv2.output
+ val catalogTable = if (d.catalogTable.isDefined) {
+ Some(d.v1Table)
+ } else {
+ None
+ }
+ val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
+ buildHoodieConfig(d.hoodieCatalogTable))
+ LogicalRelation(relation, output, catalogTable, isStreaming = false)
+ case a @ InsertIntoStatement(r: DataSourceV2Relation, partitionSpec, _, _, _, _) if a.query.resolved &&
+ r.table.isInstanceOf[HoodieInternalV2Table] &&
+ needsSchemaAdjustment(a.query, r.table.asInstanceOf[HoodieInternalV2Table], partitionSpec, r.schema) =>
+ val projection = resolveQueryColumnsByOrdinal(a.query, r.output)
+ if (projection != a.query) {
+ a.copy(query = projection)
+ } else {
+ a
+ }
+ }
+
+ /**
+ * Need to adjust schema based on the query and relation schema, for example,
+ * if using insert into xx select 1, 2 here need to map to column names
+ * @param query
+ * @param hoodieTable
+ * @param partitionSpec
+ * @param schema
+ * @return
+ */
+ private def needsSchemaAdjustment(query: LogicalPlan,
+ hoodieTable: HoodieInternalV2Table,
+ partitionSpec: Map[String, Option[String]],
+ schema: StructType): Boolean = {
+ val output = query.output
+ val queryOutputWithoutMetaFields = removeMetaFields(output)
+ val partitionFields = hoodieTable.hoodieCatalogTable.partitionFields
+ val partitionSchema = hoodieTable.hoodieCatalogTable.partitionSchema
+ val staticPartitionValues = partitionSpec.filter(p => p._2.isDefined).mapValues(_.get)
+
+ assert(staticPartitionValues.isEmpty ||
+ staticPartitionValues.size == partitionSchema.size,
+ s"Required partition columns is: ${partitionSchema.json}, Current static partitions " +
+ s"is: ${staticPartitionValues.mkString("," + "")}")
+
+ assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
+ == hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
+ s"Required select columns count: ${hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
+ s"Current select columns(including static partition column) count: " +
+ s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " +
+ s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
+
+ // static partition insert.
+ if (staticPartitionValues.nonEmpty) {
+ // drop partition fields in origin schema to align fields.
+ schema.dropWhile(p => partitionFields.contains(p.name))
+ }
+
+ val existingSchemaOutput = output.take(schema.length)
+ existingSchemaOutput.map(_.name) != schema.map(_.name) ||
+ existingSchemaOutput.map(_.dataType) != schema.map(_.dataType)
+ }
+
+ private def resolveQueryColumnsByOrdinal(query: LogicalPlan,
+ targetAttrs: Seq[Attribute]): LogicalPlan = {
+ // always add a Cast. it will be removed in the optimizer if it is unnecessary.
+ val project = query.output.zipWithIndex.map { case (attr, i) =>
+ if (i < targetAttrs.length) {
+ val targetAttr = targetAttrs(i)
+ val castAttr = castIfNeeded(attr.withNullability(targetAttr.nullable), targetAttr.dataType, conf)
+ Alias(castAttr, targetAttr.name)()
+ } else {
+ attr
+ }
+ }
+ Project(project, query)
+ }
+}
+
+/**
+ * Rule for resolve hoodie's extended syntax or rewrite some logical plan.
+ * @param sparkSession
+ */
+case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan]
+ with SparkAdapterSupport with ProvidesHoodieConfig {
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+ // Fill schema for Create Table without specify schema info
+ case c @ CreateV2Table(tableCatalog, tableName, schema, partitioning, properties, _)
+ if sparkAdapter.isHoodieTable(properties.asJava) =>
+
+ if (schema.isEmpty && partitioning.nonEmpty) {
+ failAnalysis("It is not allowed to specify partition columns when the table schema is " +
+ "not defined. When the table schema is not provided, schema and partition columns " +
+ "will be inferred.")
+ }
+ val hoodieCatalog = tableCatalog match {
+ case catalog: HoodieCatalog => catalog
+ case _ => tableCatalog.asInstanceOf[V2SessionCatalog]
+ }
+ val tablePath = getTableLocation(properties,
+ TableIdentifier(tableName.name(), tableName.namespace().lastOption), sparkSession)
+
+ val tableExistInCatalog = hoodieCatalog.tableExists(tableName)
+ // Only when the table has not exist in catalog, we need to fill the schema info for creating table.
+ if (!tableExistInCatalog && tableExistsInPath(tablePath, sparkSession.sessionState.newHadoopConf())) {
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(sparkSession.sessionState.newHadoopConf())
+ .build()
+ val tableSchema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient)
+ if (tableSchema.isDefined && schema.isEmpty) {
+ // Fill the schema with the schema from the table
+ c.copy(tableSchema = tableSchema.get)
+ } else if (tableSchema.isDefined && schema != tableSchema.get) {
+ throw new AnalysisException(s"Specified schema in create table statement is not equal to the table schema." +
+ s"You should not specify the schema for an exist table: $tableName ")
+ } else {
+ c
+ }
+ } else {
+ c
+ }
+ case p => p
+ }
+}
+
+/**
+ * Rule for rewrite some spark commands to hudi's implementation.
+ * @param sparkSession
+ */
+case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case ShowPartitions(child, specOpt, _)
+ if child.isInstanceOf[ResolvedTable] &&
+ child.asInstanceOf[ResolvedTable].table.isInstanceOf[HoodieInternalV2Table] =>
+ ShowHoodieTablePartitionsCommand(child.asInstanceOf[ResolvedTable].identifier.asTableIdentifier, specOpt.map(s => s.asInstanceOf[UnresolvedPartitionSpec].spec))
+
+ // Rewrite TruncateTableCommand to TruncateHoodieTableCommand
+ case TruncateTable(child)
+ if child.isInstanceOf[ResolvedTable] &&
+ child.asInstanceOf[ResolvedTable].table.isInstanceOf[HoodieInternalV2Table] =>
+ new TruncateHoodieTableCommand(child.asInstanceOf[ResolvedTable].identifier.asTableIdentifier, None)
+
+ case DropPartitions(child, specs, ifExists, purge)
+ if child.resolved && child.isInstanceOf[ResolvedTable] && child.asInstanceOf[ResolvedTable].table.isInstanceOf[HoodieInternalV2Table] =>
+ AlterHoodieTableDropPartitionCommand(
+ child.asInstanceOf[ResolvedTable].identifier.asTableIdentifier,
+ specs.seq.map(f => f.asInstanceOf[UnresolvedPartitionSpec]).map(s => s.spec),
+ ifExists,
+ purge,
+ retainData = true
+ )
+
+ case _ => plan
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/BasicStagedTable.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/BasicStagedTable.scala
new file mode 100644
index 000000000..67d9e1ebb
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/BasicStagedTable.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
+/**
+ * Basic implementation that represents a table which is staged for being committed.
+ * @param ident table ident
+ * @param table table
+ * @param catalog table catalog
+ */
+case class BasicStagedTable(ident: Identifier,
+ table: Table,
+ catalog: TableCatalog) extends SupportsWrite with StagedTable {
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ info match {
+ case supportsWrite: SupportsWrite => supportsWrite.newWriteBuilder(info)
+ case _ => throw new HoodieException(s"Table `${ident.name}` does not support writes.")
+ }
+ }
+
+ override def abortStagedChanges(): Unit = catalog.dropTable(ident)
+
+ override def commitStagedChanges(): Unit = {}
+
+ override def name(): String = ident.name()
+
+ override def schema(): StructType = table.schema()
+
+ override def partitioning(): Array[Transform] = table.partitioning()
+
+ override def capabilities(): util.Set[TableCapability] = table.capabilities()
+
+ override def properties(): util.Map[String, String] = table.properties()
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
new file mode 100644
index 000000000..7b7f5c7f7
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hive.util.ConfigUtils
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils, HoodieCatalogTable}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange, UpdateColumnComment, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableCommand}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}
+
+import java.util
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
+
+class HoodieCatalog extends DelegatingCatalogExtension
+ with StagingTableCatalog
+ with SparkAdapterSupport
+ with ProvidesHoodieConfig {
+
+ val spark: SparkSession = SparkSession.active
+
+ override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
+ if (sparkAdapter.isHoodieTable(properties)) {
+ HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
+ } else {
+ BasicStagedTable(
+ ident,
+ super.createTable(ident, schema, partitions, properties),
+ this)
+ }
+ }
+
+ override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
+ if (sparkAdapter.isHoodieTable(properties)) {
+ HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
+ } else {
+ super.dropTable(ident)
+ BasicStagedTable(
+ ident,
+ super.createTable(ident, schema, partitions, properties),
+ this)
+ }
+ }
+
+ override def stageCreateOrReplace(ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ if (sparkAdapter.isHoodieTable(properties)) {
+ HoodieStagedTable(
+ ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
+ } else {
+ try super.dropTable(ident) catch {
+ case _: NoSuchTableException => // ignore the exception
+ }
+ BasicStagedTable(
+ ident,
+ super.createTable(ident, schema, partitions, properties),
+ this)
+ }
+ }
+
+ override def loadTable(ident: Identifier): Table = {
+ try {
+ super.loadTable(ident) match {
+ case v1: V1Table if sparkAdapter.isHoodieTable(v1.catalogTable) =>
+ HoodieInternalV2Table(
+ spark,
+ v1.catalogTable.location.toString,
+ catalogTable = Some(v1.catalogTable),
+ tableIdentifier = Some(ident.toString))
+ case o => o
+ }
+ } catch {
+ case e: Exception =>
+ throw e
+ }
+ }
+
+ override def createTable(ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ createHoodieTable(ident, schema, partitions, properties, Map.empty, Option.empty, TableCreationMode.CREATE)
+ }
+
+ override def tableExists(ident: Identifier): Boolean = super.tableExists(ident)
+
+ override def dropTable(ident: Identifier): Boolean = super.dropTable(ident)
+
+ override def purgeTable(ident: Identifier): Boolean = {
+ val table = loadTable(ident)
+ table match {
+ case hoodieTable: HoodieInternalV2Table =>
+ val location = hoodieTable.hoodieCatalogTable.tableLocation
+ val targetPath = new Path(location)
+ val engineContext = new HoodieSparkEngineContext(spark.sparkContext)
+ val fs = FSUtils.getFs(location, spark.sparkContext.hadoopConfiguration)
+ FSUtils.deleteDir(engineContext, fs, targetPath, spark.sparkContext.defaultParallelism)
+ super.dropTable(ident)
+ case _ =>
+ }
+ true
+ }
+
+ @throws[NoSuchTableException]
+ @throws[TableAlreadyExistsException]
+ override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
+ loadTable(oldIdent) match {
+ case _: HoodieInternalV2Table =>
+ new AlterHoodieTableRenameCommand(oldIdent.asTableIdentifier, newIdent.asTableIdentifier, false).run(spark)
+ case _ => super.renameTable(oldIdent, newIdent)
+ }
+ }
+
+ override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+ val tableIdent = TableIdentifier(ident.name(), ident.namespace().lastOption)
+ // scalastyle:off
+ val table = loadTable(ident) match {
+ case hoodieTable: HoodieInternalV2Table => hoodieTable
+ case _ => return super.alterTable(ident, changes: _*)
+ }
+ // scalastyle:on
+
+ val grouped = changes.groupBy(c => c.getClass)
+
+ grouped.foreach {
+ case (t, newColumns) if t == classOf[AddColumn] =>
+ AlterHoodieTableAddColumnsCommand(
+ tableIdent,
+ newColumns.asInstanceOf[Seq[AddColumn]].map { col =>
+ StructField(
+ col.fieldNames()(0),
+ col.dataType(),
+ col.isNullable)
+ }).run(spark)
+ case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
+ columnChanges.foreach {
+ case dataType: UpdateColumnType =>
+ val colName = UnresolvedAttribute(dataType.fieldNames()).name
+ val newDataType = dataType.newDataType()
+ val structField = StructField(colName, newDataType)
+ AlterHoodieTableChangeColumnCommand(tableIdent, colName, structField).run(spark)
+ case dataType: UpdateColumnComment =>
+ val newComment = dataType.newComment()
+ val colName = UnresolvedAttribute(dataType.fieldNames()).name
+ val fieldOpt = table.schema().findNestedField(dataType.fieldNames(), includeCollections = true,
+ spark.sessionState.conf.resolver).map(_._2)
+ val field = fieldOpt.getOrElse {
+ throw new AnalysisException(
+ s"Couldn't find column $colName in:\n${table.schema().treeString}")
+ }
+ AlterHoodieTableChangeColumnCommand(tableIdent, colName, field.withComment(newComment)).run(spark)
+ }
+ case (t, _) =>
+ throw new UnsupportedOperationException(s"not supported table change: ${t.getClass}")
+ }
+
+ loadTable(ident)
+ }
+
+ def createHoodieTable(ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ allTableProperties: util.Map[String, String],
+ writeOptions: Map[String, String],
+ sourceQuery: Option[DataFrame],
+ operation: TableCreationMode): Table = {
+
+ val (partitionColumns, maybeBucketSpec) = convertTransforms(partitions)
+ val newSchema = schema
+ val newPartitionColumns = partitionColumns
+ val newBucketSpec = maybeBucketSpec
+
+ val isByPath = isPathIdentifier(ident)
+
+ val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location"))
+ val id = ident.asTableIdentifier
+
+ val locUriOpt = location.map(CatalogUtils.stringToURI)
+ val existingTableOpt = getExistingTableIfExists(id)
+ val loc = locUriOpt
+ .orElse(existingTableOpt.flatMap(_.storage.locationUri))
+ .getOrElse(spark.sessionState.catalog.defaultTablePath(id))
+ val storage = DataSource.buildStorageFormatFromOptions(writeOptions)
+ .copy(locationUri = Option(loc))
+ val tableType =
+ if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
+ val commentOpt = Option(allTableProperties.get("comment"))
+
+ val tablePropertiesNew = new util.HashMap[String, String](allTableProperties)
+ // put path to table properties.
+ tablePropertiesNew.put("path", loc.getPath)
+
+ val tableDesc = new CatalogTable(
+ identifier = id,
+ tableType = tableType,
+ storage = storage,
+ schema = newSchema,
+ provider = Option("hudi"),
+ partitionColumnNames = newPartitionColumns,
+ bucketSpec = newBucketSpec,
+ properties = tablePropertiesNew.asScala.toMap,
+ comment = commentOpt)
+
+ val hoodieCatalogTable = HoodieCatalogTable(spark, tableDesc)
+
+ if (operation == TableCreationMode.STAGE_CREATE) {
+ val tablePath = hoodieCatalogTable.tableLocation
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf),
+ s"Path '$tablePath' should be empty for CTAS")
+ hoodieCatalogTable.initHoodieTable()
+
+ val tblProperties = hoodieCatalogTable.catalogProperties
+ val options = Map(
+ DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (tableDesc.tableType == CatalogTableType.MANAGED).toString,
+ DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava),
+ DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(tableDesc.properties.asJava),
+ DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
+ )
+ saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, options))
+ CreateHoodieTableCommand.createTableInCatalog(spark, hoodieCatalogTable, ignoreIfExists = false)
+ } else if (sourceQuery.isEmpty) {
+ saveSourceDF(sourceQuery, tableDesc.properties)
+ new CreateHoodieTableCommand(tableDesc, false).run(spark)
+ } else {
+ saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, Map.empty))
+ new CreateHoodieTableCommand(tableDesc, false).run(spark)
+ }
+
+ loadTable(ident)
+ }
+
+ private def isPathIdentifier(ident: Identifier) = new Path(ident.name()).isAbsolute
+
+ protected def isPathIdentifier(table: CatalogTable): Boolean = {
+ isPathIdentifier(table.identifier)
+ }
+
+ protected def isPathIdentifier(tableIdentifier: TableIdentifier): Boolean = {
+ isPathIdentifier(HoodieIdentifier(tableIdentifier.database.toArray, tableIdentifier.table))
+ }
+
+ private def getExistingTableIfExists(table: TableIdentifier): Option[CatalogTable] = {
+ // If this is a path identifier, we cannot return an existing CatalogTable. The Create command
+ // will check the file system itself
+ val catalog = spark.sessionState.catalog
+ // scalastyle:off
+ if (isPathIdentifier(table)) return None
+ // scalastyle:on
+ val tableExists = catalog.tableExists(table)
+ if (tableExists) {
+ val oldTable = catalog.getTableMetadata(table)
+ if (oldTable.tableType == CatalogTableType.VIEW) throw new HoodieException(
+ s"$table is a view. You may not write data into a view.")
+ if (!sparkAdapter.isHoodieTable(oldTable)) throw new HoodieException(s"$table is not a Hoodie table.")
+ Some(oldTable)
+ } else None
+ }
+
+ private def saveSourceDF(sourceQuery: Option[Dataset[_]],
+ properties: Map[String, String]): Unit = {
+ sourceQuery.map(df => {
+ df.write.format("org.apache.hudi")
+ .options(properties)
+ .mode(SaveMode.Append)
+ .save()
+ df
+ })
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
new file mode 100644
index 000000000..a9bad42d6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
+import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.sources.{Filter, InsertableRelation}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+import java.util
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, setAsJavaSetConverter}
+
+case class HoodieInternalV2Table(spark: SparkSession,
+ path: String,
+ catalogTable: Option[CatalogTable] = None,
+ tableIdentifier: Option[String] = None,
+ options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
+ extends Table with SupportsWrite with V2TableWithV1Fallback {
+
+ lazy val hoodieCatalogTable: HoodieCatalogTable = if (catalogTable.isDefined) {
+ HoodieCatalogTable(spark, catalogTable.get)
+ } else {
+ val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
+ .setBasePath(path)
+ .setConf(SparkSession.active.sessionState.newHadoopConf)
+ .build()
+
+ val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+ val tableName: String = tableConfig.getTableName
+
+ HoodieCatalogTable(spark, TableIdentifier(tableName))
+ }
+
+ private lazy val tableSchema: StructType = hoodieCatalogTable.tableSchema
+
+ override def name(): String = hoodieCatalogTable.table.identifier.unquotedString
+
+ override def schema(): StructType = tableSchema
+
+ override def capabilities(): util.Set[TableCapability] = Set(
+ BATCH_READ, V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE, ACCEPT_ANY_SCHEMA
+ ).asJava
+
+ override def properties(): util.Map[String, String] = {
+ hoodieCatalogTable.catalogProperties.asJava
+ }
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ new HoodieV1WriteBuilder(info.options, hoodieCatalogTable, spark)
+ }
+
+ override def v1Table: CatalogTable = hoodieCatalogTable.table
+
+ override def partitioning(): Array[Transform] = {
+ hoodieCatalogTable.partitionFields.map { col =>
+ new IdentityTransform(new FieldReference(Seq(col)))
+ }.toArray
+ }
+
+}
+
+private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
+ hoodieCatalogTable: HoodieCatalogTable,
+ spark: SparkSession)
+ extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {
+
+ private var forceOverwrite = false
+
+ override def truncate(): HoodieV1WriteBuilder = {
+ forceOverwrite = true
+ this
+ }
+
+ override def overwrite(filters: Array[Filter]): WriteBuilder = {
+ forceOverwrite = true
+ this
+ }
+
+ override def build(): V1Write = new V1Write {
+ override def toInsertableRelation: InsertableRelation = {
+ new InsertableRelation {
+ override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ val mode = if (forceOverwrite && hoodieCatalogTable.partitionFields.isEmpty) {
+ // insert overwrite non-partition table
+ SaveMode.Overwrite
+ } else {
+ // for insert into or insert overwrite partition we use append mode.
+ SaveMode.Append
+ }
+ alignOutputColumns(data).write.format("org.apache.hudi")
+ .mode(mode)
+ .options(buildHoodieConfig(hoodieCatalogTable) ++
+ buildHoodieInsertConfig(hoodieCatalogTable, spark, forceOverwrite, Map.empty, Map.empty))
+ .save()
+ }
+ }
+ }
+ }
+
+ private def alignOutputColumns(data: DataFrame): DataFrame = {
+ val schema = hoodieCatalogTable.tableSchema
+ spark.createDataFrame(data.toJavaRDD, schema)
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieStagedTable.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieStagedTable.scala
new file mode 100644
index 000000000..403486216
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieStagedTable.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions.RECORDKEY_FIELD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, SupportsWrite, TableCapability}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, WriteBuilder}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters.{mapAsScalaMapConverter, setAsJavaSetConverter}
+
+case class HoodieStagedTable(ident: Identifier,
+ catalog: HoodieCatalog,
+ override val schema: StructType,
+ partitions: Array[Transform],
+ override val properties: util.Map[String, String],
+ mode: TableCreationMode) extends StagedTable with SupportsWrite {
+
+ private var sourceQuery: Option[DataFrame] = None
+ private var writeOptions: Map[String, String] = Map.empty
+
+ override def commitStagedChanges(): Unit = {
+ val props = new util.HashMap[String, String]()
+ val optionsThroughProperties = properties.asScala.collect {
+ case (k, _) if k.startsWith("option.") => k.stripPrefix("option.")
+ }.toSet
+ val sqlWriteOptions = new util.HashMap[String, String]()
+ properties.asScala.foreach { case (k, v) =>
+ if (!k.startsWith("option.") && !optionsThroughProperties.contains(k)) {
+ props.put(k, v)
+ } else if (optionsThroughProperties.contains(k)) {
+ sqlWriteOptions.put(k, v)
+ }
+ }
+ if (writeOptions.isEmpty && !sqlWriteOptions.isEmpty) {
+ writeOptions = sqlWriteOptions.asScala.toMap
+ }
+ props.putAll(properties)
+ props.put("hoodie.table.name", ident.name())
+ props.put(RECORDKEY_FIELD.key, properties.get("primaryKey"))
+ catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode)
+ }
+
+ override def name(): String = ident.name()
+
+ override def abortStagedChanges(): Unit = {
+ clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration)
+ }
+
+ private def clearTablePath(tablePath: String, conf: Configuration): Unit = {
+ val path = new Path(tablePath)
+ val fs = path.getFileSystem(conf)
+ fs.delete(path, true)
+ }
+
+ override def capabilities(): util.Set[TableCapability] = Set(TableCapability.V1_BATCH_WRITE).asJava
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ writeOptions = info.options.asCaseSensitiveMap().asScala.toMap
+ new HoodieV1WriteBuilder
+ }
+
+ /*
+ * WriteBuilder for creating a Hoodie table.
+ */
+ private class HoodieV1WriteBuilder extends WriteBuilder {
+ override def build(): V1Write = new V1Write {
+ override def toInsertableRelation(): InsertableRelation = {
+ new InsertableRelation {
+ override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ sourceQuery = Option(data)
+ }
+ }
+ }
+ }
+ }
+
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala
new file mode 100644
index 000000000..30fdde168
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait ProvidesHoodieConfig extends Logging {
+
+ def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
+ val sparkSession: SparkSession = hoodieCatalogTable.spark
+ val catalogProperties = hoodieCatalogTable.catalogProperties
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val tableId = hoodieCatalogTable.table.identifier
+
+ val preCombineField = Option(tableConfig.getPreCombineField).getOrElse("")
+ require(hoodieCatalogTable.primaryKeys.nonEmpty,
+ s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
+ val enableHive = isEnableHive(sparkSession)
+
+ withSparkConf(sparkSession, catalogProperties) {
+ Map(
+ "path" -> hoodieCatalogTable.tableLocation,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ PRECOMBINE_FIELD.key -> preCombineField,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+ OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+ META_SYNC_ENABLED.key -> enableHive.toString,
+ HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+ HIVE_USE_JDBC.key -> "false",
+ HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
+ HIVE_TABLE.key -> tableId.table,
+ HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+ HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+ HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+ HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
+ SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
+ )
+ }
+ }
+
+ /**
+ * Build the default config for insert.
+ * @return
+ */
+ def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
+ sparkSession: SparkSession,
+ isOverwrite: Boolean,
+ insertPartitions: Map[String, Option[String]] = Map.empty,
+ extraOptions: Map[String, String]): Map[String, String] = {
+
+ if (insertPartitions.nonEmpty &&
+ (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
+ throw new IllegalArgumentException(s"Insert partition fields" +
+ s"[${insertPartitions.keys.mkString(" " )}]" +
+ s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
+ }
+ val path = hoodieCatalogTable.tableLocation
+ val tableType = hoodieCatalogTable.tableTypeName
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val tableSchema = hoodieCatalogTable.tableSchema
+
+ val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
+ val parameters = withSparkConf(sparkSession, options)()
+
+ val preCombineColumn = hoodieCatalogTable.preCombineKey.getOrElse("")
+ val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+
+ val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
+ val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
+ val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
+ .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
+
+ val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val dropDuplicate = sparkSession.conf
+ .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
+
+ val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
+ DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
+ val isNonStrictMode = insertMode == InsertMode.NON_STRICT
+ val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
+ val hasPrecombineColumn = preCombineColumn.nonEmpty
+ val operation =
+ (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
+ case (true, _, _, false, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
+ case (true, true, _, _, true) =>
+ throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+ case (true, _, true, _, _) =>
+ throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+ s" Please disable $INSERT_DROP_DUPS and try again.")
+ // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+ case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
+ // insert overwrite table
+ case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ // insert overwrite partition
+ case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
+ // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
+ case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
+ // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
+ case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+ // for the rest case, use the insert operation
+ case _ => INSERT_OPERATION_OPT_VAL
+ }
+
+ val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
+ tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
+ // Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
+ // on reading.
+ classOf[ValidateDuplicateKeyPayload].getCanonicalName
+ } else {
+ classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+ }
+ logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")
+
+ val enableHive = isEnableHive(sparkSession)
+ withSparkConf(sparkSession, options) {
+ Map(
+ "path" -> path,
+ TABLE_TYPE.key -> tableType,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ PRECOMBINE_FIELD.key -> preCombineColumn,
+ OPERATION.key -> operation,
+ HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ PARTITIONPATH_FIELD.key -> partitionFields,
+ PAYLOAD_CLASS_NAME.key -> payloadClassName,
+ ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
+ META_SYNC_ENABLED.key -> enableHive.toString,
+ HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+ HIVE_USE_JDBC.key -> "false",
+ HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
+ HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
+ HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+ HIVE_PARTITION_FIELDS.key -> partitionFields,
+ HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+ HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
+ HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
+ SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
+ )
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/TableCreationMode.java b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/TableCreationMode.java
new file mode 100644
index 000000000..8b54775be
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/TableCreationMode.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog;
+
+public enum TableCreationMode {
+ CREATE, CREATE_OR_REPLACE, STAGE_CREATE, STAGE_REPLACE
+}
diff --git a/style/scalastyle.xml b/style/scalastyle.xml
index 2ba4042be..74d7b9d73 100644
--- a/style/scalastyle.xml
+++ b/style/scalastyle.xml
@@ -113,7 +113,7 @@
-
+