1
0

[HUDI-1776] Support AlterCommand For Hoodie (#3086)

This commit is contained in:
pengzhiwei
2021-06-21 22:58:43 +08:00
committed by GitHub
parent f8d9242372
commit 4fd8a88b7e
7 changed files with 439 additions and 19 deletions

View File

@@ -29,11 +29,11 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, Na
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableCommand
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command.{CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand}
import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand}
import org.apache.spark.sql.types.StringType
object HoodieAnalysis {
@@ -86,6 +86,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
case CreateTable(table, mode, Some(query))
if query.resolved && isHoodieTable(table) =>
CreateHoodieTableAsSelectCommand(table, mode, query)
case _=> plan
}
}
@@ -307,6 +308,18 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case CreateDataSourceTableCommand(table, ignoreIfExists)
if isHoodieTable(table) =>
CreateHoodieTableCommand(table, ignoreIfExists)
// Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
case AlterTableAddColumnsCommand(tableId, colsToAdd)
if isHoodieTable(tableId, sparkSession) =>
AlterHoodieTableAddColumnsCommand(tableId, colsToAdd)
// Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
case AlterTableRenameCommand(oldName, newName, isView)
if !isView && isHoodieTable(oldName, sparkSession) =>
new AlterHoodieTableRenameCommand(oldName, newName, isView)
// Rewrite the AlterTableChangeColumnCommand to AlterHoodieTableChangeColumnCommand
case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
if isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
case _ => plan
}
}

View File

@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import java.nio.charset.StandardCharsets
import org.apache.avro.Schema
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.util.{CommitUtils, Option}
import org.apache.hudi.table.HoodieSparkTable
import scala.collection.JavaConverters._
import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
import scala.util.control.NonFatal
/**
* Command for add new columns to the hudi table.
*/
case class AlterHoodieTableAddColumnsCommand(
tableId: TableIdentifier,
colsToAdd: Seq[StructField])
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
if (colsToAdd.nonEmpty) {
val table = sparkSession.sessionState.catalog.getTableMetadata(tableId)
// Get the new schema
val newSqlSchema = StructType(table.schema.fields ++ colsToAdd)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
// Commit with new schema to change the table schema
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
// Refresh the new schema to meta
refreshSchemaInMeta(sparkSession, table, newSqlSchema)
}
Seq.empty[Row]
}
private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
newSqlSchema: StructType): Unit = {
try {
sparkSession.catalog.uncacheTable(tableId.quotedString)
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e)
}
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
SchemaUtils.checkColumnNameDuplication(
newSqlSchema.map(_.name),
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))
sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema)
}
}
object AlterHoodieTableAddColumnsCommand {
/**
* Generate an empty commit with new schema to change the table's schema.
* @param schema The new schema to commit.
* @param table The hoodie table.
* @param sparkSession The spark session.
*/
def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = {
val path = getTableLocation(table, sparkSession)
.getOrElse(s"missing location for ${table.identifier}")
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties).asJava)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, metaClient.getTableType)
val instantTime = HoodieActiveTimeline.createNewInstantTime
client.startCommitWithTime(instantTime, commitActionType)
val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
val timeLine = hoodieTable.getActiveTimeline
val requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime)
val metadata = new HoodieCommitMetadata
metadata.setOperationType(WriteOperationType.INSERT)
timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString.getBytes(StandardCharsets.UTF_8)))
client.commit(instantTime, jsc.emptyRDD)
}
}

