1
0

[HUDI-3254] Introduce HoodieCatalog to manage tables for Spark Datasource V2 (#4611)

This commit is contained in:
leesf
2022-02-14 22:26:58 +08:00
committed by GitHub
parent 5ca4480a38
commit 0db1e978c6
26 changed files with 1288 additions and 81 deletions

View File

@@ -17,8 +17,30 @@
package org.apache.hudi
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
 * Spark 3 DataSource V2 entry point registered under the "hudi" short name.
 *
 * Extends the V1 [[DefaultSource]] for backwards-compatible reads/writes and
 * implements [[TableProvider]] so Spark can resolve a Hudi table directly
 * from the supplied options.
 *
 * NOTE: the source contained two conflicting class headers (stale diff
 * residue); the TableProvider variant is the correct one and is kept.
 */
class Spark3DefaultSource extends DefaultSource with DataSourceRegister with TableProvider {

  override def shortName(): String = "hudi"

  // The real schema is resolved from the table itself in getTable; an empty
  // struct is returned here as a placeholder.
  def inferSchema: StructType = new StructType()

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema

  /**
   * Resolves a Hudi table from the table properties.
   *
   * @param schema       ignored; the table derives its own schema
   * @param partitioning ignored; the table derives its own partitioning
   * @param properties   must contain a non-null 'path' entry
   * @throws HoodieException if the mandatory 'path' property is absent
   */
  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: java.util.Map[String, String]): Table = {
    val options = new CaseInsensitiveStringMap(properties)
    val path = options.get("path")
    if (path == null) throw new HoodieException("'path' cannot be null, missing 'path' from table properties")
    HoodieInternalV2Table(SparkSession.active, path)
  }
}

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.catalog
import java.util
import java.util.Objects
/**
 * Identifier implementation that side-steps the static factory
 * Identifier.of(namespace, name): calling a static interface method does not
 * compile on scala-2.11 (requires -target:jvm-1.8).
 */
case class HoodieIdentifier(namespace: Array[String], name: String) extends Identifier {

  override def equals(o: Any): Boolean = o match {
    case HoodieIdentifier(thatNamespace, thatName) =>
      // Arrays compare by reference, so compare namespace contents explicitly.
      name == thatName &&
        util.Arrays.equals(namespace.asInstanceOf[Array[Object]], thatNamespace.asInstanceOf[Array[Object]])
    case _ => false
  }

  override def hashCode: Int = {
    // Arrays hash by identity; hash the namespace through a Seq view so that
    // equal namespaces produce equal hashes (consistent with equals above).
    val namespaceHash: Object = namespace.toSeq.hashCode().asInstanceOf[Object]
    Objects.hash(namespaceHash, name)
  }
}

View File

@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.analysis
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.{DefaultSource, SparkAdapterSupport}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, getTableLocation, removeMetaFields, tableExistsInPath}
import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table, ProvidesHoodieConfig}
import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
import scala.collection.JavaConverters.mapAsJavaMapConverter
/**
 * Rule that converts resolved DataSource V2 plans over Hudi tables back to the
 * V1 relation (so existing Hudi read/write paths are used) and injects a
 * positional projection for INSERT INTO statements whose query output does not
 * line up with the target table schema.
 *
 * @param sparkSession the active spark session
 */
