diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala index b513034bd..c5a6f845a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.hudi.command import java.nio.charset.StandardCharsets - import org.apache.avro.Schema import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType} import org.apache.hudi.common.table.HoodieTableMetaClient @@ -30,10 +29,11 @@ import org.apache.hudi.table.HoodieSparkTable import scala.collection.JavaConverters._ import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils} import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} +import org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.SchemaUtils @@ -50,7 +50,15 @@ case class AlterHoodieTableAddColumnsCommand( override def run(sparkSession: SparkSession): Seq[Row] = { if (colsToAdd.nonEmpty) { + val resolver = sparkSession.sessionState.conf.resolver val table = sparkSession.sessionState.catalog.getTableMetadata(tableId) + val existsColumns = + colsToAdd.map(_.name).filter(col => table.schema.fieldNames.exists(f => resolver(f, col))) + + if (existsColumns.nonEmpty) { + throw new AnalysisException(s"Columns: [${existsColumns.mkString(",")}] already exists in the table," + + s" table columns is: [${HoodieSqlUtils.removeMetaFields(table.schema).fieldNames.mkString(",")}]") + } // Get the new schema val newSqlSchema = StructType(table.schema.fields ++ colsToAdd) val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table) @@ -60,7 +68,8 @@ case class AlterHoodieTableAddColumnsCommand( AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession) // Refresh the new schema to meta - refreshSchemaInMeta(sparkSession, table, newSqlSchema) + val newDataSchema = StructType(table.dataSchema.fields ++ colsToAdd) + refreshSchemaInMeta(sparkSession, table, newDataSchema) } Seq.empty[Row] } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala index 78334fdfa..d38b98c7a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala @@ -49,13 +49,21 @@ case class AlterHoodieTableChangeColumnCommand( } // Get the new schema val newSqlSchema = StructType( - table.dataSchema.fields.map { field => + table.schema.fields.map { field => if (resolver(field.name, columnName)) { newColumn } else { field } }) + val newDataSchema = StructType( + table.dataSchema.fields.map { field => + if (resolver(field.name, columnName)) { + newColumn + } else { + field + } + }) val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table) val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace) @@ -76,8 +84,8 @@ case class AlterHoodieTableChangeColumnCommand( log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e) } sparkSession.catalog.refreshTable(tableName.unquotedString) - // Change the schema in the meta - catalog.alterTableDataSchema(tableName, newSqlSchema) + // Change the schema in the meta using new data schema. + catalog.alterTableDataSchema(tableName, newDataSchema) Seq.empty[Row] } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala index ee73823b0..e1bc4a1f7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala @@ -115,6 +115,36 @@ class TestAlterTable extends TestHoodieSqlBase { checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")( Seq(2, "a2", 10.0, 1000, null) ) + + val partitionedTable = generateTableName + spark.sql( + s""" + |create table $partitionedTable ( + | id int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | location '${tmp.getCanonicalPath}/$partitionedTable' + | options ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + | partitioned by (dt) + """.stripMargin) + spark.sql(s"insert into $partitionedTable values(1, 'a1', 10, 1000, '2021-07-25')") + spark.sql(s"alter table $partitionedTable add columns(ext0 double)") + checkAnswer(s"select id, name, price, ts, dt, ext0 from $partitionedTable")( + Seq(1, "a1", 10.0, 1000, "2021-07-25", null) + ) + + spark.sql(s"insert into $partitionedTable values(2, 'a2', 10, 1000, 1, '2021-07-25')"); + checkAnswer(s"select id, name, price, ts, dt, ext0 from $partitionedTable order by id")( + Seq(1, "a1", 10.0, 1000, "2021-07-25", null), + Seq(2, "a2", 10.0, 1000, "2021-07-25", 1.0) + ) } } }