View File

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{StructField, StructType}
import scala.util.control.NonFatal
/**
* Command for alter hudi table's column type.
*/
case class AlterHoodieTableChangeColumnCommand(
tableName: TableIdentifier,
columnName: String,
newColumn: StructField)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val resolver = sparkSession.sessionState.conf.resolver
if (!resolver(columnName, newColumn.name)) {
throw new AnalysisException(s"Can not support change column name for hudi table currently.")
}
// Get the new schema
val newSqlSchema = StructType(
table.dataSchema.fields.map { field =>
if (resolver(field.name, columnName)) {
newColumn
} else {
field
}
})
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
val path = getTableLocation(table, sparkSession)
.getOrElse(s"missing location for ${table.identifier}")
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
// Validate the compatibility between new schema and origin schema.
validateSchema(newSchema, metaClient)
// Commit new schema to change the table schema
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
try {
sparkSession.catalog.uncacheTable(tableName.quotedString)
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e)
}
sparkSession.catalog.refreshTable(tableName.unquotedString)
// Change the schema in the meta
catalog.alterTableDataSchema(tableName, newSqlSchema)
Seq.empty[Row]
}
private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = {
val schemaUtil = new TableSchemaResolver(metaClient)
val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
if (!TableSchemaResolver.isSchemaCompatible(tableSchema, newSchema)) {
throw new HoodieException("Failed schema compatibility check for newSchema :" + newSchema +
", origin table schema :" + tableSchema + ", base path :" + metaClient.getBasePath)
}
}
}

View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableRenameCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
/**
* Command for alter hudi table's table name.
*/
class AlterHoodieTableRenameCommand(
oldName: TableIdentifier,
newName: TableIdentifier,
isView: Boolean)
extends AlterTableRenameCommand(oldName, newName, isView) {
override def run(sparkSession: SparkSession): Seq[Row] = {
if (newName != oldName) {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(oldName)
val path = getTableLocation(table, sparkSession)
.getOrElse(s"missing location for ${table.identifier}")
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
// Init table with new name.
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(metaClient.getTableConfig.getProperties)
.setTableName(newName.table)
.initTable(hadoopConf, path)
// Call AlterTableRenameCommand#run to rename table in meta.
super.run(sparkSession)
}
Seq.empty[Row]
}
}

View File