case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
  with SparkAdapterSupport with ProvidesHoodieConfig {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
      val output = dsv2.output
      // Keep the V1 catalog table on the LogicalRelation when one is attached
      // so catalog metadata survives the V2 -> V1 conversion.
      val catalogTable = if (d.catalogTable.isDefined) {
        Some(d.v1Table)
      } else {
        None
      }
      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
        buildHoodieConfig(d.hoodieCatalogTable))
      LogicalRelation(relation, output, catalogTable, isStreaming = false)
    case a @ InsertIntoStatement(r: DataSourceV2Relation, partitionSpec, _, _, _, _) if a.query.resolved &&
      r.table.isInstanceOf[HoodieInternalV2Table] &&
      needsSchemaAdjustment(a.query, r.table.asInstanceOf[HoodieInternalV2Table], partitionSpec, r.schema) =>
      val projection = resolveQueryColumnsByOrdinal(a.query, r.output)
      if (projection != a.query) {
        a.copy(query = projection)
      } else {
        a
      }
  }

  /**
   * Need to adjust schema based on the query and relation schema, for example,
   * if using `insert into xx select 1, 2` here need to map to column names.
   *
   * @param query         the resolved source query of the insert
   * @param hoodieTable   the target hudi table
   * @param partitionSpec the insert's partition spec (static values are Some)
   * @param schema        the relation schema of the target table
   * @return true if a reorder/cast projection must be injected over the query
   */
  private def needsSchemaAdjustment(query: LogicalPlan,
                                    hoodieTable: HoodieInternalV2Table,
                                    partitionSpec: Map[String, Option[String]],
                                    schema: StructType): Boolean = {
    val output = query.output
    val queryOutputWithoutMetaFields = removeMetaFields(output)
    val partitionFields = hoodieTable.hoodieCatalogTable.partitionFields
    val partitionSchema = hoodieTable.hoodieCatalogTable.partitionSchema
    val staticPartitionValues = partitionSpec.filter(p => p._2.isDefined).mapValues(_.get)

    assert(staticPartitionValues.isEmpty ||
      staticPartitionValues.size == partitionSchema.size,
      s"Required partition columns is: ${partitionSchema.json}, Current static partitions " +
        s"is: ${staticPartitionValues.mkString("," + "")}")

    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
      == hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
      s"Required select columns count: ${hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
        s"Current select columns(including static partition column) count: " +
        s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size}columns: " +
        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")

    // Static partition insert: the partition columns are supplied by the spec,
    // not the query, so drop them from the target schema before comparing.
    // FIX: the original called schema.dropWhile(...) and discarded the result
    // (StructType is immutable), so the adjustment never took effect; it also
    // used dropWhile (prefix-only) where a filter over all fields is intended.
    val targetSchema = if (staticPartitionValues.nonEmpty) {
      StructType(schema.filterNot(p => partitionFields.contains(p.name)))
    } else {
      schema
    }

    val existingSchemaOutput = output.take(targetSchema.length)
    existingSchemaOutput.map(_.name) != targetSchema.map(_.name) ||
      existingSchemaOutput.map(_.dataType) != targetSchema.map(_.dataType)
  }

  /**
   * Wraps each query output column in a Cast+Alias onto the corresponding
   * target attribute (positional). Unnecessary casts are removed later by the
   * optimizer.
   */
  private def resolveQueryColumnsByOrdinal(query: LogicalPlan,
                                           targetAttrs: Seq[Attribute]): LogicalPlan = {
    // always add a Cast. it will be removed in the optimizer if it is unnecessary.
    val project = query.output.zipWithIndex.map { case (attr, i) =>
      if (i < targetAttrs.length) {
        val targetAttr = targetAttrs(i)
        val castAttr = castIfNeeded(attr.withNullability(targetAttr.nullable), targetAttr.dataType, conf)
        Alias(castAttr, targetAttr.name)()
      } else {
        attr
      }
    }
    Project(project, query)
  }
}
/**
* Rule for resolve hoodie's extended syntax or rewrite some logical plan.
* @param sparkSession
*/
case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan]
  with SparkAdapterSupport with ProvidesHoodieConfig {

  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
    // Fill schema for Create Table without specify schema info
    case c @ CreateV2Table(tableCatalog, tableName, schema, partitioning, properties, _)
      if sparkAdapter.isHoodieTable(properties.asJava) =>
      // Partition columns cannot be given without a schema: when the schema is
      // inferred from the existing table path, partitioning is inferred too.
      if (schema.isEmpty && partitioning.nonEmpty) {
        failAnalysis("It is not allowed to specify partition columns when the table schema is " +
          "not defined. When the table schema is not provided, schema and partition columns " +
          "will be inferred.")
      }
      // The command may arrive through either the HoodieCatalog or the default
      // V2 session catalog; both expose tableExists below.
      val hoodieCatalog = tableCatalog match {
        case catalog: HoodieCatalog => catalog
        case _ => tableCatalog.asInstanceOf[V2SessionCatalog]
      }
      val tablePath = getTableLocation(properties,
        TableIdentifier(tableName.name(), tableName.namespace().lastOption), sparkSession)

      val tableExistInCatalog = hoodieCatalog.tableExists(tableName)
      // Only when the table has not exist in catalog, we need to fill the schema info for creating table.
      if (!tableExistInCatalog && tableExistsInPath(tablePath, sparkSession.sessionState.newHadoopConf())) {
        // The path already holds a hudi table: read its schema from the meta
        // client so CREATE TABLE without columns picks it up.
        val metaClient = HoodieTableMetaClient.builder()
          .setBasePath(tablePath)
          .setConf(sparkSession.sessionState.newHadoopConf())
          .build()
        val tableSchema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient)
        if (tableSchema.isDefined && schema.isEmpty) {
          // Fill the schema with the schema from the table
          c.copy(tableSchema = tableSchema.get)
        } else if (tableSchema.isDefined && schema != tableSchema.get) {
          // A user-specified schema that disagrees with the on-disk schema is an error.
          throw new AnalysisException(s"Specified schema in create table statement is not equal to the table schema." +
            s"You should not specify the schema for an exist table: $tableName ")
        } else {
          c
        }
      } else {
        // Table already in catalog (or no data at the path): leave the plan untouched.
        c
      }
    case p => p
  }
}
/**
 * Rule that rewrites a handful of Spark V2 commands targeting Hudi tables to
 * their Hudi-specific command implementations.
 *
 * @param sparkSession the active spark session
 */
