diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 64de06651..1b7f986bd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.AppendResult; @@ -62,6 +63,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -108,6 +110,8 @@ public class HoodieAppendHandle extends protected final Map header = new HashMap<>(); private SizeEstimator sizeEstimator; + private Properties recordProperties = new Properties(); + public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, Iterator> recordItr, TaskContextSupplier taskContextSupplier) { super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); @@ -115,6 +119,7 @@ public class HoodieAppendHandle extends this.recordItr = recordItr; sizeEstimator = new DefaultSizeEstimator(); this.statuses = new ArrayList<>(); + this.recordProperties.putAll(config.getProps()); } public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, @@ -189,8 +194,11 @@ public class HoodieAppendHandle extends private Option 
getIndexedRecord(HoodieRecord hoodieRecord) { Option> recordMetadata = hoodieRecord.getData().getMetadata(); try { - Option avroRecord = hoodieRecord.getData().getInsertValue(tableSchema, - config.getProps()); + // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge + // whether it is an update or an insert record. + boolean isUpdateRecord = isUpdateRecord(hoodieRecord); + recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord)); + Option avroRecord = hoodieRecord.getData().getInsertValue(tableSchema, recordProperties); if (avroRecord.isPresent()) { if (avroRecord.get().equals(IGNORE_RECORD)) { return avroRecord; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java index d6067f2b9..24bf3d774 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java @@ -18,6 +18,10 @@ package org.apache.hudi.common.model; +import org.apache.avro.Schema; + +import java.util.Properties; + /** * Holds payload properties that implementation of {@link HoodieRecordPayload} can leverage. * Since both payload classes and HoodiePayloadConfig needs to access these props, storing it here in hudi-common. @@ -38,4 +42,10 @@ public class HoodiePayloadProps { * @see DefaultHoodieRecordPayload */ public static final String PAYLOAD_EVENT_TIME_FIELD_PROP_KEY = "hoodie.payload.event.time.field"; + + /** + * A runtime config passed to the {@link HoodieRecordPayload#getInsertValue(Schema, Properties)} + * to tell if the current record is an update record or an insert record for a MOR table.
+ */ + public static final String PAYLOAD_IS_UPDATE_RECORD_FOR_MOR = "hoodie.is.update.record.for.mor"; } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index a2f42579f..df8688562 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -218,15 +218,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab assert(deleteActions.size <= 1, "Should be only one delete action in the merge into statement.") val deleteAction = deleteActions.headOption - val insertActions = if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) { - // For Mor table, the update record goes through the HoodieRecordPayload#getInsertValue - // We append the update actions to the insert actions, so that we can execute the update - // actions in the ExpressionPayload#getInsertValue. - mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction]) ++ - updateActions.map(update => InsertAction(update.condition, update.assignments)) - } else { + val insertActions = mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction]) - } + // Check for the insert actions checkInsertAssignments(insertActions) @@ -297,6 +291,16 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab assert(update.assignments.length == targetTableSchemaWithoutMetaFields.length, s"The number of update assignments[${update.assignments.length}] must equal to the " + s"targetTable field size[${targetTableSchemaWithoutMetaFields.length}]")) + // For MOR table, the target table field cannot be the right-value in the update action. 
+ if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) { + updateActions.foreach(update => { + val targetAttrs = update.assignments.flatMap(a => a.value.collect { + case attr: AttributeReference if mergeInto.targetTable.outputSet.contains(attr) => attr + }) + assert(targetAttrs.isEmpty, + s"Target table's field(${targetAttrs.map(_.name).mkString(",")}) cannot be the right-value of the update clause for MOR table.") + }) + } } private def checkInsertAssignments(insertActions: Seq[InsertAction]): Unit = { @@ -304,6 +308,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab assert(insert.assignments.length == targetTableSchemaWithoutMetaFields.length, s"The number of insert assignments[${insert.assignments.length}] must equal to the " + s"targetTable field size[${targetTableSchemaWithoutMetaFields.length}]")) + } private def getTableSchema: Schema = { @@ -393,7 +398,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab references.foreach(ref => { if (ref.ordinal >= sourceDFOutput.size) { val targetColumn = targetTableSchemaWithoutMetaFields(ref.ordinal - sourceDFOutput.size) - throw new IllegalArgumentException(s"Insert clause cannot contain target table field: $targetColumn" + + throw new IllegalArgumentException(s"Insert clause cannot contain target table's field: ${targetColumn.name}" + s" in ${exp.sql}") } }) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index 8ea9e71a0..ce3373ae7 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -29,7 +29,7 @@ import org.apache.avro.util.Utf8 import 
org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro -import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord} +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord} import org.apache.hudi.common.util.{ValidationUtils, Option => HOption} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.io.HoodieWriteHandle @@ -76,7 +76,21 @@ class ExpressionPayload(record: GenericRecord, schema: Schema, properties: Properties): HOption[IndexedRecord] = { val sourceRecord = bytesToAvro(recordBytes, schema) val joinSqlRecord = new SqlTypedRecord(joinRecord(sourceRecord, targetRecord)) + processMatchedRecord(joinSqlRecord, Some(targetRecord), properties) + } + /** + * Process the matched record. Firstly test if the record matched any of the update-conditions, + * if matched, return the update assignments result. Secondly, test if the record matched + * delete-condition, if matched then return a delete record. Finally if no condition matched, + * return a {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle. + * @param inputRecord The input record to process. + * @param targetRecord The original existing record. + * @param properties The properties. + * @return The result of the record to update or delete.
+ */ + private def processMatchedRecord(inputRecord: SqlTypedRecord, + targetRecord: Option[IndexedRecord], properties: Properties): HOption[IndexedRecord] = { // Process update val updateConditionAndAssignmentsText = properties.get(ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS) @@ -90,19 +104,19 @@ class ExpressionPayload(record: GenericRecord, val updateConditionAndAssignments = getEvaluator(updateConditionAndAssignmentsText.toString, writeSchema) for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments if resultRecordOpt == null) { - val conditionVal = evaluate(conditionEvaluator, joinSqlRecord).head.asInstanceOf[Boolean] + val conditionVal = evaluate(conditionEvaluator, inputRecord).head.asInstanceOf[Boolean] // If the update condition matched then execute assignment expression // to compute final record to update. We will return the first matched record. if (conditionVal) { - val results = evaluate(assignmentEvaluator, joinSqlRecord) + val results = evaluate(assignmentEvaluator, inputRecord) val resultRecord = convertToRecord(results, writeSchema) - if (needUpdatingPersistedRecord(targetRecord, resultRecord, properties)) { + if (targetRecord.isEmpty || needUpdatingPersistedRecord(targetRecord.get, resultRecord, properties)) { resultRecordOpt = HOption.of(resultRecord) } else { // if the PreCombine field value of targetRecord is greater // than the new incoming record, just keep the old record value.
- resultRecordOpt = HOption.of(targetRecord) + resultRecordOpt = HOption.of(targetRecord.get) } } } @@ -111,7 +125,7 @@ class ExpressionPayload(record: GenericRecord, val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) if (deleteConditionText != null) { val deleteCondition = getEvaluator(deleteConditionText.toString, writeSchema).head._1 - val deleteConditionVal = evaluate(deleteCondition, joinSqlRecord).head.asInstanceOf[Boolean] + val deleteConditionVal = evaluate(deleteCondition, inputRecord).head.asInstanceOf[Boolean] if (deleteConditionVal) { resultRecordOpt = HOption.empty() } @@ -126,48 +140,62 @@ class ExpressionPayload(record: GenericRecord, } } + /** + * Process the not-matched record. Test if the record matched any of insert-conditions, + * if matched then return the result of insert-assignment. Or else return a + * {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by HoodieWriteHandle. + * + * @param inputRecord The input record to process. + * @param properties The properties. + * @return The result of the record to insert. + */ + private def processNotMatchedRecord(inputRecord: SqlTypedRecord, properties: Properties): HOption[IndexedRecord] = { + val insertConditionAndAssignmentsText = + properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS) + // Get the evaluator for each condition and insert assignment. + initWriteSchemaIfNeed(properties) + val insertConditionAndAssignments = + ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, writeSchema) + var resultRecordOpt: HOption[IndexedRecord] = null + for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments + if resultRecordOpt == null) { + val conditionVal = evaluate(conditionEvaluator, inputRecord).head.asInstanceOf[Boolean] + // If matched the insert condition then execute the assignment expressions to compute the + // result record. We will return the first matched record. 
+ if (conditionVal) { + val results = evaluate(assignmentEvaluator, inputRecord) + resultRecordOpt = HOption.of(convertToRecord(results, writeSchema)) + } + } + if (resultRecordOpt != null) { + resultRecordOpt + } else { + // If there is no condition matched, just filter this record. + // Here we return a IGNORE_RECORD, HoodieCreateHandle will not handle it. + HOption.of(HoodieWriteHandle.IGNORE_RECORD) + } + } + override def getInsertValue(schema: Schema, properties: Properties): HOption[IndexedRecord] = { val incomingRecord = bytesToAvro(recordBytes, schema) if (isDeleteRecord(incomingRecord)) { HOption.empty[IndexedRecord]() } else { - val insertConditionAndAssignmentsText = - properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS) - // Process insert val sqlTypedRecord = new SqlTypedRecord(incomingRecord) - // Get the evaluator for each condition and insert assignment. - initWriteSchemaIfNeed(properties) - val insertConditionAndAssignments = - ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, writeSchema) - var resultRecordOpt: HOption[IndexedRecord] = null - for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments - if resultRecordOpt == null) { - val conditionVal = evaluate(conditionEvaluator, sqlTypedRecord).head.asInstanceOf[Boolean] - // If matched the insert condition then execute the assignment expressions to compute the - // result record. We will return the first matched record. - if (conditionVal) { - val results = evaluate(assignmentEvaluator, sqlTypedRecord) - resultRecordOpt = HOption.of(convertToRecord(results, writeSchema)) + if (isMORTable(properties)) { + // For the MOR table, both the matched and not-matched record will step into the getInsertValue() method. + // We call the processMatchedRecord() method if current is a Update-Record to process + // the matched record. Or else we call processNotMatchedRecord() method to process the not matched record. 
+ val isUpdateRecord = properties.getProperty(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, "false").toBoolean + if (isUpdateRecord) { + processMatchedRecord(sqlTypedRecord, Option.empty, properties) + } else { + processNotMatchedRecord(sqlTypedRecord, properties) } - } - - // Process delete for MOR - if (resultRecordOpt == null && isMORTable(properties)) { - val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) - if (deleteConditionText != null) { - val deleteCondition = getEvaluator(deleteConditionText.toString, writeSchema).head._1 - val deleteConditionVal = evaluate(deleteCondition, sqlTypedRecord).head.asInstanceOf[Boolean] - if (deleteConditionVal) { - resultRecordOpt = HOption.empty() - } - } - } - if (resultRecordOpt != null) { - resultRecordOpt } else { - // If there is no condition matched, just filter this record. - // Here we return a IGNORE_RECORD, HoodieCreateHandle will not handle it. - HOption.of(HoodieWriteHandle.IGNORE_RECORD) + // For COW table, only the not-matched record will step into the getInsertValue method, So just call + // the processNotMatchedRecord() here. + processNotMatchedRecord(sqlTypedRecord, properties) } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMereIntoLogOnlyTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMereIntoLogOnlyTable.scala index 668f0f960..b4492b591 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMereIntoLogOnlyTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMereIntoLogOnlyTable.scala @@ -75,7 +75,7 @@ class TestMereIntoLogOnlyTable extends TestHoodieSqlBase { | select 4 as id, 'a4' as name, 11 as price, 1000 as ts | ) s0 | on h0.id = s0.id - | when matched then update set * + | when not matched then insert * |""".stripMargin) // 5 commits will trigger compaction. 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala new file mode 100644 index 000000000..24723849a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +class TestMergeIntoTable2 extends TestHoodieSqlBase { + + test("Test MergeInto for MOR table 2") { + withTempDir { tmp => + val tableName = generateTableName + // Create a mor partitioned table. + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + | ) using hudi + | options ( + | type = 'mor', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + | partitioned by(dt) + | location '${tmp.getCanonicalPath}' + """.stripMargin) + // Insert data which matched insert-condition. 
+ spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt + | ) as s0 + | on t0.id = s0.id + | when not matched and s0.id % 2 = 1 then insert * + """.stripMargin + ) + checkAnswer(s"select id,name,price,dt from $tableName")( + Seq(1, "a1", 10, "2021-03-21") + ) + + // Insert data which not matched insert-condition. + spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-03-21' as dt + | ) as s0 + | on t0.id = s0.id + | when not matched and s0.id % 2 = 1 then insert * + """.stripMargin + ) + checkAnswer(s"select id,name,price,dt from $tableName")( + Seq(1, "a1", 10, "2021-03-21") + ) + + // Update data which not matched update-condition + spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt + | ) as s0 + | on t0.id = s0.id + | when matched and s0.id % 2 = 0 then update set * + | when matched and s0.id % 3 = 2 then delete + | when not matched then insert * + """.stripMargin + ) + checkAnswer(s"select id,name,price,dt from $tableName")( + Seq(1, "a1", 10, "2021-03-21") + ) + + // Update data which matched update-condition + spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt + | ) as s0 + | on t0.id = s0.id + | when matched and s0.id % 2 = 1 then update set id = s0.id, name = s0.name, + | price = s0.price * 2, ts = s0.ts, dt = s0.dt + | when not matched then insert (id,name,price,ts,dt) values(s0.id, s0.name, s0.price, s0.ts, s0.dt) + """.stripMargin + ) + checkAnswer(s"select id,name,price,dt from $tableName")( + Seq(1, "a1", 22, "2021-03-21") + ) + + // Delete data which matched update-condition + spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt + | ) 
as s0 + | on t0.id = s0.id + | when matched and s0.id % 2 = 0 then update set id = s0.id, name = s0.name, + | price = s0.price * 2, ts = s0.ts, dt = s0.dt + | when matched and s0.id % 2 = 1 then delete + | when not matched then insert (id,name,price,ts,dt) values(s0.id, s0.name, s0.price, s0.ts, s0.dt) + """.stripMargin + ) + checkAnswer(s"select count(1) from $tableName")( + Seq(0) + ) + + checkException( + s""" + | merge into $tableName as t0 + | using ( + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt + | ) as s0 + | on t0.id = s0.id + | when matched and s0.id % 2 = 1 then update set id = s0.id, name = s0.name, + | price = s0.price + t0.price, ts = s0.ts, dt = s0.dt + """.stripMargin + )("assertion failed: Target table's field(price) cannot be the right-value of the update clause for MOR table.") + } + } +}