1
0

[HUDI-4217] Avoid repeated object initialization in ExpressionPayload (#5825)

This commit is contained in:
KnightChess
2022-06-15 20:21:28 +08:00
committed by GitHub
parent c291b05699
commit 2bf0a1906d
2 changed files with 65 additions and 21 deletions

View File

@@ -32,7 +32,7 @@ import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema}
import org.apache.spark.sql.types.{StructField, StructType}
import java.util.concurrent.Callable
@@ -228,9 +228,7 @@ class ExpressionPayload(record: GenericRecord,
*/
private def joinRecord(sourceRecord: IndexedRecord, targetRecord: IndexedRecord): IndexedRecord = {
val leftSchema = sourceRecord.getSchema
// The targetRecord is loaded from disk and contains the meta fields, so we remove them here.
val rightSchema = HoodieAvroUtils.removeMetadataFields(targetRecord.getSchema)
val joinSchema = mergeSchema(leftSchema, rightSchema)
val joinSchema = getMergedSchema(leftSchema, targetRecord.getSchema)
val values = new ArrayBuffer[AnyRef]()
for (i <- 0 until joinSchema.getFields.size()) {
@@ -244,17 +242,6 @@ class ExpressionPayload(record: GenericRecord,
convertToRecord(values.toArray, joinSchema)
}
/**
 * Builds a single record schema holding the fields of both inputs.
 *
 * Fields of `a` come first, renamed with an "a_" prefix, followed by the fields of `b`
 * renamed with a "b_" prefix. Each copied field keeps its schema, doc, default value and
 * order. The record-level name, doc, namespace and error flag are taken from `a`.
 *
 * @param a schema whose fields are emitted first and whose record attributes are reused
 * @param b schema whose fields are appended after those of `a`
 * @return the combined record schema
 */
private def mergeSchema(a: Schema, b: Schema): Schema = {
  val fromA = for (f <- a.getFields.asScala)
    yield new Schema.Field("a_" + f.name, f.schema, f.doc, f.defaultVal, f.order)
  val fromB = for (f <- b.getFields.asScala)
    yield new Schema.Field("b_" + f.name, f.schema, f.doc, f.defaultVal, f.order)
  val allFields = fromA ++ fromB
  Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, allFields.asJava)
}
private def evaluate(evaluator: IExpressionEvaluator, sqlTypedRecord: SqlTypedRecord): GenericRecord = {
try evaluator.eval(sqlTypedRecord) catch {
case e: Throwable =>
@@ -318,5 +305,30 @@ object ExpressionPayload {
}
})
}
// Cache of merged (joined) schemas keyed by the (source, target) schema pair, so the merge
// is not rebuilt for every record that is joined. The cache is size-bounded: without a bound
// Guava never evicts, and a long-running JVM that sees many distinct schema pairs would keep
// every one of them alive. An evicted entry is simply recomputed on the next lookup.
private val mergedSchemaCache = CacheBuilder.newBuilder()
  .maximumSize(1024)
  .build[TupleSchema, Schema]()
/**
 * Returns the merged schema for the given source/target schema pair, computing it only on
 * a cache miss.
 *
 * The cache key is built from `source` and the unmodified `target`. When the entry is
 * missing, `target` is passed through `HoodieAvroUtils.removeMetadataFields` and the result
 * is merged with `source` via `mergeSchema`.
 *
 * @param source schema of the source record
 * @param target schema of the target record, before metadata fields are removed
 * @return the cached or newly built merged schema
 */
def getMergedSchema(source: Schema, target: Schema): Schema = {
  val cacheKey = TupleSchema(source, target)
  // Anonymous Callable (rather than a lambda) is kept as in the original code.
  val loader = new Callable[Schema] {
    override def call(): Schema =
      mergeSchema(source, HoodieAvroUtils.removeMetadataFields(target))
  }
  mergedSchemaCache.get(cacheKey, loader)
}
/**
 * Combines two record schemas into one.
 *
 * The result lists every field of `a` renamed with an "a_" prefix, then every field of `b`
 * renamed with a "b_" prefix; each field keeps its schema, doc, default value and order.
 * The record name, doc, namespace and error flag of the result come from `a`.
 *
 * @param a schema supplying the leading fields and the record-level attributes
 * @param b schema supplying the trailing fields
 * @return the combined record schema
 */
def mergeSchema(a: Schema, b: Schema): Schema = {
  // Copies each field of `schema`, changing only its name by prepending `prefix`.
  def renamedFields(schema: Schema, prefix: String) =
    schema.getFields.asScala.map { f =>
      new Schema.Field(prefix + f.name, f.schema, f.doc, f.defaultVal, f.order)
    }

  val joinedFields = renamedFields(a, "a_") ++ renamedFields(b, "b_")
  Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, joinedFields.asJava)
}
/**
 * Pair of schemas used as the key of the merged-schema cache.
 *
 * `getMergedSchema` builds the key from its `source` and `target` arguments as given, i.e.
 * `second` is the target schema before metadata fields are removed.
 *
 * @param first  the source schema
 * @param second the target schema
 */
case class TupleSchema(first: Schema, second: Schema)
}