case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // SHOW PARTITIONS on a hudi table -> Hudi-aware command.
    case ShowPartitions(resolved: ResolvedTable, specOpt, _)
      if resolved.table.isInstanceOf[HoodieInternalV2Table] =>
      ShowHoodieTablePartitionsCommand(
        resolved.identifier.asTableIdentifier,
        specOpt.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))
    // Rewrite TruncateTableCommand to TruncateHoodieTableCommand
    case TruncateTable(resolved: ResolvedTable)
      if resolved.table.isInstanceOf[HoodieInternalV2Table] =>
      new TruncateHoodieTableCommand(resolved.identifier.asTableIdentifier, None)
    // ALTER TABLE ... DROP PARTITION on a hudi table -> Hudi-aware command.
    case DropPartitions(resolved: ResolvedTable, specs, ifExists, purge)
      if resolved.resolved && resolved.table.isInstanceOf[HoodieInternalV2Table] =>
      AlterHoodieTableDropPartitionCommand(
        resolved.identifier.asTableIdentifier,
        specs.map(_.asInstanceOf[UnresolvedPartitionSpec].spec),
        ifExists,
        purge,
        retainData = true
      )
    case _ => plan
  }
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.catalog
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import java.util
/**
 * Basic implementation that represents a table which is staged for being
 * committed. The wrapped table was already created through the delegate
 * catalog, so commit is a no-op and abort drops the table again.
 *
 * @param ident   table ident
 * @param table   the eagerly-created underlying table
 * @param catalog table catalog that created (and can drop) the table
 */
case class BasicStagedTable(ident: Identifier,
                            table: Table,
                            catalog: TableCatalog) extends SupportsWrite with StagedTable {

  /**
   * Delegates write-builder creation to the wrapped table.
   *
   * FIX: the original matched on `info` (a LogicalWriteInfo), which is never a
   * SupportsWrite, so the success case could not fire and every write threw;
   * the match must inspect `table`.
   */
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    table match {
      case supportsWrite: SupportsWrite => supportsWrite.newWriteBuilder(info)
      case _ => throw new HoodieException(s"Table `${ident.name}` does not support writes.")
    }
  }

  // Abort undoes the eager creation by dropping the table.
  override def abortStagedChanges(): Unit = catalog.dropTable(ident)

  // Table already exists in the catalog; nothing left to commit.
  override def commitStagedChanges(): Unit = {}

  override def name(): String = ident.name()

  override def schema(): StructType = table.schema()

  override def partitioning(): Array[Transform] = table.partitioning()

  override def capabilities(): util.Set[TableCapability] = table.capabilities()

  override def properties(): util.Map[String, String] = table.properties()
}

View File

@@ -0,0 +1,303 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.catalog
import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.sql.InsertMode
import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils, HoodieCatalogTable}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableCommand}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}
import java.util
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
/**
 * Catalog extension that routes Hudi table DDL/DML through Hudi's own SQL
 * commands while delegating non-Hudi tables to the underlying session catalog.
 */
