1
0

[HUDI-2139] MergeInto MOR Table May Result InCorrect Result (#3230)

This commit is contained in:
pengzhiwei
2021-07-23 10:19:43 +08:00
committed by GitHub
parent c89bf1de20
commit 5a2f3d439e
6 changed files with 241 additions and 52 deletions

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.AppendResult;
@@ -62,6 +63,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -108,6 +110,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
protected final Map<HeaderMetadataType, String> header = new HashMap<>();
private SizeEstimator<HoodieRecord> sizeEstimator;
private Properties recordProperties = new Properties();
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
@@ -115,6 +119,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
this.recordItr = recordItr;
sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
}
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -189,8 +194,11 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
try {
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(tableSchema,
config.getProps());
// Pass the isUpdateRecord flag to the props so that the HoodieRecordPayload can judge
// whether this is an update or an insert record.
boolean isUpdateRecord = isUpdateRecord(hoodieRecord);
recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
if (avroRecord.isPresent()) {
if (avroRecord.get().equals(IGNORE_RECORD)) {
return avroRecord;

View File

@@ -18,6 +18,10 @@
package org.apache.hudi.common.model;
import org.apache.avro.Schema;
import java.util.Properties;
/**
* Holds payload properties that implementation of {@link HoodieRecordPayload} can leverage.
* Since both payload classes and HoodiePayloadConfig needs to access these props, storing it here in hudi-common.
@@ -38,4 +42,10 @@ public class HoodiePayloadProps {
* @see DefaultHoodieRecordPayload
*/
public static final String PAYLOAD_EVENT_TIME_FIELD_PROP_KEY = "hoodie.payload.event.time.field";
/**
* A runtime config passed to {@link HoodieRecordPayload#getInsertValue(Schema, Properties)}
* to tell whether the current record is an update or an insert record for a MOR table.
*/
public static final String PAYLOAD_IS_UPDATE_RECORD_FOR_MOR = "hoodie.is.update.record.for.mor";
}

View File

@@ -218,15 +218,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
assert(deleteActions.size <= 1, "Should be only one delete action in the merge into statement.")
val deleteAction = deleteActions.headOption
val insertActions = if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
// For Mor table, the update record goes through the HoodieRecordPayload#getInsertValue
// We append the update actions to the insert actions, so that we can execute the update
// actions in the ExpressionPayload#getInsertValue.
mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction]) ++
updateActions.map(update => InsertAction(update.condition, update.assignments))
} else {
val insertActions =
mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction])
}
// Check for the insert actions
checkInsertAssignments(insertActions)
@@ -297,6 +291,16 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
assert(update.assignments.length == targetTableSchemaWithoutMetaFields.length,
s"The number of update assignments[${update.assignments.length}] must equal to the " +
s"targetTable field size[${targetTableSchemaWithoutMetaFields.length}]"))
// For MOR table, the target table field cannot be the right-value in the update action.
if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
updateActions.foreach(update => {
val targetAttrs = update.assignments.flatMap(a => a.value.collect {
case attr: AttributeReference if mergeInto.targetTable.outputSet.contains(attr) => attr
})
assert(targetAttrs.isEmpty,
s"Target table's field(${targetAttrs.map(_.name).mkString(",")}) cannot be the right-value of the update clause for MOR table.")
})
}
}
private def checkInsertAssignments(insertActions: Seq[InsertAction]): Unit = {
@@ -304,6 +308,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
assert(insert.assignments.length == targetTableSchemaWithoutMetaFields.length,
s"The number of insert assignments[${insert.assignments.length}] must equal to the " +
s"targetTable field size[${targetTableSchemaWithoutMetaFields.length}]"))
}
private def getTableSchema: Schema = {
@@ -393,7 +398,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
references.foreach(ref => {
if (ref.ordinal >= sourceDFOutput.size) {
val targetColumn = targetTableSchemaWithoutMetaFields(ref.ordinal - sourceDFOutput.size)
throw new IllegalArgumentException(s"Insert clause cannot contain target table field: $targetColumn" +
throw new IllegalArgumentException(s"Insert clause cannot contain target table's field: ${targetColumn.name}" +
s" in ${exp.sql}")
}
})

View File

@@ -29,7 +29,7 @@ import org.apache.avro.util.Utf8
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord}
import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.io.HoodieWriteHandle
@@ -76,7 +76,21 @@ class ExpressionPayload(record: GenericRecord,
schema: Schema, properties: Properties): HOption[IndexedRecord] = {
val sourceRecord = bytesToAvro(recordBytes, schema)
val joinSqlRecord = new SqlTypedRecord(joinRecord(sourceRecord, targetRecord))
processMatchedRecord(joinSqlRecord, Some(targetRecord), properties)
}
/**
* Process the matched record. First, test whether the record matches any of the update-conditions;
* if one matches, return the result of the corresponding update-assignments. Second, test whether the
* record matches the delete-condition; if so, return a delete record. Finally, if no condition
* matches, return a {@link HoodieWriteHandle.IGNORE_RECORD}, which will be ignored by HoodieWriteHandle.
* @param inputRecord The input record to process.
* @param targetRecord The original existing record, if any.
* @param properties The properties.
* @return The resulting record to update or delete.
*/
private def processMatchedRecord(inputRecord: SqlTypedRecord,
targetRecord: Option[IndexedRecord], properties: Properties): HOption[IndexedRecord] = {
// Process update
val updateConditionAndAssignmentsText =
properties.get(ExpressionPayload.PAYLOAD_UPDATE_CONDITION_AND_ASSIGNMENTS)
@@ -90,19 +104,19 @@ class ExpressionPayload(record: GenericRecord,
val updateConditionAndAssignments = getEvaluator(updateConditionAndAssignmentsText.toString, writeSchema)
for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments
if resultRecordOpt == null) {
val conditionVal = evaluate(conditionEvaluator, joinSqlRecord).head.asInstanceOf[Boolean]
val conditionVal = evaluate(conditionEvaluator, inputRecord).head.asInstanceOf[Boolean]
// If the update condition matched then execute assignment expression
// to compute final record to update. We will return the first matched record.
if (conditionVal) {
val results = evaluate(assignmentEvaluator, joinSqlRecord)
val results = evaluate(assignmentEvaluator, inputRecord)
val resultRecord = convertToRecord(results, writeSchema)
if (needUpdatingPersistedRecord(targetRecord, resultRecord, properties)) {
if (targetRecord.isEmpty || needUpdatingPersistedRecord(targetRecord.get, resultRecord, properties)) {
resultRecordOpt = HOption.of(resultRecord)
} else {
// If the preCombine field value of the targetRecord is greater
// than that of the new incoming record, just keep the old record value.
resultRecordOpt = HOption.of(targetRecord)
resultRecordOpt = HOption.of(targetRecord.get)
}
}
}
@@ -111,7 +125,7 @@ class ExpressionPayload(record: GenericRecord,
val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
if (deleteConditionText != null) {
val deleteCondition = getEvaluator(deleteConditionText.toString, writeSchema).head._1
val deleteConditionVal = evaluate(deleteCondition, joinSqlRecord).head.asInstanceOf[Boolean]
val deleteConditionVal = evaluate(deleteCondition, inputRecord).head.asInstanceOf[Boolean]
if (deleteConditionVal) {
resultRecordOpt = HOption.empty()
}
@@ -126,48 +140,62 @@ class ExpressionPayload(record: GenericRecord,
}
}
/**
* Process the not-matched record. Test whether the record matches any of the insert-conditions;
* if one matches, return the result of the corresponding insert-assignments. Otherwise return a
* {@link HoodieWriteHandle.IGNORE_RECORD}, which will be ignored by HoodieWriteHandle.
*
* @param inputRecord The input record to process.
* @param properties The properties carrying the serialized insert conditions and assignments.
* @return The record to insert, or IGNORE_RECORD when no insert-condition matches.
*/
private def processNotMatchedRecord(inputRecord: SqlTypedRecord, properties: Properties): HOption[IndexedRecord] = {
val insertConditionAndAssignmentsText =
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
// Get the evaluator for each condition and insert assignment.
initWriteSchemaIfNeed(properties)
val insertConditionAndAssignments =
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, writeSchema)
// null means "no condition has matched yet"; the loop guard below stops at the first match.
var resultRecordOpt: HOption[IndexedRecord] = null
for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments
if resultRecordOpt == null) {
val conditionVal = evaluate(conditionEvaluator, inputRecord).head.asInstanceOf[Boolean]
// If the insert condition matched, execute the assignment expressions to compute the
// result record. We return the first matched record.
if (conditionVal) {
val results = evaluate(assignmentEvaluator, inputRecord)
resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
}
}
if (resultRecordOpt != null) {
resultRecordOpt
} else {
// No condition matched, so just filter this record out.
// Here we return an IGNORE_RECORD, which HoodieCreateHandle will not handle.
HOption.of(HoodieWriteHandle.IGNORE_RECORD)
}
}
override def getInsertValue(schema: Schema, properties: Properties): HOption[IndexedRecord] = {
val incomingRecord = bytesToAvro(recordBytes, schema)
if (isDeleteRecord(incomingRecord)) {
HOption.empty[IndexedRecord]()
} else {
val insertConditionAndAssignmentsText =
properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS)
// Process insert
val sqlTypedRecord = new SqlTypedRecord(incomingRecord)
// Get the evaluator for each condition and insert assignment.
initWriteSchemaIfNeed(properties)
val insertConditionAndAssignments =
ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText.toString, writeSchema)
var resultRecordOpt: HOption[IndexedRecord] = null
for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments
if resultRecordOpt == null) {
val conditionVal = evaluate(conditionEvaluator, sqlTypedRecord).head.asInstanceOf[Boolean]
// If matched the insert condition then execute the assignment expressions to compute the
// result record. We will return the first matched record.
if (conditionVal) {
val results = evaluate(assignmentEvaluator, sqlTypedRecord)
resultRecordOpt = HOption.of(convertToRecord(results, writeSchema))
if (isMORTable(properties)) {
// For a MOR table, both matched and not-matched records step into the getInsertValue() method.
// We call processMatchedRecord() if the current record is an update record;
// otherwise we call processNotMatchedRecord() to process the not-matched record.
val isUpdateRecord = properties.getProperty(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, "false").toBoolean
if (isUpdateRecord) {
processMatchedRecord(sqlTypedRecord, Option.empty, properties)
} else {
processNotMatchedRecord(sqlTypedRecord, properties)
}
}
// Process delete for MOR
if (resultRecordOpt == null && isMORTable(properties)) {
val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
if (deleteConditionText != null) {
val deleteCondition = getEvaluator(deleteConditionText.toString, writeSchema).head._1
val deleteConditionVal = evaluate(deleteCondition, sqlTypedRecord).head.asInstanceOf[Boolean]
if (deleteConditionVal) {
resultRecordOpt = HOption.empty()
}
}
}
if (resultRecordOpt != null) {
resultRecordOpt
} else {
// If there is no condition matched, just filter this record.
// Here we return a IGNORE_RECORD, HoodieCreateHandle will not handle it.
HOption.of(HoodieWriteHandle.IGNORE_RECORD)
// For a COW table, only not-matched records step into the getInsertValue method, so just call
// processNotMatchedRecord() here.
processNotMatchedRecord(sqlTypedRecord, properties)
}
}
}

View File

@@ -75,7 +75,7 @@ class TestMereIntoLogOnlyTable extends TestHoodieSqlBase {
| select 4 as id, 'a4' as name, 11 as price, 1000 as ts
| ) s0
| on h0.id = s0.id
| when matched then update set *
| when not matched then insert *
|""".stripMargin)
// 5 commits will trigger compaction.

View File

@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
/**
* Tests for MERGE INTO against a partitioned MOR table: conditional inserts, conditional
* updates, conditional deletes, and the restriction that an update assignment's right-hand
* side may not reference a target-table field for MOR tables.
*/
class TestMergeIntoTable2 extends TestHoodieSqlBase {
test("Test MergeInto for MOR table 2") {
withTempDir { tmp =>
val tableName = generateTableName
// Create a partitioned MOR table.
spark.sql(
s"""
| create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
| ) using hudi
| options (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
| partitioned by(dt)
| location '${tmp.getCanonicalPath}'
""".stripMargin)
// Insert a row that matches the insert-condition (s0.id % 2 = 1); the row is written.
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 1 then insert *
""".stripMargin
)
checkAnswer(s"select id,name,price,dt from $tableName")(
Seq(1, "a1", 10, "2021-03-21")
)
// Insert a row that does not match the insert-condition; nothing should be written.
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 1 then insert *
""".stripMargin
)
checkAnswer(s"select id,name,price,dt from $tableName")(
Seq(1, "a1", 10, "2021-03-21")
)
// Merge a matched row that satisfies neither the update- nor the delete-condition;
// the existing row must stay unchanged.
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched and s0.id % 2 = 0 then update set *
| when matched and s0.id % 3 = 2 then delete
| when not matched then insert *
""".stripMargin
)
checkAnswer(s"select id,name,price,dt from $tableName")(
Seq(1, "a1", 10, "2021-03-21")
)
// Merge a matched row that satisfies the update-condition; price becomes s0.price * 2 = 22.
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched and s0.id % 2 = 1 then update set id = s0.id, name = s0.name,
| price = s0.price * 2, ts = s0.ts, dt = s0.dt
| when not matched then insert (id,name,price,ts,dt) values(s0.id, s0.name, s0.price, s0.ts, s0.dt)
""".stripMargin
)
checkAnswer(s"select id,name,price,dt from $tableName")(
Seq(1, "a1", 22, "2021-03-21")
)
// Merge a matched row that satisfies the delete-condition (s0.id % 2 = 1); the row is removed.
spark.sql(
s"""
| merge into $tableName as t0
| using (
| select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched and s0.id % 2 = 0 then update set id = s0.id, name = s0.name,
| price = s0.price * 2, ts = s0.ts, dt = s0.dt
| when matched and s0.id % 2 = 1 then delete
| when not matched then insert (id,name,price,ts,dt) values(s0.id, s0.name, s0.price, s0.ts, s0.dt)
""".stripMargin
)
checkAnswer(s"select count(1) from $tableName")(
Seq(0)
)
// Referencing a target-table field (t0.price) on the right-hand side of an update
// assignment must be rejected for MOR tables.
checkException(
s"""
| merge into $tableName as t0
| using (
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched and s0.id % 2 = 1 then update set id = s0.id, name = s0.name,
| price = s0.price + t0.price, ts = s0.ts, dt = s0.dt
""".stripMargin
)("assertion failed: Target table's field(price) cannot be the right-value of the update clause for MOR table.")
}
}
}