@@ -32,7 +32,7 @@ import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
@@ -86,11 +86,16 @@ object InsertIntoHoodieTableCommand {
SaveMode.Append
}
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
val queryData = Dataset.ofRows(sparkSession, query)
val conf = sparkSession.sessionState.conf
val alignedQuery = alignOutputFields(queryData, table, insertPartitions, conf)
val alignedQuery = alignOutputFields(query, table, insertPartitions, conf)
// If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
// The nullable attribute of fields will lost.
// In order to pass the nullable attribute to the inputDF, we specify the schema
// of the rdd.
val inputDF = sparkSession.createDataFrame(
Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
val success =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, alignedQuery)._1
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1
if (success) {
if (refreshTable) {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
@@ -110,10 +115,10 @@ object InsertIntoHoodieTableCommand {
* @return
*/
private def alignOutputFields(
query: DataFrame,
query: LogicalPlan,
table: CatalogTable,
insertPartitions: Map[String, Option[String]],
conf: SQLConf): DataFrame = {
conf: SQLConf): LogicalPlan = {
val targetPartitionSchema = table.partitionSchema
@@ -124,17 +129,17 @@ object InsertIntoHoodieTableCommand {
s"is: ${staticPartitionValues.mkString("," + "")}")
val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
query.logicalPlan.output.dropRight(targetPartitionSchema.fields.length)
query.output.dropRight(targetPartitionSchema.fields.length)
} else { // insert static partition
query.logicalPlan.output
query.output
}
val targetDataSchema = table.dataSchema
// Align for the data fields of the query
val dataProjects = queryDataFields.zip(targetDataSchema.fields).map {
case (dataAttr, targetField) =>
val castAttr = castIfNeeded(dataAttr,
val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
targetField.dataType, conf)
new Column(Alias(castAttr, targetField.name)())
Alias(castAttr, targetField.name)()
}
val partitionProjects = if (staticPartitionValues.isEmpty) { // insert dynamic partitions
@@ -142,23 +147,23 @@ object InsertIntoHoodieTableCommand {
// So we init the partitionAttrPosition with the data schema size.
var partitionAttrPosition = targetDataSchema.size
targetPartitionSchema.fields.map(f => {
val partitionAttr = query.logicalPlan.output(partitionAttrPosition)
val partitionAttr = query.output(partitionAttrPosition)
partitionAttrPosition = partitionAttrPosition + 1
val castAttr = castIfNeeded(partitionAttr, f.dataType, conf)
new Column(Alias(castAttr, f.name)())
val castAttr = castIfNeeded(partitionAttr.withNullability(f.nullable), f.dataType, conf)
Alias(castAttr, f.name)()
})
} else { // insert static partitions
targetPartitionSchema.fields.map(f => {
val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
s"Missing static partition value for: ${f.name}")
val castAttr = Literal.create(staticPartitionValue, f.dataType)
new Column(Alias(castAttr, f.name)())
Alias(castAttr, f.name)()
})
}
// Remove the hoodie meta fileds from the projects as we do not need these to write
val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.named.name))
val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name))
val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects
query.select(alignedProjects: _*)
Project(alignedProjects, query)
}
/**

View File

@@ -230,6 +230,20 @@ select id, name, price, ts, dt from h1_p order by id;
| 6 _insert 10.0 1000 2021-05-08 |
+--------------------------------+
# ALTER TABLE
alter table h1_p rename to h2_p;
+----------+
| ok |
+----------+
alter table h2_p add columns(ext0 int);
+----------+
| ok |
+----------+
alter table h2_p change column ext0 ext0 bigint;
+----------+
| ok |
+----------+
# DROP TABLE
drop table h0;
+----------+
@@ -246,7 +260,7 @@ drop table h1;
| ok |
+----------+
drop table h1_p;
drop table h2_p;
+----------+
| ok |
+----------+

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{LongType, StructField, StructType}
class TestAlterTable extends TestHoodieSqlBase {
test("Test Alter Table") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
// Create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| options (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// Alter table name.
val newTableName = s"${tableName}_1"
spark.sql(s"alter table $tableName rename to $newTableName")
assertResult(false)(
spark.sessionState.catalog.tableExists(new TableIdentifier(tableName))
)
assertResult(true) (
spark.sessionState.catalog.tableExists(new TableIdentifier(newTableName))
)
val hadoopConf = spark.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
.setConf(hadoopConf).build()
assertResult(newTableName) (
metaClient.getTableConfig.getTableName
)
spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)")
// Add table column
spark.sql(s"alter table $newTableName add columns(ext0 string)")
val table = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
assertResult(Seq("id", "name", "price", "ts", "ext0")) {
HoodieSqlUtils.removeMetaFields(table.schema).fields.map(_.name)
}
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(1, "a1", 10.0, 1000, null)
)
// Alter table column type
spark.sql(s"alter table $newTableName change column id id bigint")
assertResult(StructType(Seq(StructField("id", LongType, nullable = true))))(
spark.sql(s"select id from $newTableName").schema)
// Insert data to the new table.
spark.sql(s"insert into $newTableName values(2, 'a2', 12, 1000, 'e0')")
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(1, "a1", 10.0, 1000, null),
Seq(2, "a2", 12.0, 1000, "e0")
)
// Merge data to the new table.
spark.sql(
s"""
|merge into $newTableName t0
|using (
| select 1 as id, 'a1' as name, 12 as price, 1001 as ts, 'e0' as ext0
|) s0
|on t0.id = s0.id
|when matched then update set *
|when not matched then insert *
""".stripMargin)
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(1, "a1", 12.0, 1001, "e0"),
Seq(2, "a2", 12.0, 1000, "e0")
)
// Update data to the new table.
spark.sql(s"update $newTableName set price = 10, ext0 = null where id = 1")
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(1, "a1", 10.0, 1001, null),
Seq(2, "a2", 12.0, 1000, "e0")
)
spark.sql(s"update $newTableName set price = 10, ext0 = null where id = 2")
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(1, "a1", 10.0, 1001, null),
Seq(2, "a2", 10.0, 1000, null)
)
// Delete data from the new table.
spark.sql(s"delete from $newTableName where id = 1")
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(2, "a2", 10.0, 1000, null)
)
}
}
}
}