class HoodieCatalog extends DelegatingCatalogExtension
  with StagingTableCatalog
  with SparkAdapterSupport
  with ProvidesHoodieConfig {

  val spark: SparkSession = SparkSession.active

  override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
    } else {
      // Non-hudi tables are created eagerly and wrapped so abort can drop them.
      BasicStagedTable(
        ident,
        super.createTable(ident, schema, partitions, properties),
        this)
    }
  }

  override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
    } else {
      super.dropTable(ident)
      BasicStagedTable(
        ident,
        super.createTable(ident, schema, partitions, properties),
        this)
    }
  }

  override def stageCreateOrReplace(ident: Identifier,
                                    schema: StructType,
                                    partitions: Array[Transform],
                                    properties: util.Map[String, String]): StagedTable = {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(
        ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
    } else {
      try super.dropTable(ident) catch {
        case _: NoSuchTableException => // ignore the exception
      }
      BasicStagedTable(
        ident,
        super.createTable(ident, schema, partitions, properties),
        this)
    }
  }

  /**
   * Loads a table, wrapping Hudi-backed V1 catalog tables into
   * [[HoodieInternalV2Table]] so they take the DataSource V2 path.
   * (A no-op catch-and-rethrow wrapper from the original has been removed.)
   */
  override def loadTable(ident: Identifier): Table = {
    super.loadTable(ident) match {
      case v1: V1Table if sparkAdapter.isHoodieTable(v1.catalogTable) =>
        HoodieInternalV2Table(
          spark,
          v1.catalogTable.location.toString,
          catalogTable = Some(v1.catalogTable),
          tableIdentifier = Some(ident.toString))
      case o => o
    }
  }

  override def createTable(ident: Identifier,
                           schema: StructType,
                           partitions: Array[Transform],
                           properties: util.Map[String, String]): Table = {
    createHoodieTable(ident, schema, partitions, properties, Map.empty, Option.empty, TableCreationMode.CREATE)
  }

  override def tableExists(ident: Identifier): Boolean = super.tableExists(ident)

  override def dropTable(ident: Identifier): Boolean = super.dropTable(ident)

  /**
   * PURGE: for hudi tables, physically deletes the table directory before
   * dropping the catalog entry.
   */
  override def purgeTable(ident: Identifier): Boolean = {
    val table = loadTable(ident)
    table match {
      case hoodieTable: HoodieInternalV2Table =>
        val location = hoodieTable.hoodieCatalogTable.tableLocation
        val targetPath = new Path(location)
        val engineContext = new HoodieSparkEngineContext(spark.sparkContext)
        val fs = FSUtils.getFs(location, spark.sparkContext.hadoopConfiguration)
        FSUtils.deleteDir(engineContext, fs, targetPath, spark.sparkContext.defaultParallelism)
        super.dropTable(ident)
      case _ => // NOTE(review): non-hudi tables are left untouched here — confirm intended
    }
    true
  }

  @throws[NoSuchTableException]
  @throws[TableAlreadyExistsException]
  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
    loadTable(oldIdent) match {
      case _: HoodieInternalV2Table =>
        new AlterHoodieTableRenameCommand(oldIdent.asTableIdentifier, newIdent.asTableIdentifier, false).run(spark)
      case _ => super.renameTable(oldIdent, newIdent)
    }
  }

  /**
   * ALTER TABLE: column additions and column changes on hudi tables are
   * executed through the Hudi alter commands; everything else delegates.
   */
  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    val tableIdent = TableIdentifier(ident.name(), ident.namespace().lastOption)
    // scalastyle:off
    val table = loadTable(ident) match {
      case hoodieTable: HoodieInternalV2Table => hoodieTable
      case _ => return super.alterTable(ident, changes: _*)
    }
    // scalastyle:on

    val grouped = changes.groupBy(c => c.getClass)
    grouped.foreach {
      case (t, newColumns) if t == classOf[AddColumn] =>
        AlterHoodieTableAddColumnsCommand(
          tableIdent,
          newColumns.asInstanceOf[Seq[AddColumn]].map { col =>
            StructField(
              col.fieldNames()(0),
              col.dataType(),
              col.isNullable)
          }).run(spark)
      case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
        columnChanges.foreach {
          case updateType: UpdateColumnType =>
            val colName = UnresolvedAttribute(updateType.fieldNames()).name
            val newDataType = updateType.newDataType()
            val structField = StructField(colName, newDataType)
            AlterHoodieTableChangeColumnCommand(tableIdent, colName, structField).run(spark)
          case updateComment: UpdateColumnComment =>
            val newComment = updateComment.newComment()
            val colName = UnresolvedAttribute(updateComment.fieldNames()).name
            val fieldOpt = table.schema().findNestedField(updateComment.fieldNames(), includeCollections = true,
              spark.sessionState.conf.resolver).map(_._2)
            val field = fieldOpt.getOrElse {
              throw new AnalysisException(
                s"Couldn't find column $colName in:\n${table.schema().treeString}")
            }
            AlterHoodieTableChangeColumnCommand(tableIdent, colName, field.withComment(newComment)).run(spark)
        }
      case (t, _) =>
        // FIX: `t` is already a Class; the original interpolated t.getClass
        // which printed "class java.lang.Class" instead of the change type.
        throw new UnsupportedOperationException(s"not supported table change: $t")
    }

    loadTable(ident)
  }

  /**
   * Creates a hudi table (CREATE / CTAS staging flavors).
   *
   * @param ident              table identifier
   * @param schema             table schema
   * @param partitions         partition transforms
   * @param allTableProperties table properties supplied by Spark
   * @param writeOptions       SQL write options for the CTAS write
   * @param sourceQuery        CTAS source dataframe, if any
   * @param operation          which creation mode is being executed
   * @return the created table, loaded back through this catalog
   */
  def createHoodieTable(ident: Identifier,
                        schema: StructType,
                        partitions: Array[Transform],
                        allTableProperties: util.Map[String, String],
                        writeOptions: Map[String, String],
                        sourceQuery: Option[DataFrame],
                        operation: TableCreationMode): Table = {
    val (partitionColumns, maybeBucketSpec) = convertTransforms(partitions)
    val isByPath = isPathIdentifier(ident)

    val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location"))
    val id = ident.asTableIdentifier

    val locUriOpt = location.map(CatalogUtils.stringToURI)
    val existingTableOpt = getExistingTableIfExists(id)
    // Explicit location > existing table location > catalog default path.
    val loc = locUriOpt
      .orElse(existingTableOpt.flatMap(_.storage.locationUri))
      .getOrElse(spark.sessionState.catalog.defaultTablePath(id))
    val storage = DataSource.buildStorageFormatFromOptions(writeOptions)
      .copy(locationUri = Option(loc))
    // An explicit location means the table is EXTERNAL (data outlives the entry).
    val tableType =
      if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
    val commentOpt = Option(allTableProperties.get("comment"))

    val tablePropertiesNew = new util.HashMap[String, String](allTableProperties)
    // put path to table properties.
    tablePropertiesNew.put("path", loc.getPath)

    val tableDesc = new CatalogTable(
      identifier = id,
      tableType = tableType,
      storage = storage,
      schema = schema,
      provider = Option("hudi"),
      partitionColumnNames = partitionColumns,
      bucketSpec = maybeBucketSpec,
      properties = tablePropertiesNew.asScala.toMap,
      comment = commentOpt)

    val hoodieCatalogTable = HoodieCatalogTable(spark, tableDesc)

    if (operation == TableCreationMode.STAGE_CREATE) {
      // CTAS: initialize the hudi table layout, write the source data, then
      // register the table in the catalog.
      val tablePath = hoodieCatalogTable.tableLocation
      val hadoopConf = spark.sessionState.newHadoopConf()
      assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf),
        s"Path '$tablePath' should be empty for CTAS")
      hoodieCatalogTable.initHoodieTable()

      val tblProperties = hoodieCatalogTable.catalogProperties
      val options = Map(
        DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (tableDesc.tableType == CatalogTableType.MANAGED).toString,
        DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava),
        DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(tableDesc.properties.asJava),
        DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
        DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
      )
      saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, options))
      CreateHoodieTableCommand.createTableInCatalog(spark, hoodieCatalogTable, ignoreIfExists = false)
    } else if (sourceQuery.isEmpty) {
      // Plain CREATE TABLE: saveSourceDF is a no-op on an empty source.
      saveSourceDF(sourceQuery, tableDesc.properties)
      new CreateHoodieTableCommand(tableDesc, false).run(spark)
    } else {
      saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, Map.empty))
      new CreateHoodieTableCommand(tableDesc, false).run(spark)
    }

    loadTable(ident)
  }

  // A path identifier is an absolute path used in place of a table name.
  private def isPathIdentifier(ident: Identifier) = new Path(ident.name()).isAbsolute

  protected def isPathIdentifier(table: CatalogTable): Boolean = {
    isPathIdentifier(table.identifier)
  }

  protected def isPathIdentifier(tableIdentifier: TableIdentifier): Boolean = {
    isPathIdentifier(HoodieIdentifier(tableIdentifier.database.toArray, tableIdentifier.table))
  }

  /**
   * Looks up the catalog entry for a (non-path) identifier, validating that it
   * is neither a view nor a non-hudi table.
   */
  private def getExistingTableIfExists(table: TableIdentifier): Option[CatalogTable] = {
    // If this is a path identifier, we cannot return an existing CatalogTable. The Create command
    // will check the file system itself
    val catalog = spark.sessionState.catalog
    // scalastyle:off
    if (isPathIdentifier(table)) return None
    // scalastyle:on
    val tableExists = catalog.tableExists(table)
    if (tableExists) {
      val oldTable = catalog.getTableMetadata(table)
      if (oldTable.tableType == CatalogTableType.VIEW) throw new HoodieException(
        s"$table is a view. You may not write data into a view.")
      if (!sparkAdapter.isHoodieTable(oldTable)) throw new HoodieException(s"$table is not a Hoodie table.")
      Some(oldTable)
    } else None
  }

  /** Writes the optional CTAS source dataframe into the table (append mode). */
  private def saveSourceDF(sourceQuery: Option[Dataset[_]],
                           properties: Map[String, String]): Unit = {
    // foreach, not map: the write is performed purely for its side effect.
    sourceQuery.foreach { df =>
      df.write.format("org.apache.hudi")
        .options(properties)
        .mode(SaveMode.Append)
        .save()
    }
  }
}

