[HUDI-3549] Removing dependency on "spark-avro" (#4955)

Hudi will be taking on a promise for its bundles to stay compatible with Spark minor versions (for ex 2.4, 3.1, 3.2), meaning that a single build of Hudi (for ex "hudi-spark3.2-bundle") will be compatible with ALL patch versions in that minor branch (in that case 3.2.0, 3.2.1, etc.)

To achieve that, we'll have to remove (and ban) "spark-avro" as a dependency, since on a few occasions it was the root cause of incompatibility between consecutive Spark patch versions (most recently 3.2.0 and 3.2.1, due to this PR).

Instead of bundling "spark-avro" as a dependency, we will be copying over some of the classes Hudi depends on and maintaining them within the Hudi code-base to make sure we're able to provide the aforementioned guarantee. To work around compatibility issues as they arise, we will be applying local patches to these copies to guarantee compatibility of Hudi bundles within the Spark minor version branches.

The following mapping of Hudi modules to Spark minor branches is currently maintained:

"hudi-spark3" -> 3.2.x
"hudi-spark3.1.x" -> 3.1.x
"hudi-spark2" -> 2.4.x
The following class hierarchies (borrowed from "spark-avro") are maintained within these Spark-specific modules to guarantee compatibility with the respective minor version branches:

AvroSerializer
AvroDeserializer
AvroUtils
Each of these classes has been copied from Spark 3.2.1 (for the 3.2.x branch), 3.1.2 (for the 3.1.x branch), and 2.4.4 (for the 2.4.x branch) into its respective module.

The SchemaConverters class, in turn, is shared across all those modules given its relative stability (there are only cosmetic changes from 2.4.4 to 3.2.1).
All of the aforementioned classes have their visibility limited to the corresponding packages (org.apache.spark.sql.avro, org.apache.spark.sql) to make sure the broader code-base does not become dependent on them directly and instead relies on facades abstracting them, as sketched below.
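
For reference, a minimal sketch of such a facade, inferred from the HoodieSparkAvroSchemaConverters implementation included in this diff (the exact trait definition and the default for nameSpace are assumptions based on the call sites shown below):

import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType

/**
 * Facade abstracting away the copied [[SchemaConverters]] so that the broader
 * code-base depends only on this interface (sketch, not the verbatim definition).
 */
trait HoodieAvroSchemaConverters {
  def toSqlType(avroSchema: Schema): (DataType, Boolean)
  // nameSpace default assumed from call sites like toAvroType(s, nullable = false, "record")
  def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String = ""): Schema
}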

Additionally, given that Hudi plans on supporting all patch versions of Spark within the aforementioned minor version branches, additional build steps were added to validate that Hudi compiles properly against those versions. Testing, however, is performed against the most recent patch versions of Spark with the help of Azure CI.

Brief change log:
- Removed spark-avro bundling from Hudi by default
- Scaffolded Spark 3.2.x hierarchy
- Bootstrapped Spark 3.1.x Avro serializer/deserializer hierarchy
- Bootstrapped Spark 2.4.x Avro serializer/deserializer hierarchy
- Moved ExpressionCodeGen, ExpressionPayload into hudi-spark module
- Fixed AvroDeserializer to stay compatible with both Spark 3.2.1 and 3.2.0
- Modified bot.yml to build the full matrix of supported Spark versions
- Removed "spark-avro" dependency from all modules
- Fixed relocation of spark-avro classes in bundles to assist in running integ-tests
Alexey Kudinkin
2022-03-29 11:44:47 -07:00
committed by GitHub
parent 0802510ca9
commit e5a2baeed0
54 changed files with 2665 additions and 278 deletions

View File

@@ -70,7 +70,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String],
userSchema: Option[StructType])
-  extends BaseRelation with PrunedFilteredScan with Logging {
+  extends BaseRelation with PrunedFilteredScan with Logging with SparkAdapterSupport {
type FileSplit <: HoodieFileSplit
@@ -120,7 +120,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// If there is no commit in the table, we can't get the schema
// t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
userSchema match {
-      case Some(s) => SchemaConverters.toAvroType(s)
+      case Some(s) => sparkAdapter.getAvroSchemaConverters.toAvroType(s, nullable = false, "record")
case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
}
)
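
A hedged sketch of the resulting call-site pattern follows; the class and method names here are illustrative only (import paths assumed), while SparkAdapterSupport, sparkAdapter and getAvroSchemaConverters are the pieces the hunk above relies on:

import org.apache.avro.Schema
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.types.StructType

// Hypothetical example: any class mixing in SparkAdapterSupport obtains the
// Spark-version-specific converters through the adapter instead of referencing
// spark-avro's SchemaConverters directly.
class ExampleSchemaResolver extends SparkAdapterSupport {
  def avroSchemaFor(structType: StructType): Schema =
    sparkAdapter.getAvroSchemaConverters.toAvroType(structType, nullable = false, "record")
}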

View File

@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters.SchemaType
import org.apache.spark.sql.types.DataType
/**
* This interface is simply a facade abstracting away Spark's [[SchemaConverters]] implementation, allowing
* the rest of the code-base to not depend on it directly
*/
object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters {
override def toSqlType(avroSchema: Schema): (DataType, Boolean) =
SchemaConverters.toSqlType(avroSchema) match {
case SchemaType(dataType, nullable) => (dataType, nullable)
}
override def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
SchemaConverters.toAvroType(catalystType, nullable, recordName, nameSpace)
}

View File

@@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType
class HoodieSparkAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
extends HoodieAvroSerializer {
val avroSerializer = new AvroSerializer(rootCatalystType, rootAvroType, nullable)
override def serialize(catalystData: Any): Any = avroSerializer.serialize(catalystData)
}

View File

@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro
import org.apache.avro.LogicalTypes.{Date, Decimal, TimestampMicros, TimestampMillis}
import org.apache.avro.Schema.Type._
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
/**
* This object contains method that are used to convert sparkSQL schemas to avro schemas and vice
* versa.
*
* NOTE: This code is borrowed from Spark 3.2.1
* This code is borrowed, so that we can better control compatibility w/in Spark minor
* branches (3.2.x, 3.1.x, etc)
*
* PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY NECESSARY
*/
@DeveloperApi
private[sql] object SchemaConverters {
private lazy val nullSchema = Schema.create(Schema.Type.NULL)
/**
* Internal wrapper for SQL data type and nullability.
*
* @since 2.4.0
*/
case class SchemaType(dataType: DataType, nullable: Boolean)
/**
* Converts an Avro schema to a corresponding Spark SQL schema.
*
* @since 2.4.0
*/
def toSqlType(avroSchema: Schema): SchemaType = {
toSqlTypeHelper(avroSchema, Set.empty)
}
private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
avroSchema.getType match {
case INT => avroSchema.getLogicalType match {
case _: Date => SchemaType(DateType, nullable = false)
case _ => SchemaType(IntegerType, nullable = false)
}
case STRING => SchemaType(StringType, nullable = false)
case BOOLEAN => SchemaType(BooleanType, nullable = false)
case BYTES | FIXED => avroSchema.getLogicalType match {
// For FIXED type, if the precision requires more bytes than fixed size, the logical
// type will be null, which is handled by Avro library.
case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false)
case _ => SchemaType(BinaryType, nullable = false)
}
case DOUBLE => SchemaType(DoubleType, nullable = false)
case FLOAT => SchemaType(FloatType, nullable = false)
case LONG => avroSchema.getLogicalType match {
case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
case _ => SchemaType(LongType, nullable = false)
}
case ENUM => SchemaType(StringType, nullable = false)
case NULL => SchemaType(NullType, nullable = true)
case RECORD =>
if (existingRecordNames.contains(avroSchema.getFullName)) {
throw new IncompatibleSchemaException(
s"""
|Found recursive reference in Avro schema, which can not be processed by Spark:
|${avroSchema.toString(true)}
""".stripMargin)
}
val newRecordNames = existingRecordNames + avroSchema.getFullName
val fields = avroSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
StructField(f.name, schemaType.dataType, schemaType.nullable)
}
SchemaType(StructType(fields.toSeq), nullable = false)
case ARRAY =>
val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
SchemaType(
ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
nullable = false)
case MAP =>
val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames)
SchemaType(
MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
nullable = false)
case UNION =>
if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
// In case of a union with null, eliminate it and make a recursive call
val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
if (remainingUnionTypes.size == 1) {
toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames).copy(nullable = true)
} else {
toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), existingRecordNames)
.copy(nullable = true)
}
} else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
case Seq(t1) =>
toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
SchemaType(LongType, nullable = false)
case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
SchemaType(DoubleType, nullable = false)
case _ =>
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior when converting between Avro and Parquet.
val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
case (s, i) =>
val schemaType = toSqlTypeHelper(s, existingRecordNames)
// All fields are nullable because only one of them is set at a time
StructField(s"member$i", schemaType.dataType, nullable = true)
}
SchemaType(StructType(fields.toSeq), nullable = false)
}
case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
}
}
/**
* Converts a Spark SQL schema to a corresponding Avro schema.
*
* @since 2.4.0
*/
def toAvroType(catalystType: DataType,
nullable: Boolean = false,
recordName: String = "topLevelRecord",
nameSpace: String = ""): Schema = {
val builder = SchemaBuilder.builder()
val schema = catalystType match {
case BooleanType => builder.booleanType()
case ByteType | ShortType | IntegerType => builder.intType()
case LongType => builder.longType()
case DateType =>
LogicalTypes.date().addToSchema(builder.intType())
case TimestampType =>
LogicalTypes.timestampMicros().addToSchema(builder.longType())
case FloatType => builder.floatType()
case DoubleType => builder.doubleType()
case StringType => builder.stringType()
case NullType => builder.nullType()
case d: DecimalType =>
val avroType = LogicalTypes.decimal(d.precision, d.scale)
val fixedSize = minBytesForPrecision(d.precision)
// Need to avoid naming conflict for the fixed fields
val name = nameSpace match {
case "" => s"$recordName.fixed"
case _ => s"$nameSpace.$recordName.fixed"
}
avroType.addToSchema(SchemaBuilder.fixed(name).size(fixedSize))
case BinaryType => builder.bytesType()
case ArrayType(et, containsNull) =>
builder.array()
.items(toAvroType(et, containsNull, recordName, nameSpace))
case MapType(StringType, vt, valueContainsNull) =>
builder.map()
.values(toAvroType(vt, valueContainsNull, recordName, nameSpace))
case st: StructType =>
val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
st.foreach { f =>
val fieldAvroType =
toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
}
fieldsAssembler.endRecord()
// This should never happen.
case other => throw new IncompatibleSchemaException(s"Unexpected type $other.")
}
if (nullable && catalystType != NullType) {
Schema.createUnion(schema, nullSchema)
} else {
schema
}
}
}
private[avro] class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
private[avro] class UnsupportedAvroTypeException(msg: String) extends Exception(msg)

View File

@@ -1,192 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.payload
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.AvroSerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, LeafExpression, UnsafeArrayData, UnsafeMapData, UnsafeRow}
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.hudi.command.payload.ExpressionCodeGen.RECORD_NAME
import org.apache.spark.sql.types.{DataType, Decimal}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.ParentClassLoader
import org.apache.spark.{TaskContext, TaskKilledException}
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.{ClassBodyEvaluator, InternalCompilerException}
import java.util.UUID
/**
* Do CodeGen for expression based on IndexedRecord.
* The mainly difference with the spark's CodeGen for expression is that
* the expression's input is a IndexedRecord but not a Row.
*
*/
object ExpressionCodeGen extends Logging {
val RECORD_NAME = "record"
/**
* CodeGen for expressions.
* @param exprs The expression list to CodeGen.
* @return An IExpressionEvaluator generate by CodeGen which take a IndexedRecord as input
* param and return a Array of results for each expression.
*/
def doCodeGen(exprs: Seq[Expression], serializer: AvroSerializer): IExpressionEvaluator = {
val ctx = new CodegenContext()
// Set the input_row to null as we do not use row as the input object but Record.
ctx.INPUT_ROW = null
val replacedExprs = exprs.map(replaceBoundReference)
val resultVars = replacedExprs.map(_.genCode(ctx))
val className = s"ExpressionPayloadEvaluator_${UUID.randomUUID().toString.replace("-", "_")}"
val codeBody =
s"""
|private Object[] references;
|private String code;
|private AvroSerializer serializer;
|
|public $className(Object references, String code, AvroSerializer serializer) {
| this.references = (Object[])references;
| this.code = code;
| this.serializer = serializer;
|}
|
|public GenericRecord eval(IndexedRecord $RECORD_NAME) {
| ${resultVars.map(_.code).mkString("\n")}
| Object[] results = new Object[${resultVars.length}];
| ${
(for (i <- resultVars.indices) yield {
s"""
|if (${resultVars(i).isNull}) {
| results[$i] = null;
|} else {
| results[$i] = ${resultVars(i).value.code};
|}
""".stripMargin
}).mkString("\n")
}
|  InternalRow row = new GenericInternalRow(results);
|  return (GenericRecord) serializer.serialize(row);
| }
|
|public String getCode() {
| return code;
|}
""".stripMargin
val evaluator = new ClassBodyEvaluator()
val parentClassLoader = new ParentClassLoader(
Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader))
evaluator.setParentClassLoader(parentClassLoader)
// Cannot be under package codegen, or fail with java.lang.InstantiationException
evaluator.setClassName(s"org.apache.hudi.sql.payload.$className")
evaluator.setDefaultImports(
classOf[Platform].getName,
classOf[InternalRow].getName,
classOf[UnsafeRow].getName,
classOf[UTF8String].getName,
classOf[Decimal].getName,
classOf[CalendarInterval].getName,
classOf[ArrayData].getName,
classOf[UnsafeArrayData].getName,
classOf[MapData].getName,
classOf[UnsafeMapData].getName,
classOf[Expression].getName,
classOf[TaskContext].getName,
classOf[TaskKilledException].getName,
classOf[InputMetrics].getName,
classOf[IndexedRecord].getName,
classOf[AvroSerializer].getName,
classOf[GenericRecord].getName,
classOf[GenericInternalRow].getName
)
evaluator.setImplementedInterfaces(Array(classOf[IExpressionEvaluator]))
try {
evaluator.cook(codeBody)
} catch {
case e: InternalCompilerException =>
val msg = s"failed to compile: $e"
logError(msg, e)
throw new InternalCompilerException(msg, e)
case e: CompileException =>
val msg = s"failed to compile: $e"
logError(msg, e)
throw new CompileException(msg, e.getLocation)
}
val referenceArray = ctx.references.toArray.map(_.asInstanceOf[Object])
val expressionSql = exprs.map(_.sql).mkString(" ")
evaluator.getClazz.getConstructor(classOf[Object], classOf[String], classOf[AvroSerializer])
.newInstance(referenceArray, s"Expressions is: [$expressionSql]\nCodeBody is: {\n$codeBody\n}", serializer)
.asInstanceOf[IExpressionEvaluator]
}
/**
* Replace the BoundReference to the Record implement which will override the
* doGenCode method.
*/
private def replaceBoundReference(expression: Expression): Expression = {
expression transformDown {
case BoundReference(ordinal, dataType, nullable) =>
RecordBoundReference(ordinal, dataType, nullable)
case other =>
other
}
}
}
case class RecordBoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
extends LeafExpression {
/**
* Do the CodeGen for RecordBoundReference.
* Use "IndexedRecord" as the input object but not a "Row"
*/
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val javaType = JavaCode.javaType(dataType)
val boxType = JavaCode.boxedType(dataType)
val value = s"($boxType)$RECORD_NAME.get($ordinal)"
if (nullable) {
ev.copy(code =
code"""
| boolean ${ev.isNull} = $RECORD_NAME.get($ordinal) == null;
| $javaType ${ev.value} = ${ev.isNull} ?
| ${CodeGenerator.defaultValue(dataType)} : ($value);
"""
)
} else {
ev.copy(code = code"$javaType ${ev.value} = $value;", isNull = FalseLiteral)
}
}
override def eval(input: InternalRow): Any = {
throw new IllegalArgumentException(s"Should not call eval method for " +
s"${getClass.getCanonicalName}")
}
}

View File

@@ -1,322 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.payload
import com.google.common.cache.CacheBuilder
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord}
import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.io.HoodieWriteHandle
import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
import org.apache.spark.sql.types.{StructField, StructType}
import java.util.concurrent.Callable
import java.util.{Base64, Properties}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
/**
* A HoodieRecordPayload for MergeIntoHoodieTableCommand.
* It will execute the condition and assignments expression in the
* match and not-match actions and compute the final record to write.
*
* If there is no condition match the record, ExpressionPayload will return
* a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this record.
*/
class ExpressionPayload(record: GenericRecord,
orderingVal: Comparable[_])
extends DefaultHoodieRecordPayload(record, orderingVal) {
def this(recordOpt: HOption[GenericRecord]) {
this(recordOpt.orElse(null), 0)
}
/**
* The schema of this table.
*/
private var writeSchema: Schema = _
override def combineAndGetUpdateValue(currentValue: IndexedRecord,
schema: Schema): HOption[IndexedRecord] = {
throw new IllegalStateException(s"Should not call this method for ${getClass.getCanonicalName}")
}
override def getInsertValue(schema: Schema): HOption[IndexedRecord] = {
throw new IllegalStateException(s"Should not call this method for ${getClass.getCanonicalName}")
}
override def combineAndGetUpdateValue(targetRecord: IndexedRecord,
schema: Schema, properties: Properties): HOption[IndexedRecord] = {
val sourceRecord = bytesToAvro(recordBytes, schema)
val joinSqlRecord = new SqlTypedRecord(joinRecord(sourceRecord, targetRecord))
processMatchedRecord(joinSqlRecord, Some(targetRecord), properties)
}
/**
* Process the matched record. Firstly test if the record matched any of the update-conditions,
* if matched, return the update assignments result. Secondly, test if the record matched
* delete-condition, if matched then return a delete record. Finally if no condition matched,
* return a {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle.
* @param inputRecord The input record to process.
* @param targetRecord The origin exist record.
* @param properties The properties.
* @return The result of the record to update or delete.
*/
private def processMatchedRecord(inputRecord: SqlTypedRecord,
targetRecord: Option[IndexedRecord], properties: Properties): HOption[IndexedRecord] = {
// Process update
val updateConditionAndAssignmentsText =
properties.get(ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS)
assert(updateConditionAndAssignmentsText != null,
s"${ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS} have not set")
var resultRecordOpt: HOption[IndexedRecord] = null
// Get the Evaluator for each condition and update assignments.
initWriteSchemaIfNeed(properties)
val updateConditionAndAssignments = getEvaluator(updateConditionAndAssignmentsText.toString, writeSchema)
for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments
if resultRecordOpt == null) {
val conditionVal = evaluate(conditionEvaluator, inputRecord).get(0).asInstanceOf[Boolean]
// If the update condition matched then execute assignment expression
// to compute final record to update. We will return the first matched record.
if (conditionVal) {
val resultRecord = evaluate(assignmentEvaluator, inputRecord)
if (targetRecord.isEmpty || needUpdatingPersistedRecord(targetRecord.get, resultRecord, properties)) {
resultRecordOpt = HOption.of(resultRecord)
} else {
// if the PreCombine field value of targetRecord is greater
// than the new incoming record, just keep the old record value.
resultRecordOpt = HOption.of(targetRecord.get)
}
}
}
if (resultRecordOpt == null) {
// Process delete
val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
if (deleteConditionText != null) {
val deleteCondition = getEvaluator(deleteConditionText.toString, writeSchema).head._1
val deleteConditionVal = evaluate(deleteCondition, inputRecord).get(0).asInstanceOf[Boolean]
if (deleteConditionVal) {
resultRecordOpt = HOption.empty()
}
}
}
if (resultRecordOpt == null) {
// If there is no condition matched, just filter this record.
// here we return a IGNORE_RECORD, HoodieMergeHandle will not handle it.
HOption.of(HoodieWriteHandle.IGNORE_RECORD)
} else {
resultRecordOpt
}
}
/**
* Process the not-matched record. Test if the record matched any of insert-conditions,
* if matched then return the result of insert-assignment. Or else return a
* {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle.
*
* @param inputRecord The input record to process.
* @param properties The properties.
* @return The result of the record to insert.
*/
private def processNotMatchedRecord(inputRecord: SqlTypedRecord, properties: Properties): HOption[IndexedRecord] = {
val insertConditionAndAssignmentsText =
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
// Get the evaluator for each condition and insert assignment.
initWriteSchemaIfNeed(properties)
val insertConditionAndAssignments =
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, writeSchema)
var resultRecordOpt: HOption[IndexedRecord] = null
for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments
if resultRecordOpt == null) {
val conditionVal = evaluate(conditionEvaluator, inputRecord).get(0).asInstanceOf[Boolean]
// If matched the insert condition then execute the assignment expressions to compute the
// result record. We will return the first matched record.
if (conditionVal) {
val resultRecord = evaluate(assignmentEvaluator, inputRecord)
resultRecordOpt = HOption.of(resultRecord)
}
}
if (resultRecordOpt != null) {
resultRecordOpt
} else {
// If there is no condition matched, just filter this record.
// Here we return a IGNORE_RECORD, HoodieCreateHandle will not handle it.
HOption.of(HoodieWriteHandle.IGNORE_RECORD)
}
}
override def getInsertValue(schema: Schema, properties: Properties): HOption[IndexedRecord] = {
val incomingRecord = bytesToAvro(recordBytes, schema)
if (isDeleteRecord(incomingRecord)) {
HOption.empty[IndexedRecord]()
} else {
val sqlTypedRecord = new SqlTypedRecord(incomingRecord)
if (isMORTable(properties)) {
// For the MOR table, both the matched and not-matched record will step into the getInsertValue() method.
// We call the processMatchedRecord() method if current is a Update-Record to process
// the matched record. Or else we call processNotMatchedRecord() method to process the not matched record.
val isUpdateRecord = properties.getProperty(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, "false").toBoolean
if (isUpdateRecord) {
processMatchedRecord(sqlTypedRecord, Option.empty, properties)
} else {
processNotMatchedRecord(sqlTypedRecord, properties)
}
} else {
// For COW table, only the not-matched record will step into the getInsertValue method, So just call
// the processNotMatchedRecord() here.
processNotMatchedRecord(sqlTypedRecord, properties)
}
}
}
private def isMORTable(properties: Properties): Boolean = {
properties.getProperty(TABLE_TYPE.key, null) == MOR_TABLE_TYPE_OPT_VAL
}
private def convertToRecord(values: Array[AnyRef], schema: Schema): IndexedRecord = {
assert(values.length == schema.getFields.size())
val writeRecord = new GenericData.Record(schema)
for (i <- values.indices) {
writeRecord.put(i, values(i))
}
writeRecord
}
/**
* Init the table schema.
*/
private def initWriteSchemaIfNeed(properties: Properties): Unit = {
if (writeSchema == null) {
ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")
writeSchema = new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key))
}
}
/**
* Join the source record with the target record.
*
* @return
*/
private def joinRecord(sourceRecord: IndexedRecord, targetRecord: IndexedRecord): IndexedRecord = {
val leftSchema = sourceRecord.getSchema
// the targetRecord is load from the disk, it contains the meta fields, so we remove it here
val rightSchema = HoodieAvroUtils.removeMetadataFields(targetRecord.getSchema)
val joinSchema = mergeSchema(leftSchema, rightSchema)
val values = new ArrayBuffer[AnyRef]()
for (i <- 0 until joinSchema.getFields.size()) {
val value = if (i < leftSchema.getFields.size()) {
sourceRecord.get(i)
} else { // skip meta field
targetRecord.get(i - leftSchema.getFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size())
}
values += value
}
convertToRecord(values.toArray, joinSchema)
}
private def mergeSchema(a: Schema, b: Schema): Schema = {
val mergedFields =
a.getFields.asScala.map(field =>
new Schema.Field("a_" + field.name,
field.schema, field.doc, field.defaultVal, field.order)) ++
b.getFields.asScala.map(field =>
new Schema.Field("b_" + field.name,
field.schema, field.doc, field.defaultVal, field.order))
Schema.createRecord(a.getName, a.getDoc, a.getNamespace, a.isError, mergedFields.asJava)
}
private def evaluate(evaluator: IExpressionEvaluator, sqlTypedRecord: SqlTypedRecord): GenericRecord = {
try evaluator.eval(sqlTypedRecord) catch {
case e: Throwable =>
throw new RuntimeException(s"Error in execute expression: ${e.getMessage}.\n${evaluator.getCode}", e)
}
}
}
object ExpressionPayload {
/**
* Property for pass the merge-into delete clause condition expression.
*/
val PAYLOAD_DELETE_CONDITION = "hoodie.payload.delete.condition"
/**
* Property for pass the merge-into update clauses's condition and assignments.
*/
val PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS = "hoodie.payload.update.condition.assignments"
/**
* Property for pass the merge-into insert clauses's condition and assignments.
*/
val PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS = "hoodie.payload.insert.condition.assignments"
/**
* A cache for the serializedConditionAssignments to the compiled class after CodeGen.
* The Map[IExpressionEvaluator, IExpressionEvaluator] is the map of the condition expression
* to the assignments expression.
*/
private val cache = CacheBuilder.newBuilder()
.maximumSize(1024)
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()
/**
* Do the CodeGen for each condition and assignment expressions.We will cache it to reduce
* the compile time for each method call.
*/
def getEvaluator(
serializedConditionAssignments: String, writeSchema: Schema): Map[IExpressionEvaluator, IExpressionEvaluator] = {
cache.get(serializedConditionAssignments,
new Callable[Map[IExpressionEvaluator, IExpressionEvaluator]] {
override def call(): Map[IExpressionEvaluator, IExpressionEvaluator] = {
val serializedBytes = Base64.getDecoder.decode(serializedConditionAssignments)
val conditionAssignments = SerDeUtils.toObject(serializedBytes)
.asInstanceOf[Map[Expression, Seq[Expression]]]
// Do the CodeGen for condition expression and assignment expression
conditionAssignments.map {
case (condition, assignments) =>
val conditionType = StructType(Seq(StructField("_col0", condition.dataType, nullable = true)))
val conditionSerializer = new AvroSerializer(conditionType,
SchemaConverters.toAvroType(conditionType), false)
val conditionEvaluator = ExpressionCodeGen.doCodeGen(Seq(condition), conditionSerializer)
val assignSqlType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
val assignSerializer = new AvroSerializer(assignSqlType, writeSchema, false)
val assignmentEvaluator = ExpressionCodeGen.doCodeGen(assignments, assignSerializer)
conditionEvaluator -> assignmentEvaluator
}
}
})
}
}