[HUDI-3236] use fields' comments persisted in catalog to fill in schema (#4587)
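The gist of the change: `HoodieCatalogTable.tableSchema` is resolved from the Hudi commit metadata, which carries types but no column comments, so comments stored in the Spark session catalog were silently dropped. The patch copies them back field by field. A minimal standalone sketch of that merge logic follows; the two schemas and the `FillCommentsSketch` wrapper are illustrative, not part of the patch:

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object FillCommentsSketch extends App {
  // Schema resolved from Hudi's commit metadata: right types, no comments.
  val originSchema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)))

  // Schema persisted in the session catalog: carries the user-supplied comments.
  val catalogSchema = StructType(Seq(
    StructField("id", IntegerType).withComment("primary id"),
    StructField("name", StringType)))

  // Mirrors the patched tableSchema: make every field nullable and, when the
  // catalog knows the field, copy its comment over.
  val merged = StructType(originSchema.map { f =>
    val nullableField = f.copy(nullable = true)
    catalogSchema.find(c => caseInsensitiveResolution(c.name, f.name))
      .flatMap(_.getComment())
      .map(nullableField.withComment)
      .getOrElse(nullableField)
  })

  merged.foreach(f => println(s"${f.name}: ${f.getComment().getOrElse("<none>")}"))
  // prints: id: primary id
  //         name: <none>
}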
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import org.apache.hudi.AvroConversionUtils
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
@@ -30,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
+import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -62,7 +63,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
    * hoodie table's location.
    * if create managed hoodie table, use `catalog.defaultTablePath`.
    */
-  val tableLocation: String = HoodieSqlCommonUtils.getTableLocation(table, spark)
+  val tableLocation: String = getTableLocation(table, spark)
 
   /**
   * A flag to whether the hoodie table exists.
@@ -114,17 +115,27 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
 
   /**
    * The schema of table.
-   * Make StructField nullable.
+   * Make StructField nullable and fill the comments in.
    */
   lazy val tableSchema: StructType = {
+    val resolver = spark.sessionState.conf.resolver
     val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).getOrElse(table.schema)
-    StructType(originSchema.map(_.copy(nullable = true)))
+    val fields = originSchema.fields.map { f =>
+      val nullableField: StructField = f.copy(nullable = true)
+      val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
+      if (catalogField.isDefined) {
+        catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
+      } else {
+        nullableField
+      }
+    }
+    StructType(fields)
   }
 
   /**
    * The schema without hoodie meta fields
    */
-  lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(tableSchema)
+  lazy val tableSchemaWithoutMetaFields: StructType = removeMetaFields(tableSchema)
 
   /**
    * The schema of data fields
@@ -136,7 +147,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   /**
    * The schema of data fields not including hoodie meta fields
    */
-  lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(dataSchema)
+  lazy val dataSchemaWithoutMetaFields: StructType = removeMetaFields(dataSchema)
 
   /**
    * The schema of partition fields
@@ -146,7 +157,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   /**
    * All the partition paths
    */
-  def getAllPartitionPaths: Seq[String] = HoodieSqlCommonUtils.getAllPartitionPaths(spark, table)
+  def getPartitionPaths: Seq[String] = getAllPartitionPaths(spark, table)
 
   /**
    * Check if table is a partitioned table
@@ -171,10 +182,12 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
         .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
         .initTable(hadoopConf, tableLocation)
     } else {
+      val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
+      val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
         .setTableName(table.identifier.table)
-        .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
+        .setTableCreateSchema(schema.toString())
         .setPartitionFields(table.partitionColumnNames.mkString(","))
         .initTable(hadoopConf, tableLocation)
     }
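Side note on the `else` branch above: `SchemaConverters.toAvroType` defaults to record name "topLevelRecord" with an empty namespace, so the persisted create-schema lost the table's identity; the patch threads a record name and namespace derived from the table name through instead. A hedged illustration — the "h0_record"/"hoodie.h0" literals are placeholders for whatever `AvroConversionUtils.getAvroRecordNameAndNamespace` actually returns for a given table:

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val finalSchema = StructType(Seq(StructField("id", IntegerType, nullable = false)))

// Before the patch: spark-avro's defaults name the record "topLevelRecord".
val unnamed = SchemaConverters.toAvroType(finalSchema)

// After the patch: record name and namespace come from the table name.
val named = SchemaConverters.toAvroType(finalSchema, false, "h0_record", "hoodie.h0")

println(unnamed.getFullName) // topLevelRecord
println(named.getFullName)   // hoodie.h0.h0_record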
@@ -239,7 +252,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
       originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
     val extraConfig = mutable.Map.empty[String, String]
     if (isTableExists) {
-      val allPartitionPaths = getAllPartitionPaths
+      val allPartitionPaths = getPartitionPaths
       if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
         extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
           originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
@@ -26,9 +27,10 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -40,6 +42,7 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Locale, Properties}
+
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
 
@@ -301,4 +304,12 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
       true
     }
   }
+
+  // Find the origin column from schema by column name, throw an AnalysisException if the column
+  // reference is invalid.
+  def findColumnByName(schema: StructType, name: String, resolver: Resolver): Option[StructField] = {
+    schema.fields.collectFirst {
+      case field if resolver(field.name, name) => field
+    }
+  }
 }
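A quick behavioral note on the new helper: despite its comment, `findColumnByName` itself only returns an `Option` under the supplied `Resolver` (case-insensitive unless spark.sql.caseSensitive is set); the AnalysisException is raised by callers such as AlterHoodieTableChangeColumnCommand below. A hedged usage sketch using the two stock Catalyst resolvers:

import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.findColumnByName
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("ID", IntegerType)))

// The resolver decides what counts as a name match:
findColumnByName(schema, "id", caseInsensitiveResolution) // Some(StructField(ID,IntegerType,true,{}))
findColumnByName(schema, "id", caseSensitiveResolution)   // None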
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 
 import scala.util.control.NonFatal
@@ -40,16 +41,19 @@ case class AlterHoodieTableChangeColumnCommand(
   extends HoodieLeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val resolver = sparkSession.sessionState.conf.resolver
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
 
-    val resolver = sparkSession.sessionState.conf.resolver
-    if (!resolver(columnName, newColumn.name)) {
-      throw new AnalysisException(s"Can not support change column name for hudi table currently.")
-    }
+    // Find the origin column from dataSchema by column name.
+    val originColumn = findColumnByName(hoodieCatalogTable.dataSchema, columnName, resolver).getOrElse(
+      throw new AnalysisException(s"Can't find column `$columnName` given table data columns " +
+        s"${hoodieCatalogTable.dataSchema.fieldNames.mkString("[`", "`, `", "`]")}")
+    )
 
     // Get the new schema
     val newTableSchema = StructType(
       hoodieCatalogTable.tableSchema.fields.map { field =>
-        if (resolver(field.name, columnName)) {
+        if (field.name == originColumn.name) {
           newColumn
         } else {
           field
@@ -57,7 +61,7 @@ case class AlterHoodieTableChangeColumnCommand(
       })
     val newDataSchema = StructType(
       hoodieCatalogTable.dataSchema.fields.map { field =>
-        if (resolver(field.name, columnName)) {
+        if (field.name == columnName) {
           newColumn
         } else {
           field
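At the SQL level, the rewrite above turns the old blanket rejection into a real lookup: changing an existing column's comment or type proceeds, while a bad column reference now fails with a descriptive error instead of the misleading "Can not support change column name" message. A hedged spark-shell sketch against a hypothetical Hudi table h0(id int, name string):

import org.apache.spark.sql.catalyst.TableIdentifier

// Changing the comment of an existing column now succeeds and round-trips
// through the session catalog:
spark.sql("alter table h0 change column id id int comment 'primary id'")
val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("h0"))
meta.schema("id").getComment() // Some("primary id")

// Referencing a column that does not exist fails with the new message:
spark.sql("alter table h0 change column idd idd int")
// org.apache.spark.sql.AnalysisException: Can't find column `idd` given table
// data columns [`_hoodie_commit_time`, ..., `id`, `name`]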
@@ -148,7 +148,7 @@ case class AlterHoodieTableDropPartitionCommand(
       hoodieCatalogTable: HoodieCatalogTable,
       normalizedSpecs: Seq[Map[String, String]]): String = {
     val table = hoodieCatalogTable.table
-    val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
+    val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
     val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
     val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
     val partitionsToDrop = normalizedSpecs.map { spec =>
@@ -48,10 +48,10 @@ case class ShowHoodieTablePartitionsCommand(
 
     if (partitionColumnNamesOpt.isPresent && partitionColumnNamesOpt.get.nonEmpty && schemaOpt.nonEmpty) {
       if (specOpt.isEmpty) {
-        hoodieCatalogTable.getAllPartitionPaths.map(Row(_))
+        hoodieCatalogTable.getPartitionPaths.map(Row(_))
       } else {
         val spec = specOpt.get
-        hoodieCatalogTable.getAllPartitionPaths.filter { partitionPath =>
+        hoodieCatalogTable.getPartitionPaths.filter { partitionPath =>
           val part = PartitioningUtils.parsePathFragment(partitionPath)
           spec.forall { case (col, value) =>
             PartitionPathEncodeUtils.escapePartitionValue(value) == part.getOrElse(col, null)
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.common.table.HoodieTableMetaClient
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.types.{LongType, StructField, StructType}
 
@@ -44,7 +45,24 @@ class TestAlterTable extends TestHoodieSqlBase {
            | preCombineField = 'ts'
            | )
       """.stripMargin)
-      // Alter table name.
+
+      // change column comment
+      spark.sql(s"alter table $tableName change column id id int comment 'primary id'")
+      var catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
+      assertResult("primary id") (
+        catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
+      )
+      spark.sql(s"alter table $tableName change column name name string comment 'name column'")
+      spark.sessionState.catalog.refreshTable(new TableIdentifier(tableName))
+      catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
+      assertResult("primary id") (
+        catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
+      )
+      assertResult("name column") (
+        catalogTable.schema(catalogTable.schema.fieldIndex("name")).getComment().get
+      )
+
+      // alter table name.
       val newTableName = s"${tableName}_1"
       spark.sql(s"alter table $tableName rename to $newTableName")
       assertResult(false)(
@@ -53,24 +71,26 @@ class TestAlterTable extends TestHoodieSqlBase {
       assertResult(true) (
         spark.sessionState.catalog.tableExists(new TableIdentifier(newTableName))
       )
 
       val hadoopConf = spark.sessionState.newHadoopConf()
       val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
         .setConf(hadoopConf).build()
-      assertResult(newTableName) (
-        metaClient.getTableConfig.getTableName
-      )
+      assertResult(newTableName) (metaClient.getTableConfig.getTableName)
+
+      // insert some data
       spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)")
 
-      // Add table column
+      // add column
       spark.sql(s"alter table $newTableName add columns(ext0 string)")
-      val table = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
+      catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
       assertResult(Seq("id", "name", "price", "ts", "ext0")) {
-        HoodieSqlCommonUtils.removeMetaFields(table.schema).fields.map(_.name)
+        HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
       }
       checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
         Seq(1, "a1", 10.0, 1000, null)
       )
-      // Alter table column type
+
+      // change column's data type
       spark.sql(s"alter table $newTableName change column id id bigint")
       assertResult(StructType(Seq(StructField("id", LongType, nullable = true))))(
         spark.sql(s"select id from $newTableName").schema)