[HUDI-2223] Fix Alter Partitioned Table Failed (#3350)
This commit is contained in:
@@ -18,7 +18,6 @@
|
||||
package org.apache.spark.sql.hudi.command
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import org.apache.avro.Schema
|
||||
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
@@ -30,10 +29,11 @@ import org.apache.hudi.table.HoodieSparkTable
|
||||
import scala.collection.JavaConverters._
|
||||
import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
|
||||
import org.apache.spark.api.java.JavaSparkContext
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTable
|
||||
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
|
||||
import org.apache.spark.sql.hudi.HoodieSqlUtils
|
||||
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
|
||||
import org.apache.spark.sql.types.{StructField, StructType}
|
||||
import org.apache.spark.sql.util.SchemaUtils
|
||||
@@ -50,7 +50,15 @@ case class AlterHoodieTableAddColumnsCommand(
|
||||
|
||||
override def run(sparkSession: SparkSession): Seq[Row] = {
|
||||
if (colsToAdd.nonEmpty) {
|
||||
val resolver = sparkSession.sessionState.conf.resolver
|
||||
val table = sparkSession.sessionState.catalog.getTableMetadata(tableId)
|
||||
val existsColumns =
|
||||
colsToAdd.map(_.name).filter(col => table.schema.fieldNames.exists(f => resolver(f, col)))
|
||||
|
||||
if (existsColumns.nonEmpty) {
|
||||
throw new AnalysisException(s"Columns: [${existsColumns.mkString(",")}] already exists in the table," +
|
||||
s" table columns is: [${HoodieSqlUtils.removeMetaFields(table.schema).fieldNames.mkString(",")}]")
|
||||
}
|
||||
// Get the new schema
|
||||
val newSqlSchema = StructType(table.schema.fields ++ colsToAdd)
|
||||
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
|
||||
@@ -60,7 +68,8 @@ case class AlterHoodieTableAddColumnsCommand(
|
||||
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
|
||||
|
||||
// Refresh the new schema to meta
|
||||
refreshSchemaInMeta(sparkSession, table, newSqlSchema)
|
||||
val newDataSchema = StructType(table.dataSchema.fields ++ colsToAdd)
|
||||
refreshSchemaInMeta(sparkSession, table, newDataSchema)
|
||||
}
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
||||
@@ -49,13 +49,21 @@ case class AlterHoodieTableChangeColumnCommand(
|
||||
}
|
||||
// Get the new schema
|
||||
val newSqlSchema = StructType(
|
||||
table.dataSchema.fields.map { field =>
|
||||
table.schema.fields.map { field =>
|
||||
if (resolver(field.name, columnName)) {
|
||||
newColumn
|
||||
} else {
|
||||
field
|
||||
}
|
||||
})
|
||||
val newDataSchema = StructType(
|
||||
table.dataSchema.fields.map { field =>
|
||||
if (resolver(field.name, columnName)) {
|
||||
newColumn
|
||||
} else {
|
||||
field
|
||||
}
|
||||
})
|
||||
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table)
|
||||
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
|
||||
|
||||
@@ -76,8 +84,8 @@ case class AlterHoodieTableChangeColumnCommand(
|
||||
log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e)
|
||||
}
|
||||
sparkSession.catalog.refreshTable(tableName.unquotedString)
|
||||
// Change the schema in the meta
|
||||
catalog.alterTableDataSchema(tableName, newSqlSchema)
|
||||
// Update the table's data schema in the catalog to the new data schema.
|
||||
catalog.alterTableDataSchema(tableName, newDataSchema)
|
||||
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
||||
@@ -115,6 +115,36 @@ class TestAlterTable extends TestHoodieSqlBase {
|
||||
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
|
||||
Seq(2, "a2", 10.0, 1000, null)
|
||||
)
|
||||
|
||||
val partitionedTable = generateTableName
|
||||
spark.sql(
|
||||
s"""
|
||||
|create table $partitionedTable (
|
||||
| id int,
|
||||
| name string,
|
||||
| price double,
|
||||
| ts long,
|
||||
| dt string
|
||||
|) using hudi
|
||||
| location '${tmp.getCanonicalPath}/$partitionedTable'
|
||||
| options (
|
||||
| type = '$tableType',
|
||||
| primaryKey = 'id',
|
||||
| preCombineField = 'ts'
|
||||
| )
|
||||
| partitioned by (dt)
|
||||
""".stripMargin)
|
||||
spark.sql(s"insert into $partitionedTable values(1, 'a1', 10, 1000, '2021-07-25')")
|
||||
spark.sql(s"alter table $partitionedTable add columns(ext0 double)")
|
||||
checkAnswer(s"select id, name, price, ts, dt, ext0 from $partitionedTable")(
|
||||
Seq(1, "a1", 10.0, 1000, "2021-07-25", null)
|
||||
)
|
||||
|
||||
spark.sql(s"insert into $partitionedTable values(2, 'a2', 10, 1000, 1, '2021-07-25')");
|
||||
checkAnswer(s"select id, name, price, ts, dt, ext0 from $partitionedTable order by id")(
|
||||
Seq(1, "a1", 10.0, 1000, "2021-07-25", null),
|
||||
Seq(2, "a2", 10.0, 1000, "2021-07-25", 1.0)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user