[HUDI-2033] ClassCastException Throw When PreCombineField Is String Type (#3099)
This commit is contained in:
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hudi.command.payload
|
|||||||
|
|
||||||
import java.util.{Base64, Properties}
|
import java.util.{Base64, Properties}
|
||||||
import java.util.concurrent.Callable
|
import java.util.concurrent.Callable
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import com.google.common.cache.CacheBuilder
|
import com.google.common.cache.CacheBuilder
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
|
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
|
||||||
|
import org.apache.avro.util.Utf8
|
||||||
import org.apache.hudi.DataSourceWriteOptions._
|
import org.apache.hudi.DataSourceWriteOptions._
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils
|
import org.apache.hudi.avro.HoodieAvroUtils
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
|
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
|
||||||
@@ -290,15 +290,15 @@ object ExpressionPayload {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* As the "baseEvaluator" return "UTF8String" for the string type which cannot be process by
|
* As the "baseEvaluator" return "UTF8String" for the string type which cannot be process by
|
||||||
* the Avro, The StringConvertEvaluator will convert the "UTF8String" to "String".
|
* the Avro, The StringConvertEvaluator will convert the "UTF8String" to "Utf8".
|
||||||
*/
|
*/
|
||||||
case class StringConvertEvaluator(baseEvaluator: IExpressionEvaluator) extends IExpressionEvaluator {
|
case class StringConvertEvaluator(baseEvaluator: IExpressionEvaluator) extends IExpressionEvaluator {
|
||||||
/**
|
/**
|
||||||
* Convert the UTF8String to String
|
* Convert the UTF8String to Utf8
|
||||||
*/
|
*/
|
||||||
override def eval(record: IndexedRecord): Array[AnyRef] = {
|
override def eval(record: IndexedRecord): Array[AnyRef] = {
|
||||||
baseEvaluator.eval(record).map{
|
baseEvaluator.eval(record).map {
|
||||||
case s: UTF8String => s.toString
|
case s: UTF8String => new Utf8(s.toString)
|
||||||
case o => o
|
case o => o
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -532,4 +532,65 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Test Different Type of PreCombineField") {
|
||||||
|
withTempDir { tmp =>
|
||||||
|
val typeAndValue = Seq(
|
||||||
|
("string", "'1000'"),
|
||||||
|
("int", 1000),
|
||||||
|
("bigint", 10000),
|
||||||
|
("timestamp", "'2021-05-20 00:00:00'"),
|
||||||
|
("date", "'2021-05-20'")
|
||||||
|
)
|
||||||
|
typeAndValue.foreach { case (dataType, dataValue) =>
|
||||||
|
val tableName = generateTableName
|
||||||
|
// Create table
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName (
|
||||||
|
| id int,
|
||||||
|
| name string,
|
||||||
|
| price double,
|
||||||
|
| c $dataType
|
||||||
|
|) using hudi
|
||||||
|
| location '${tmp.getCanonicalPath}/$tableName'
|
||||||
|
| options (
|
||||||
|
| primaryKey ='id',
|
||||||
|
| preCombineField = 'c'
|
||||||
|
| )
|
||||||
|
""".stripMargin)
|
||||||
|
|
||||||
|
// First merge with a extra input field 'flag' (insert a new record)
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
| merge into $tableName
|
||||||
|
| using (
|
||||||
|
| select 1 as id, 'a1' as name, 10 as price, $dataValue as c0, '1' as flag
|
||||||
|
| ) s0
|
||||||
|
| on s0.id = $tableName.id
|
||||||
|
| when matched and flag = '1' then update set
|
||||||
|
| id = s0.id, name = s0.name, price = s0.price, c = s0.c0
|
||||||
|
| when not matched and flag = '1' then insert *
|
||||||
|
""".stripMargin)
|
||||||
|
checkAnswer(s"select id, name, price from $tableName")(
|
||||||
|
Seq(1, "a1", 10.0)
|
||||||
|
)
|
||||||
|
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
| merge into $tableName
|
||||||
|
| using (
|
||||||
|
| select 1 as id, 'a1' as name, 10 as price, $dataValue as c
|
||||||
|
| ) s0
|
||||||
|
| on s0.id = $tableName.id
|
||||||
|
| when matched then update set
|
||||||
|
| id = s0.id, name = s0.name, price = s0.price + $tableName.price, c = s0.c
|
||||||
|
| when not matched then insert *
|
||||||
|
""".stripMargin)
|
||||||
|
checkAnswer(s"select id, name, price from $tableName")(
|
||||||
|
Seq(1, "a1", 20.0)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user