[HUDI-852] adding check for table name for Append Save mode (#1580)
* adding check for table name for Append Save mode * adding existing table validation for delete and upsert operation Co-authored-by: Aakash Pradeep <apradeep@twilio.com>
This commit is contained in:
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hudi.DataSourceWriteOptions._
|
||||
import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
|
||||
import org.apache.hudi.common.config.TypedProperties
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||
import org.apache.hudi.common.config.TypedProperties
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
|
||||
@@ -83,6 +83,13 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
||||
|
||||
if (exists && mode == SaveMode.Append) {
|
||||
val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
|
||||
if (!existingTableName.equals(tblName.get)) {
|
||||
throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
|
||||
}
|
||||
}
|
||||
|
||||
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
|
||||
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
|
||||
// register classes & schemas
|
||||
|
||||
@@ -17,6 +17,9 @@
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
import java.util.{Date, UUID}
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.apache.hudi.DataSourceWriteOptions._
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
@@ -43,10 +46,59 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
||||
|
||||
test("throw hoodie exception when invalid serializer") {
|
||||
val session = SparkSession.builder().appName("hoodie_test").master("local").getOrCreate()
|
||||
val sqlContext = session.sqlContext
|
||||
val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
|
||||
val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, session.emptyDataFrame))
|
||||
assert(e.getMessage.contains("spark.serializer"))
|
||||
try {
|
||||
val sqlContext = session.sqlContext
|
||||
val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
|
||||
val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, session.emptyDataFrame))
|
||||
assert(e.getMessage.contains("spark.serializer"))
|
||||
} finally {
|
||||
session.stop()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
test("throw hoodie exception when there already exist a table with different name with Append Save mode") {
|
||||
|
||||
val session = SparkSession.builder()
|
||||
.appName("test_append_mode")
|
||||
.master("local[2]")
|
||||
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
.getOrCreate()
|
||||
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
|
||||
try {
|
||||
|
||||
val sqlContext = session.sqlContext
|
||||
val hoodieFooTableName = "hoodie_foo_tbl"
|
||||
|
||||
//create a new table
|
||||
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
|
||||
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
|
||||
"hoodie.insert.shuffle.parallelism" -> "4",
|
||||
"hoodie.upsert.shuffle.parallelism" -> "4")
|
||||
val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
|
||||
val dataFrame = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
|
||||
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
|
||||
|
||||
//on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
|
||||
val barTableModifier = Map("path" -> path.toAbsolutePath.toString,
|
||||
HoodieWriteConfig.TABLE_NAME -> "hoodie_bar_tbl",
|
||||
"hoodie.insert.shuffle.parallelism" -> "4",
|
||||
"hoodie.upsert.shuffle.parallelism" -> "4")
|
||||
val barTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(barTableModifier)
|
||||
val dataFrame2 = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
|
||||
val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
|
||||
assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
|
||||
|
||||
//on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
|
||||
val deleteTableParams = barTableParams ++ Map(OPERATION_OPT_KEY -> "delete")
|
||||
val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
|
||||
assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
|
||||
} finally {
|
||||
session.stop()
|
||||
FileUtils.deleteDirectory(path.toFile)
|
||||
}
|
||||
}
|
||||
|
||||
case class Test(uuid: String, ts: Long)
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user