View File

@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.catalog
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.sources.{Filter, InsertableRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.util
import scala.collection.JavaConverters.{mapAsJavaMapConverter, setAsJavaSetConverter}
/**
 * DataSource V2 [[Table]] representation of a Hudi table with a V1 fallback
 * ([[V2TableWithV1Fallback]]); writes go through [[HoodieV1WriteBuilder]].
 *
 * @param spark           active spark session
 * @param path            base path of the hudi table
 * @param catalogTable    catalog entry, when the table is catalog-backed
 * @param tableIdentifier string form of the identifier, when known
 * @param options         case-insensitive read/write options
 */
case class HoodieInternalV2Table(spark: SparkSession,
                                 path: String,
                                 catalogTable: Option[CatalogTable] = None,
                                 tableIdentifier: Option[String] = None,
                                 options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
  extends Table with SupportsWrite with V2TableWithV1Fallback {

  // Prefer the supplied catalog entry; otherwise read the table name out of
  // the hoodie metadata at `path` and resolve the catalog table by identifier.
  lazy val hoodieCatalogTable: HoodieCatalogTable = if (catalogTable.isDefined) {
    HoodieCatalogTable(spark, catalogTable.get)
  } else {
    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
      .setBasePath(path)
      .setConf(SparkSession.active.sessionState.newHadoopConf)
      .build()

    val tableConfig: HoodieTableConfig = metaClient.getTableConfig
    val tableName: String = tableConfig.getTableName

    HoodieCatalogTable(spark, TableIdentifier(tableName))
  }

  private lazy val tableSchema: StructType = hoodieCatalogTable.tableSchema

  override def name(): String = hoodieCatalogTable.table.identifier.unquotedString

  override def schema(): StructType = tableSchema

  // ACCEPT_ANY_SCHEMA: input/target schema alignment is handled outside Spark's
  // default V2 schema validation.
  override def capabilities(): util.Set[TableCapability] = Set(
    BATCH_READ, V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE, ACCEPT_ANY_SCHEMA
  ).asJava

  override def properties(): util.Map[String, String] = {
    hoodieCatalogTable.catalogProperties.asJava
  }

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    new HoodieV1WriteBuilder(info.options, hoodieCatalogTable, spark)
  }

  // V1 fallback used by Spark when the V2 path is not applicable.
  override def v1Table: CatalogTable = hoodieCatalogTable.table

  // Hudi partition columns are exposed to Spark as identity transforms.
  override def partitioning(): Array[Transform] = {
    hoodieCatalogTable.partitionFields.map { col =>
      new IdentityTransform(new FieldReference(Seq(col)))
    }.toArray
  }
}
/**
 * V1-fallback write builder: Hudi batch writes are executed through the V1
 * InsertableRelation path rather than a native V2 batch write.
 */
private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
                                   hoodieCatalogTable: HoodieCatalogTable,
                                   spark: SparkSession)
  extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {

  // Set when Spark requests TRUNCATE or OVERWRITE semantics for this write.
  private var overwriteRequested = false

  override def truncate(): HoodieV1WriteBuilder = {
    overwriteRequested = true
    this
  }

  override def overwrite(filters: Array[Filter]): WriteBuilder = {
    overwriteRequested = true
    this
  }

  override def build(): V1Write = new V1Write {
    override def toInsertableRelation: InsertableRelation = new InsertableRelation {
      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        // Overwrite of a non-partitioned table maps to SaveMode.Overwrite;
        // insert-into and partition overwrite both go through Append mode.
        val nonPartitioned = hoodieCatalogTable.partitionFields.isEmpty
        val mode = if (overwriteRequested && nonPartitioned) SaveMode.Overwrite else SaveMode.Append
        val writerConfig = buildHoodieConfig(hoodieCatalogTable) ++
          buildHoodieInsertConfig(hoodieCatalogTable, spark, overwriteRequested, Map.empty, Map.empty)
        alignOutputColumns(data).write.format("org.apache.hudi")
          .mode(mode)
          .options(writerConfig)
          .save()
      }
    }
  }

  // Re-frame the incoming dataframe onto the table schema so column order and
  // nullability match the target table.
  private def alignOutputColumns(data: DataFrame): DataFrame =
    spark.createDataFrame(data.toJavaRDD, hoodieCatalogTable.tableSchema)
}

