1
0

[HUDI-1064] Trim hoodie table name (#1805)

This commit is contained in:
mabin001
2020-07-07 19:10:16 +08:00
committed by GitHub
parent be85a6c32b
commit 8c4ff185f1

View File

@@ -52,10 +52,11 @@ private[hudi] object HoodieSparkSqlWriter {
val sparkContext = sqlContext.sparkContext val sparkContext = sqlContext.sparkContext
val path = parameters.get("path") val path = parameters.get("path")
val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME) val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
if (path.isEmpty || tblName.isEmpty) { if (path.isEmpty || tblNameOp.isEmpty) {
throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.") throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
} }
val tblName = tblNameOp.get.trim
sparkContext.getConf.getOption("spark.serializer") match { sparkContext.getConf.getOption("spark.serializer") match {
case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") => case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer") case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
@@ -85,7 +86,7 @@ private[hudi] object HoodieSparkSqlWriter {
if (exists && mode == SaveMode.Append) { if (exists && mode == SaveMode.Append) {
val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
if (!existingTableName.equals(tblName.get)) { if (!existingTableName.equals(tblName)) {
throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath") throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
} }
} }
@@ -93,8 +94,8 @@ private[hudi] object HoodieSparkSqlWriter {
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) = val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) { if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas // register classes & schemas
val structName = s"${tblName.get}_record" val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName.get}" val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses( sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData], Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema])) classOf[org.apache.avro.Schema]))
@@ -129,11 +130,11 @@ private[hudi] object HoodieSparkSqlWriter {
// Create the table if not present // Create the table if not present
if (!exists) { if (!exists) {
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType, HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType,
tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY)) tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY))
} }
// Create a HoodieWriteClient & issue the write. // Create a HoodieWriteClient & issue the write.
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get, val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
mapAsJavaMap(parameters) mapAsJavaMap(parameters)
) )
@@ -158,8 +159,8 @@ private[hudi] object HoodieSparkSqlWriter {
throw new HoodieException(s"Append is the only save mode applicable for $operation operation") throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
} }
val structName = s"${tblName.get}_record" val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName.get}" val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses( sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData], Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema])) classOf[org.apache.avro.Schema]))
@@ -175,7 +176,7 @@ private[hudi] object HoodieSparkSqlWriter {
// Create a HoodieWriteClient & issue the delete. // Create a HoodieWriteClient & issue the delete.
val client = DataSourceUtils.createHoodieClient(jsc, val client = DataSourceUtils.createHoodieClient(jsc,
Schema.create(Schema.Type.NULL).toString, path.get, tblName.get, Schema.create(Schema.Type.NULL).toString, path.get, tblName,
mapAsJavaMap(parameters) mapAsJavaMap(parameters)
) )