[HUDI-4214] improve repeat init write schema in ExpressionPayload (#5820)
* [HUDI-4214] improve repeat init write schema in ExpressionPayload
This commit is contained in:
@@ -32,7 +32,7 @@ import org.apache.hudi.sql.IExpressionEvaluator
|
|||||||
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
|
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
|
||||||
import org.apache.spark.sql.catalyst.expressions.Expression
|
import org.apache.spark.sql.catalyst.expressions.Expression
|
||||||
import org.apache.spark.sql.hudi.SerDeUtils
|
import org.apache.spark.sql.hudi.SerDeUtils
|
||||||
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema}
|
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, setWriteSchema, getMergedSchema}
|
||||||
import org.apache.spark.sql.types.{StructField, StructType}
|
import org.apache.spark.sql.types.{StructField, StructType}
|
||||||
|
|
||||||
import java.util.concurrent.Callable
|
import java.util.concurrent.Callable
|
||||||
@@ -215,9 +215,7 @@ class ExpressionPayload(record: GenericRecord,
|
|||||||
*/
|
*/
|
||||||
private def initWriteSchemaIfNeed(properties: Properties): Unit = {
|
private def initWriteSchemaIfNeed(properties: Properties): Unit = {
|
||||||
if (writeSchema == null) {
|
if (writeSchema == null) {
|
||||||
ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
|
writeSchema = setWriteSchema(properties)
|
||||||
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")
|
|
||||||
writeSchema = new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -276,6 +274,18 @@ object ExpressionPayload {
|
|||||||
.maximumSize(1024)
|
.maximumSize(1024)
|
||||||
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()
|
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()
|
||||||
|
|
||||||
|
private val writeSchemaCache = CacheBuilder.newBuilder().build[String, Schema]()
|
||||||
|
|
||||||
|
def setWriteSchema(properties: Properties): Schema = {
|
||||||
|
ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
|
||||||
|
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")
|
||||||
|
writeSchemaCache.get(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key),
|
||||||
|
new Callable[Schema] {
|
||||||
|
override def call(): Schema =
|
||||||
|
new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Do the CodeGen for each condition and assignment expressions.We will cache it to reduce
|
* Do the CodeGen for each condition and assignment expressions.We will cache it to reduce
|
||||||
* the compile time for each method call.
|
* the compile time for each method call.
|
||||||
|
|||||||
Reference in New Issue
Block a user