View File

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.catalog
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions.RECORDKEY_FIELD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, WriteBuilder}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.StructType
import java.util
import scala.collection.JavaConverters.{mapAsScalaMapConverter, setAsJavaSetConverter}
/**
 * Staged table used for CREATE / CTAS of hudi tables: the actual table
 * creation (and the optional CTAS data write) is deferred until
 * commitStagedChanges() is invoked.
 *
 * @param ident      table identifier
 * @param catalog    owning HoodieCatalog that performs the actual creation
 * @param schema     table schema
 * @param partitions partition transforms
 * @param properties table properties as supplied by Spark
 * @param mode       which staging flavor (create / replace / create-or-replace)
 */
case class HoodieStagedTable(ident: Identifier,
                             catalog: HoodieCatalog,
                             override val schema: StructType,
                             partitions: Array[Transform],
                             override val properties: util.Map[String, String],
                             mode: TableCreationMode) extends StagedTable with SupportsWrite {

  // Captured by the write builder when this staged table is a CTAS target.
  private var sourceQuery: Option[DataFrame] = None
  private var writeOptions: Map[String, String] = Map.empty

  override def commitStagedChanges(): Unit = {
    val props = new util.HashMap[String, String]()
    // Keys prefixed "option." carry SQL write options rather than table properties.
    val optionsThroughProperties = properties.asScala.collect {
      case (k, _) if k.startsWith("option.") => k.stripPrefix("option.")
    }.toSet
    val sqlWriteOptions = new util.HashMap[String, String]()
    properties.asScala.foreach { case (k, v) =>
      if (!k.startsWith("option.") && !optionsThroughProperties.contains(k)) {
        props.put(k, v)
      } else if (optionsThroughProperties.contains(k)) {
        sqlWriteOptions.put(k, v)
      }
    }
    if (writeOptions.isEmpty && !sqlWriteOptions.isEmpty) {
      writeOptions = sqlWriteOptions.asScala.toMap
    }
    // NOTE(review): putAll re-adds every original property, which supersedes
    // the selective copy above — confirm whether the filtering is still needed.
    props.putAll(properties)
    props.put("hoodie.table.name", ident.name())
    // FIX: only set the record-key config when 'primaryKey' is present; the
    // original unconditionally put a possibly-null value into the map, leaking
    // a null record-key config into table creation.
    Option(properties.get("primaryKey")).foreach(pk => props.put(RECORDKEY_FIELD.key, pk))
    catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode)
  }

  override def name(): String = ident.name()

  override def abortStagedChanges(): Unit = {
    clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration)
  }

  /** Recursively deletes the (partially created) table directory. */
  private def clearTablePath(tablePath: String, conf: Configuration): Unit = {
    val path = new Path(tablePath)
    val fs = path.getFileSystem(conf)
    fs.delete(path, true)
  }

  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.V1_BATCH_WRITE).asJava

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    writeOptions = info.options.asCaseSensitiveMap().asScala.toMap
    new HoodieV1WriteBuilder
  }

  /*
   * WriteBuilder for creating a Hoodie table: it only captures the CTAS source
   * dataframe; the actual write happens in commitStagedChanges().
   */
  private class HoodieV1WriteBuilder extends WriteBuilder {
    override def build(): V1Write = new V1Write {
      override def toInsertableRelation(): InsertableRelation = {
        new InsertableRelation {
          override def insert(data: DataFrame, overwrite: Boolean): Unit = {
            sourceQuery = Option(data)
          }
        }
      }
    }
  }
}

