diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index ed7799025..bf7b04607 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -29,11 +29,11 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, Na import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command.CreateDataSourceTableCommand +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand} import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} import org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.sql.hudi.HoodieSqlUtils._ -import org.apache.spark.sql.hudi.command.{CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand} +import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand} import org.apache.spark.sql.types.StringType object HoodieAnalysis { @@ -86,6 +86,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] case CreateTable(table, mode, Some(query)) if query.resolved && isHoodieTable(table) => CreateHoodieTableAsSelectCommand(table, mode, query) + case _=> plan } } @@ -307,6 +308,18 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic case CreateDataSourceTableCommand(table, ignoreIfExists) if isHoodieTable(table) => CreateHoodieTableCommand(table, ignoreIfExists) + // Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand + case AlterTableAddColumnsCommand(tableId, colsToAdd) + if isHoodieTable(tableId, sparkSession) => + AlterHoodieTableAddColumnsCommand(tableId, colsToAdd) + // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand + case AlterTableRenameCommand(oldName, newName, isView) + if !isView && isHoodieTable(oldName, sparkSession) => + new AlterHoodieTableRenameCommand(oldName, newName, isView) + // Rewrite the AlterTableChangeColumnCommand to AlterHoodieTableChangeColumnCommand + case AlterTableChangeColumnCommand(tableName, columnName, newColumn) + if isHoodieTable(tableName, sparkSession) => + AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn) case _ => plan } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala new file mode 100644 index 000000000..b513034bd --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command + +import java.nio.charset.StandardCharsets + +import org.apache.avro.Schema +import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieInstant.State +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant} +import org.apache.hudi.common.util.{CommitUtils, Option} +import org.apache.hudi.table.HoodieSparkTable + +import scala.collection.JavaConverters._ +import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils} +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} +import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.util.SchemaUtils + +import scala.util.control.NonFatal + +/** + * Command for add new columns to the hudi table. + */ +case class AlterHoodieTableAddColumnsCommand( + tableId: TableIdentifier, + colsToAdd: Seq[StructField]) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + if (colsToAdd.nonEmpty) { + val table = sparkSession.sessionState.catalog.getTableMetadata(tableId) + // Get the new schema + val newSqlSchema = StructType(table.schema.fields ++ colsToAdd) + val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table) + val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace) + + // Commit with new schema to change the table schema + AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession) + + // Refresh the new schema to meta + refreshSchemaInMeta(sparkSession, table, newSqlSchema) + } + Seq.empty[Row] + } + + private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable, + newSqlSchema: StructType): Unit = { + try { + sparkSession.catalog.uncacheTable(tableId.quotedString) + } catch { + case NonFatal(e) => + log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e) + } + sparkSession.catalog.refreshTable(table.identifier.unquotedString) + + SchemaUtils.checkColumnNameDuplication( + newSqlSchema.map(_.name), + "in the table definition of " + table.identifier, + conf.caseSensitiveAnalysis) + DDLUtils.checkDataColNames(table, colsToAdd.map(_.name)) + + sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema) + } +} + +object AlterHoodieTableAddColumnsCommand { + /** + * Generate an empty commit with new schema to change the table's schema. + * @param schema The new schema to commit. + * @param table The hoodie table. + * @param sparkSession The spark session. + */ + def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = { + val path = getTableLocation(table, sparkSession) + .getOrElse(s"missing location for ${table.identifier}") + + val jsc = new JavaSparkContext(sparkSession.sparkContext) + val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, + path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties).asJava) + + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build() + + val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, metaClient.getTableType) + val instantTime = HoodieActiveTimeline.createNewInstantTime + client.startCommitWithTime(instantTime, commitActionType) + + val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext) + val timeLine = hoodieTable.getActiveTimeline + val requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime) + val metadata = new HoodieCommitMetadata + metadata.setOperationType(WriteOperationType.INSERT) + timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString.getBytes(StandardCharsets.UTF_8))) + + client.commit(instantTime, jsc.emptyRDD) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala new file mode 100644 index 000000000..78334fdfa --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.avro.Schema +import org.apache.hudi.AvroConversionUtils +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.exception.HoodieException +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation +import org.apache.spark.sql.types.{StructField, StructType} + +import scala.util.control.NonFatal + +/** + * Command for alter hudi table's column type. + */ +case class AlterHoodieTableChangeColumnCommand( + tableName: TableIdentifier, + columnName: String, + newColumn: StructField) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + val table = catalog.getTableMetadata(tableName) + val resolver = sparkSession.sessionState.conf.resolver + + if (!resolver(columnName, newColumn.name)) { + throw new AnalysisException(s"Can not support change column name for hudi table currently.") + } + // Get the new schema + val newSqlSchema = StructType( + table.dataSchema.fields.map { field => + if (resolver(field.name, columnName)) { + newColumn + } else { + field + } + }) + val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table) + val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace) + + val path = getTableLocation(table, sparkSession) + .getOrElse(s"missing location for ${table.identifier}") + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder().setBasePath(path) + .setConf(hadoopConf).build() + // Validate the compatibility between new schema and origin schema. + validateSchema(newSchema, metaClient) + // Commit new schema to change the table schema + AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession) + + try { + sparkSession.catalog.uncacheTable(tableName.quotedString) + } catch { + case NonFatal(e) => + log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e) + } + sparkSession.catalog.refreshTable(tableName.unquotedString) + // Change the schema in the meta + catalog.alterTableDataSchema(tableName, newSqlSchema) + + Seq.empty[Row] + } + + private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = { + val schemaUtil = new TableSchemaResolver(metaClient) + val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields) + if (!TableSchemaResolver.isSchemaCompatible(tableSchema, newSchema)) { + throw new HoodieException("Failed schema compatibility check for newSchema :" + newSchema + + ", origin table schema :" + tableSchema + ", base path :" + metaClient.getBasePath) + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala new file mode 100644 index 000000000..2afef51a9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.AlterTableRenameCommand +import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation + +/** + * Command for alter hudi table's table name. + */ +class AlterHoodieTableRenameCommand( + oldName: TableIdentifier, + newName: TableIdentifier, + isView: Boolean) + extends AlterTableRenameCommand(oldName, newName, isView) { + + override def run(sparkSession: SparkSession): Seq[Row] = { + if (newName != oldName) { + val catalog = sparkSession.sessionState.catalog + val table = catalog.getTableMetadata(oldName) + val path = getTableLocation(table, sparkSession) + .getOrElse(s"missing location for ${table.identifier}") + + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder().setBasePath(path) + .setConf(hadoopConf).build() + // Init table with new name. + HoodieTableMetaClient.withPropertyBuilder() + .fromProperties(metaClient.getTableConfig.getProperties) + .setTableName(newName.table) + .initTable(hadoopConf, path) + // Call AlterTableRenameCommand#run to rename table in meta. + super.run(sparkSession) + } + Seq.empty[Row] + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 04da5229f..2ad9a6838 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -32,7 +32,7 @@ import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} @@ -86,11 +86,16 @@ object InsertIntoHoodieTableCommand { SaveMode.Append } val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config) - val queryData = Dataset.ofRows(sparkSession, query) val conf = sparkSession.sessionState.conf - val alignedQuery = alignOutputFields(queryData, table, insertPartitions, conf) + val alignedQuery = alignOutputFields(query, table, insertPartitions, conf) + // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery), + // The nullable attribute of fields will lost. + // In order to pass the nullable attribute to the inputDF, we specify the schema + // of the rdd. + val inputDF = sparkSession.createDataFrame( + Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema) val success = - HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, alignedQuery)._1 + HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1 if (success) { if (refreshTable) { sparkSession.catalog.refreshTable(table.identifier.unquotedString) @@ -110,10 +115,10 @@ object InsertIntoHoodieTableCommand { * @return */ private def alignOutputFields( - query: DataFrame, + query: LogicalPlan, table: CatalogTable, insertPartitions: Map[String, Option[String]], - conf: SQLConf): DataFrame = { + conf: SQLConf): LogicalPlan = { val targetPartitionSchema = table.partitionSchema @@ -124,17 +129,17 @@ object InsertIntoHoodieTableCommand { s"is: ${staticPartitionValues.mkString("," + "")}") val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition - query.logicalPlan.output.dropRight(targetPartitionSchema.fields.length) + query.output.dropRight(targetPartitionSchema.fields.length) } else { // insert static partition - query.logicalPlan.output + query.output } val targetDataSchema = table.dataSchema // Align for the data fields of the query val dataProjects = queryDataFields.zip(targetDataSchema.fields).map { case (dataAttr, targetField) => - val castAttr = castIfNeeded(dataAttr, + val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable), targetField.dataType, conf) - new Column(Alias(castAttr, targetField.name)()) + Alias(castAttr, targetField.name)() } val partitionProjects = if (staticPartitionValues.isEmpty) { // insert dynamic partitions @@ -142,23 +147,23 @@ object InsertIntoHoodieTableCommand { // So we init the partitionAttrPosition with the data schema size. var partitionAttrPosition = targetDataSchema.size targetPartitionSchema.fields.map(f => { - val partitionAttr = query.logicalPlan.output(partitionAttrPosition) + val partitionAttr = query.output(partitionAttrPosition) partitionAttrPosition = partitionAttrPosition + 1 - val castAttr = castIfNeeded(partitionAttr, f.dataType, conf) - new Column(Alias(castAttr, f.name)()) + val castAttr = castIfNeeded(partitionAttr.withNullability(f.nullable), f.dataType, conf) + Alias(castAttr, f.name)() }) } else { // insert static partitions targetPartitionSchema.fields.map(f => { val staticPartitionValue = staticPartitionValues.getOrElse(f.name, s"Missing static partition value for: ${f.name}") val castAttr = Literal.create(staticPartitionValue, f.dataType) - new Column(Alias(castAttr, f.name)()) + Alias(castAttr, f.name)() }) } // Remove the hoodie meta fileds from the projects as we do not need these to write - val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.named.name)) + val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name)) val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects - query.select(alignedProjects: _*) + Project(alignedProjects, query) } /** diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql index 35dde250a..280fde59f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql @@ -230,6 +230,20 @@ select id, name, price, ts, dt from h1_p order by id; | 6 _insert 10.0 1000 2021-05-08 | +--------------------------------+ +# ALTER TABLE +alter table h1_p rename to h2_p; ++----------+ +| ok | ++----------+ +alter table h2_p add columns(ext0 int); ++----------+ +| ok | ++----------+ +alter table h2_p change column ext0 ext0 bigint; ++----------+ +| ok | ++----------+ + # DROP TABLE drop table h0; +----------+ @@ -246,7 +260,7 @@ drop table h1; | ok | +----------+ -drop table h1_p; +drop table h2_p; +----------+ | ok | +----------+ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala new file mode 100644 index 000000000..ee73823b0 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.types.{LongType, StructField, StructType} + +class TestAlterTable extends TestHoodieSqlBase { + + test("Test Alter Table") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | options ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + // Alter table name. + val newTableName = s"${tableName}_1" + spark.sql(s"alter table $tableName rename to $newTableName") + assertResult(false)( + spark.sessionState.catalog.tableExists(new TableIdentifier(tableName)) + ) + assertResult(true) ( + spark.sessionState.catalog.tableExists(new TableIdentifier(newTableName)) + ) + val hadoopConf = spark.sessionState.newHadoopConf() + val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath) + .setConf(hadoopConf).build() + assertResult(newTableName) ( + metaClient.getTableConfig.getTableName + ) + spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)") + + // Add table column + spark.sql(s"alter table $newTableName add columns(ext0 string)") + val table = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName)) + assertResult(Seq("id", "name", "price", "ts", "ext0")) { + HoodieSqlUtils.removeMetaFields(table.schema).fields.map(_.name) + } + checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")( + Seq(1, "a1", 10.0, 1000, null) + ) + // Alter table column type + spark.sql(s"alter table $newTableName change column id id bigint") + assertResult(StructType(Seq(StructField("id", LongType, nullable = true))))( + spark.sql(s"select id from $newTableName").schema) + + // Insert data to the new table. + spark.sql(s"insert into $newTableName values(2, 'a2', 12, 1000, 'e0')") + checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")( + Seq(1, "a1", 10.0, 1000, null), + Seq(2, "a2", 12.0, 1000, "e0") + ) + + // Merge data to the new table. + spark.sql( + s""" + |merge into $newTableName t0 + |using ( + | select 1 as id, 'a1' as name, 12 as price, 1001 as ts, 'e0' as ext0 + |) s0 + |on t0.id = s0.id + |when matched then update set * + |when not matched then insert * + """.stripMargin) + checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")( + Seq(1, "a1", 12.0, 1001, "e0"), + Seq(2, "a2", 12.0, 1000, "e0") + ) + + // Update data to the new table. + spark.sql(s"update $newTableName set price = 10, ext0 = null where id = 1") + checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")( + Seq(1, "a1", 10.0, 1001, null), + Seq(2, "a2", 12.0, 1000, "e0") + ) + spark.sql(s"update $newTableName set price = 10, ext0 = null where id = 2") + checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")( + Seq(1, "a1", 10.0, 1001, null), + Seq(2, "a2", 10.0, 1000, null) + ) + + // Delete data from the new table. + spark.sql(s"delete from $newTableName where id = 1") + checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")( + Seq(2, "a2", 10.0, 1000, null) + ) + } + } + } +}