[HUDI-2251] Fix Exception Cause By Table Name Case Sensitivity For Append Mode Write (#3367)
This commit is contained in:
@@ -113,7 +113,7 @@ object HoodieSparkSqlWriter {
|
|||||||
(false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
|
(false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
|
||||||
} else {
|
} else {
|
||||||
// Handle various save modes
|
// Handle various save modes
|
||||||
handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
|
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
|
||||||
// Create the table if not present
|
// Create the table if not present
|
||||||
if (!tableExists) {
|
if (!tableExists) {
|
||||||
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)
|
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)
|
||||||
@@ -277,7 +277,7 @@ object HoodieSparkSqlWriter {
|
|||||||
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
|
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
|
||||||
false
|
false
|
||||||
} else {
|
} else {
|
||||||
handleSaveModes(mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
|
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tableExists) {
|
if (!tableExists) {
|
||||||
@@ -392,12 +392,14 @@ object HoodieSparkSqlWriter {
|
|||||||
props
|
props
|
||||||
}
|
}
|
||||||
|
|
||||||
private def handleSaveModes(mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
|
private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
|
||||||
operation: WriteOperationType, fs: FileSystem): Unit = {
|
operation: WriteOperationType, fs: FileSystem): Unit = {
|
||||||
if (mode == SaveMode.Append && tableExists) {
|
if (mode == SaveMode.Append && tableExists) {
|
||||||
val existingTableName = tableConfig.getTableName
|
val existingTableName = tableConfig.getTableName
|
||||||
if (!existingTableName.equals(tableName)) {
|
val resolver = spark.sessionState.conf.resolver
|
||||||
throw new HoodieException(s"hoodie table with name $existingTableName already exists at $tablePath")
|
if (!resolver(existingTableName, tableName)) {
|
||||||
|
throw new HoodieException(s"hoodie table with name $existingTableName already exists at $tablePath," +
|
||||||
|
s" can not append data to the table with another name $tableName.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,8 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi
|
package org.apache.spark.sql.hudi
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||||
import org.apache.hudi.exception.HoodieDuplicateKeyException
|
import org.apache.hudi.exception.HoodieDuplicateKeyException
|
||||||
|
|
||||||
class TestInsertTable extends TestHoodieSqlBase {
|
class TestInsertTable extends TestHoodieSqlBase {
|
||||||
@@ -255,6 +257,31 @@ class TestInsertTable extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Test insert for uppercase table name") {
|
||||||
|
withTempDir{ tmp =>
|
||||||
|
val tableName = s"H_$generateTableName"
|
||||||
|
HoodieTableMetaClient.withPropertyBuilder()
|
||||||
|
.setTableName(tableName)
|
||||||
|
.setTableType(HoodieTableType.COPY_ON_WRITE.name())
|
||||||
|
.initTable(spark.sessionState.newHadoopConf(), tmp.getCanonicalPath)
|
||||||
|
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName (
|
||||||
|
| id int,
|
||||||
|
| name string,
|
||||||
|
| price double
|
||||||
|
|) using hudi
|
||||||
|
| location '${tmp.getCanonicalPath}'
|
||||||
|
""".stripMargin)
|
||||||
|
|
||||||
|
spark.sql(s"insert into $tableName values(1, 'a1', 10)")
|
||||||
|
checkAnswer(s"select id, name, price from $tableName")(
|
||||||
|
Seq(1, "a1", 10.0)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
test("Test Insert Exception") {
|
test("Test Insert Exception") {
|
||||||
val tableName = generateTableName
|
val tableName = generateTableName
|
||||||
spark.sql(
|
spark.sql(
|
||||||
|
|||||||
Reference in New Issue
Block a user