View File

@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.catalog
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf}
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
/**
 * Mixin that derives the hoodie write configuration maps used by the SQL command
 * implementations (UPDATE/upsert and INSERT paths) from a [[HoodieCatalogTable]].
 */
trait ProvidesHoodieConfig extends Logging {

  /**
   * Builds the hoodie write config for an UPDATE (upsert) on the given catalog table.
   *
   * Requires the table to declare primary keys, since the upsert path needs a record key.
   * Hive sync settings are included when hive support is enabled for the session.
   */
  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
    val sparkSession: SparkSession = hoodieCatalogTable.spark
    val catalogProperties = hoodieCatalogTable.catalogProperties
    val tableConfig = hoodieCatalogTable.tableConfig
    val tableId = hoodieCatalogTable.table.identifier

    // Precombine field may be unset on the table; fall back to the empty string.
    val preCombineField = Option(tableConfig.getPreCombineField).getOrElse("")
    require(hoodieCatalogTable.primaryKeys.nonEmpty,
      s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
    val enableHive = isEnableHive(sparkSession)
    withSparkConf(sparkSession, catalogProperties) {
      Map(
        "path" -> hoodieCatalogTable.tableLocation,
        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
        PRECOMBINE_FIELD.key -> preCombineField,
        TBL_NAME.key -> hoodieCatalogTable.tableName,
        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
        // SqlKeyGenerator wraps the table's configured key generator for the SQL write path.
        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
        META_SYNC_ENABLED.key -> enableHive.toString,
        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
        HIVE_USE_JDBC.key -> "false",
        HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
        HIVE_TABLE.key -> tableId.table,
        HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
      )
    }
  }

