[HUDI-3198] Improve Spark SQL create table from existing hudi table (#4584)
Simplify the SQL statement for creating a Hudi table on top of an existing Hudi table path: the primary key, preCombine field, table type and partition columns no longer have to be specified in the statement.

From:
```sql
create table hudi_tbl using hudi
tblproperties (primaryKey='id', preCombineField='ts', type='cow')
partitioned by (pt)
location '/path/to/hudi'
```

To:
```sql
create table hudi_tbl using hudi
location '/path/to/hudi'
```
This commit is contained in:
@@ -164,12 +164,20 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
|
|||||||
val properties = new Properties()
|
val properties = new Properties()
|
||||||
properties.putAll(tableConfigs.asJava)
|
properties.putAll(tableConfigs.asJava)
|
||||||
|
|
||||||
HoodieTableMetaClient.withPropertyBuilder()
|
if (hoodieTableExists) {
|
||||||
.fromProperties(properties)
|
// just persist hoodie.table.create.schema
|
||||||
.setTableName(table.identifier.table)
|
HoodieTableMetaClient.withPropertyBuilder()
|
||||||
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
|
.fromProperties(properties)
|
||||||
.setPartitionFields(table.partitionColumnNames.mkString(","))
|
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
|
||||||
.initTable(hadoopConf, tableLocation)
|
.initTable(hadoopConf, tableLocation)
|
||||||
|
} else {
|
||||||
|
HoodieTableMetaClient.withPropertyBuilder()
|
||||||
|
.fromProperties(properties)
|
||||||
|
.setTableName(table.identifier.table)
|
||||||
|
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
|
||||||
|
.setPartitionFields(table.partitionColumnNames.mkString(","))
|
||||||
|
.initTable(hadoopConf, tableLocation)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -61,6 +61,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
|
|||||||
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)
|
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)
|
||||||
// check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
|
// check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
|
||||||
CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
|
CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
|
||||||
|
|
||||||
// init hoodie table
|
// init hoodie table
|
||||||
hoodieCatalogTable.initHoodieTable()
|
hoodieCatalogTable.initHoodieTable()
|
||||||
|
|
||||||
@@ -129,12 +130,14 @@ object CreateHoodieTableCommand {
|
|||||||
val newTableIdentifier = table.identifier
|
val newTableIdentifier = table.identifier
|
||||||
.copy(table = tablName, database = Some(newDatabaseName))
|
.copy(table = tablName, database = Some(newDatabaseName))
|
||||||
|
|
||||||
|
val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name)
|
||||||
// append pk, preCombineKey, type to the properties of table
|
// append pk, preCombineKey, type to the properties of table
|
||||||
val newTblProperties = hoodieCatalogTable.catalogProperties ++ HoodieOptionConfig.extractSqlOptions(properties)
|
val newTblProperties = hoodieCatalogTable.catalogProperties ++ HoodieOptionConfig.extractSqlOptions(properties)
|
||||||
val newTable = table.copy(
|
val newTable = table.copy(
|
||||||
identifier = newTableIdentifier,
|
identifier = newTableIdentifier,
|
||||||
schema = hoodieCatalogTable.tableSchema,
|
|
||||||
storage = newStorage,
|
storage = newStorage,
|
||||||
|
schema = hoodieCatalogTable.tableSchema,
|
||||||
|
partitionColumnNames = partitionColumnNames,
|
||||||
createVersion = SPARK_VERSION,
|
createVersion = SPARK_VERSION,
|
||||||
properties = newTblProperties
|
properties = newTblProperties
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -322,30 +322,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
|
|||||||
} else {
|
} else {
|
||||||
l
|
l
|
||||||
}
|
}
|
||||||
// Fill schema for Create Table without specify schema info
|
|
||||||
case c @ CreateTable(tableDesc, _, _)
|
|
||||||
if isHoodieTable(tableDesc) =>
|
|
||||||
val tablePath = getTableLocation(c.tableDesc, sparkSession)
|
|
||||||
val tableExistInCatalog = sparkSession.sessionState.catalog.tableExists(tableDesc.identifier)
|
|
||||||
// Only when the table has not exist in catalog, we need to fill the schema info for creating table.
|
|
||||||
if (!tableExistInCatalog && tableExistsInPath(tablePath, sparkSession.sessionState.newHadoopConf())) {
|
|
||||||
val metaClient = HoodieTableMetaClient.builder()
|
|
||||||
.setBasePath(tablePath)
|
|
||||||
.setConf(sparkSession.sessionState.newHadoopConf())
|
|
||||||
.build()
|
|
||||||
val tableSchema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient)
|
|
||||||
if (tableSchema.isDefined && tableDesc.schema.isEmpty) {
|
|
||||||
// Fill the schema with the schema from the table
|
|
||||||
c.copy(tableDesc.copy(schema = tableSchema.get))
|
|
||||||
} else if (tableSchema.isDefined && tableDesc.schema != tableSchema.get) {
|
|
||||||
throw new AnalysisException(s"Specified schema in create table statement is not equal to the table schema." +
|
|
||||||
s"You should not specify the schema for an exist table: ${tableDesc.identifier} ")
|
|
||||||
} else {
|
|
||||||
c
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
c
|
|
||||||
}
|
|
||||||
case p => p
|
case p => p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -100,11 +100,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
|
|||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
|create table $tableName using hudi
|
|create table $tableName using hudi
|
||||||
|tblproperties (
|
|
||||||
| primaryKey = 'id',
|
|
||||||
| preCombineField = 'ts'
|
|
||||||
|)
|
|
||||||
|partitioned by (dt)
|
|
||||||
|location '$tablePath'
|
|location '$tablePath'
|
||||||
|""".stripMargin)
|
|""".stripMargin)
|
||||||
|
|
||||||
@@ -149,11 +144,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
|
|||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
|create table $tableName using hudi
|
|create table $tableName using hudi
|
||||||
|tblproperties (
|
|
||||||
| primaryKey = 'id',
|
|
||||||
| preCombineField = 'ts'
|
|
||||||
|)
|
|
||||||
|partitioned by (dt)
|
|
||||||
|location '$tablePath'
|
|location '$tablePath'
|
||||||
|""".stripMargin)
|
|""".stripMargin)
|
||||||
|
|
||||||
@@ -210,7 +200,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
|
|||||||
|
|
||||||
import spark.implicits._
|
import spark.implicits._
|
||||||
val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02"))
|
val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02"))
|
||||||
.toDF("id", "name", "ts", "year", "month", "day")
|
.toDF("id", "name", "ts", "year", "month", "day")
|
||||||
|
|
||||||
df.write.format("hudi")
|
df.write.format("hudi")
|
||||||
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
|
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
|
||||||
@@ -229,11 +219,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
|
|||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
|create table $tableName using hudi
|
|create table $tableName using hudi
|
||||||
|tblproperties (
|
|
||||||
| primaryKey = 'id',
|
|
||||||
| preCombineField = 'ts'
|
|
||||||
|)
|
|
||||||
|partitioned by (year, month, day)
|
|
||||||
|location '$tablePath'
|
|location '$tablePath'
|
||||||
|""".stripMargin)
|
|""".stripMargin)
|
||||||
|
|
||||||
@@ -278,11 +263,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
|
|||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
|create table $tableName using hudi
|
|create table $tableName using hudi
|
||||||
|tblproperties (
|
|
||||||
| primaryKey = 'id',
|
|
||||||
| preCombineField = 'ts'
|
|
||||||
|)
|
|
||||||
|partitioned by (year, month, day)
|
|
||||||
|location '$tablePath'
|
|location '$tablePath'
|
||||||
|""".stripMargin)
|
|""".stripMargin)
|
||||||
|
|
||||||
|
|||||||
@@ -167,7 +167,7 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
assertResult(Seq("dt"))(table2.partitionColumnNames)
|
assertResult(Seq("dt"))(table2.partitionColumnNames)
|
||||||
assertResult(classOf[HoodieParquetRealtimeInputFormat].getCanonicalName)(table2.storage.inputFormat.get)
|
assertResult(classOf[HoodieParquetRealtimeInputFormat].getCanonicalName)(table2.storage.inputFormat.get)
|
||||||
|
|
||||||
// Test create a external table with an exist table in the path
|
// Test create a external table with an existing table in the path
|
||||||
val tableName3 = generateTableName
|
val tableName3 = generateTableName
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
@@ -285,17 +285,18 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
val tableName3 = generateTableName
|
val tableName3 = generateTableName
|
||||||
// CTAS failed with null primaryKey
|
// CTAS failed with null primaryKey
|
||||||
assertThrows[Exception] {
|
assertThrows[Exception] {
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
| create table $tableName3 using hudi
|
| create table $tableName3 using hudi
|
||||||
| partitioned by (dt)
|
| partitioned by (dt)
|
||||||
| tblproperties(primaryKey = 'id')
|
| tblproperties(primaryKey = 'id')
|
||||||
| location '${tmp.getCanonicalPath}/$tableName3'
|
| location '${tmp.getCanonicalPath}/$tableName3'
|
||||||
| AS
|
| AS
|
||||||
| select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
|
| select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
|
||||||
|
|
|
|
||||||
""".stripMargin
|
""".stripMargin
|
||||||
)}
|
)
|
||||||
|
}
|
||||||
// Create table with timestamp type partition
|
// Create table with timestamp type partition
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
@@ -357,7 +358,7 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("Test Create Table From Exist Hoodie Table") {
|
test("Test Create Table From Existing Hoodie Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
|
Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
|
||||||
val tableName = generateTableName
|
val tableName = generateTableName
|
||||||
@@ -377,15 +378,19 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.save(tablePath)
|
.save(tablePath)
|
||||||
|
|
||||||
// Create a table over the exist old table.
|
// Create a table over the existing table.
|
||||||
|
// Fail to create table if only specify partition columns, no table schema.
|
||||||
|
checkExceptionContain(
|
||||||
|
s"""
|
||||||
|
|create table $tableName using hudi
|
||||||
|
|partitioned by (dt)
|
||||||
|
|location '$tablePath'
|
||||||
|
|""".stripMargin
|
||||||
|
) ("It is not allowed to specify partition columns when the table schema is not defined.")
|
||||||
|
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
|create table $tableName using hudi
|
|create table $tableName using hudi
|
||||||
|tblproperties (
|
|
||||||
| primaryKey = 'id',
|
|
||||||
| preCombineField = 'ts'
|
|
||||||
|)
|
|
||||||
|partitioned by (dt)
|
|
||||||
|location '$tablePath'
|
|location '$tablePath'
|
||||||
|""".stripMargin)
|
|""".stripMargin)
|
||||||
checkAnswer(s"select id, name, value, ts, dt from $tableName")(
|
checkAnswer(s"select id, name, value, ts, dt from $tableName")(
|
||||||
@@ -434,7 +439,7 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("Test Create Table From Exist Hoodie Table For Multi-Level Partitioned Table") {
|
test("Test Create Table From Existing Hoodie Table For Multi-Level Partitioned Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
Seq("2021-08-02", "2021/08/02").foreach { day =>
|
Seq("2021-08-02", "2021/08/02").foreach { day =>
|
||||||
val tableName = generateTableName
|
val tableName = generateTableName
|
||||||
@@ -454,15 +459,10 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.save(tablePath)
|
.save(tablePath)
|
||||||
|
|
||||||
// Create a table over the exist old table.
|
// Create a table over the existing table.
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
|create table $tableName using hudi
|
|create table $tableName using hudi
|
||||||
|tblproperties (
|
|
||||||
| primaryKey = 'id',
|
|
||||||
| preCombineField = 'ts'
|
|
||||||
|)
|
|
||||||
|partitioned by (day, hh)
|
|
||||||
|location '$tablePath'
|
|location '$tablePath'
|
||||||
|""".stripMargin)
|
|""".stripMargin)
|
||||||
checkAnswer(s"select id, name, value, ts, day, hh from $tableName")(
|
checkAnswer(s"select id, name, value, ts, day, hh from $tableName")(
|
||||||
@@ -511,7 +511,7 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("Test Create Table From Exist Hoodie Table For None Partitioned Table") {
|
test("Test Create Table From Existing Hoodie Table For None Partitioned Table") {
|
||||||
withTempDir{tmp =>
|
withTempDir{tmp =>
|
||||||
// Write a table by spark dataframe.
|
// Write a table by spark dataframe.
|
||||||
val tableName = generateTableName
|
val tableName = generateTableName
|
||||||
@@ -529,14 +529,10 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.save(tmp.getCanonicalPath)
|
.save(tmp.getCanonicalPath)
|
||||||
|
|
||||||
// Create a table over the exist old table.
|
// Create a table over the existing table.
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
|create table $tableName using hudi
|
|create table $tableName using hudi
|
||||||
|tblproperties (
|
|
||||||
| primaryKey = 'id',
|
|
||||||
| preCombineField = 'ts'
|
|
||||||
|)
|
|
||||||
|location '${tmp.getCanonicalPath}'
|
|location '${tmp.getCanonicalPath}'
|
||||||
|""".stripMargin)
|
|""".stripMargin)
|
||||||
checkAnswer(s"select id, name, value, ts from $tableName")(
|
checkAnswer(s"select id, name, value, ts from $tableName")(
|
||||||
@@ -583,7 +579,7 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("Test Create Table Exists In Catalog") {
|
test("Test Create Table Existing In Catalog") {
|
||||||
val tableName = generateTableName
|
val tableName = generateTableName
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
@@ -598,7 +594,7 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
|
|
||||||
spark.sql(s"alter table $tableName add columns(ts bigint)")
|
spark.sql(s"alter table $tableName add columns(ts bigint)")
|
||||||
|
|
||||||
// Check "create table if not exist" works after schema evolution.
|
// Check "create table if not exists" works after schema evolution.
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
|create table if not exists $tableName (
|
|create table if not exists $tableName (
|
||||||
|
|||||||
Reference in New Issue
Block a user