  /**
   * Builds the hoodie write config for an INSERT / INSERT OVERWRITE.
   *
   * The write operation (insert / upsert / bulk-insert / insert-overwrite variants) is selected
   * from the combination of bulk-insert enablement, overwrite mode, duplicate-dropping,
   * insert mode (strict/non-strict) and whether the table is partitioned.
   *
   * @param hoodieCatalogTable the target hoodie catalog table
   * @param sparkSession       active spark session (supplies conf-level overrides)
   * @param isOverwrite        true for INSERT OVERWRITE statements
   * @param insertPartitions   static partition spec of the statement; must match the table's
   *                           partition columns when non-empty
   * @param extraOptions       extra write options, highest precedence after spark conf
   * @return the resolved hoodie write configuration
   */
  def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
                              sparkSession: SparkSession,
                              isOverwrite: Boolean,
                              insertPartitions: Map[String, Option[String]] = Map.empty,
                              extraOptions: Map[String, String]): Map[String, String] = {
    if (insertPartitions.nonEmpty &&
      (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
      // Use the same "," separator for both field lists so the message reads consistently.
      throw new IllegalArgumentException(s"Insert partition fields" +
        s"[${insertPartitions.keys.mkString(",")}]" +
        s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
    }
    val path = hoodieCatalogTable.tableLocation
    val tableType = hoodieCatalogTable.tableTypeName
    val tableConfig = hoodieCatalogTable.tableConfig
    val tableSchema = hoodieCatalogTable.tableSchema

    // Precedence: extraOptions > persisted table config > catalog properties, then spark conf.
    val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
    val parameters = withSparkConf(sparkSession, options)()

    val preCombineColumn = hoodieCatalogTable.preCombineKey.getOrElse("")
    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")

    val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
    val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
    val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
      .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)

    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
    val dropDuplicate = sparkSession.conf
      .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean

    val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
      DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
    val isNonStrictMode = insertMode == InsertMode.NON_STRICT
    val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
    val hasPrecombineColumn = preCombineColumn.nonEmpty

    // Select the write operation; invalid combinations of bulk insert with strict mode,
    // partitioned insert-overwrite, or duplicate-dropping are rejected up front.
    val operation =
      (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
        case (true, _, _, false, _) =>
          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
        case (true, true, _, _, true) =>
          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
        case (true, _, true, _, _) =>
          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
            s" Please disable $INSERT_DROP_DUPS and try again.")
        // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
        case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
        // insert overwrite table
        case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
        // insert overwrite partition
        case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
        // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
        case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
        // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
        case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
        // for the rest case, use the insert operation
        case _ => INSERT_OPERATION_OPT_VAL
      }

    val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
      tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
      // Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
      // on reading.
      classOf[ValidateDuplicateKeyPayload].getCanonicalName
    } else {
      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
    }
    logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")

    val enableHive = isEnableHive(sparkSession)
    withSparkConf(sparkSession, options) {
      Map(
        "path" -> path,
        TABLE_TYPE.key -> tableType,
        TBL_NAME.key -> hoodieCatalogTable.tableName,
        PRECOMBINE_FIELD.key -> preCombineColumn,
        OPERATION.key -> operation,
        HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
        URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
        PARTITIONPATH_FIELD.key -> partitionFields,
        PAYLOAD_CLASS_NAME.key -> payloadClassName,
        // Row-writer path is only used together with bulk insert.
        ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
        META_SYNC_ENABLED.key -> enableHive.toString,
        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
        HIVE_USE_JDBC.key -> "false",
        HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
        HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
        HIVE_PARTITION_FIELDS.key -> partitionFields,
        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
      )
    }
  }
}

View File

@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.catalog;
/**
 * The modes in which a catalog may create a Hoodie table.
 */
public enum TableCreationMode {
  /** Create the table directly. */
  CREATE,
  /** Create the table, replacing any existing table of the same name. */
  CREATE_OR_REPLACE,
  /** Stage a table creation (e.g. CTAS) to be committed atomically later. */
  STAGE_CREATE,
  /** Stage a table replacement to be committed atomically later. */
  STAGE_REPLACE
}