From 572a2144124b90b2dbef8183ee19a95b21de2716 Mon Sep 17 00:00:00 2001
From: pengzhiwei
Date: Sat, 17 Jul 2021 12:59:18 +0800
Subject: [PATCH] [HUDI-1884] MergeInto Support Partial Update For COW (#3154)

---
 .../spark/sql/hudi/HoodieSqlUtils.scala       |  14 +-
 .../sql/hudi/analysis/HoodieAnalysis.scala    |  50 ++++++-
 .../command/MergeIntoHoodieTableCommand.scala |  12 +-
 .../hudi/TestPartialUpdateForMergeInto.scala  | 138 ++++++++++++++++++
 .../spark/sql/adapter/Spark2Adapter.scala     |   2 +-
 .../spark/sql/adapter/Spark3Adapter.scala     |   2 +-
 6 files changed, 202 insertions(+), 16 deletions(-)
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index 8a94ac675..8795dddb3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
@@ -155,6 +155,18 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     }
   }
 
+  /**
+   * Get the TableIdentifier of the target table in a MergeIntoTable plan.
+   */
+  def getMergeIntoTargetTableId(mergeInto: MergeIntoTable): TableIdentifier = {
+    val aliasId = mergeInto.targetTable match {
+      case SubqueryAlias(_, SubqueryAlias(tableId, _)) => tableId
+      case SubqueryAlias(tableId, _) => tableId
+      case plan => throw new IllegalArgumentException(s"Illegal plan $plan in target")
+    }
+    sparkAdapter.toTableIdentify(aliasId)
+  }
+
   /**
    * Split the expression to a sub expression seq by the AND operation.
    * @param expression
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index dbd36bee4..4c222e745 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -17,10 +17,11 @@
 package org.apache.spark.sql.hudi.analysis
 
+import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
 import org.apache.hudi.SparkAdapterSupport
 
 import scala.collection.JavaConverters._
 
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -31,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, De
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand, TruncateTableCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
-import org.apache.spark.sql.hudi.HoodieSqlUtils
+import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, TruncateHoodieTableCommand, UpdateHoodieTableCommand}
 import org.apache.spark.sql.types.StringType
@@ -102,7 +103,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
     // Resolve merge into
-    case MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)
+    case mergeInto @ MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)
       if isHoodieTable(target, sparkSession) && target.resolved =>
       val resolvedSource = analyzer.execute(source)
@@ -164,7 +165,48 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
         case UpdateAction(condition, assignments) =>
           val (resolvedCondition, resolvedAssignments) =
             resolveConditionAssignments(condition, assignments)
-          UpdateAction(resolvedCondition, resolvedAssignments)
+
+          // Get the target table type and pre-combine field.
+          val targetTableId = getMergeIntoTargetTableId(mergeInto)
+          val targetTable =
+            sparkSession.sessionState.catalog.getTableMetadata(targetTableId)
+          val targetTableType = HoodieOptionConfig.getTableType(targetTable.storage.properties)
+          val preCombineField = HoodieOptionConfig.getPreCombineField(targetTable.storage.properties)
+
+          // Build a map from each target attribute name to its assigned value in the update assignments.
+          val target2Values = resolvedAssignments.map {
+            case Assignment(attr: AttributeReference, value) =>
+              attr.name -> value
+            case o => throw new IllegalArgumentException(s"Assignment key must be an attribute, but found: ${o.key}")
+          }.toMap
+
+          // Validate that every assignment target exists in the target table.
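+          // Note: meta fields (e.g. _hoodie_commit_time) are excluded from the valid targets, so they can never be assigned.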
+          val unknownTargets = target2Values.keys
+            .filterNot(removeMetaFields(target.output).map(_.name).contains(_))
+          if (unknownTargets.nonEmpty) {
+            throw new AnalysisException(s"Cannot find target attributes: ${unknownTargets.mkString(",")}.")
+          }
+
+          // Fill in the missing target attributes in the update action for a COW table to support partial update.
+          // e.g. if the update action does not assign the 'id' attribute, we fill in an "id = target.id" assignment.
+          val newAssignments = removeMetaFields(target.output)
+            .map(attr => {
+              // TODO support partial update for MOR.
+              if (!target2Values.contains(attr.name) && targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
+                throw new AnalysisException(s"Missing the value for the target field '${attr.name}' in the merge into" +
+                  s" update action for a MOR table. Partial update is not supported for MOR tables yet," +
+                  s" please specify all the target fields, e.g. '... update set id = s0.id, name = s0.name ...'")
+              }
+              if (preCombineField.isDefined && preCombineField.get.equalsIgnoreCase(attr.name)
+                && !target2Values.contains(attr.name)) {
+                throw new AnalysisException(s"Missing the value for the preCombineField ${preCombineField.get}" +
+                  s" in the merge into update action. You should add" +
+                  s" '... update set ${preCombineField.get} = xx ...' to the when-matched clause.")
+              }
+              Assignment(attr, target2Values.getOrElse(attr.name, attr))
+            })
+          UpdateAction(resolvedCondition, newAssignments)
         case DeleteAction(condition) =>
           val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 008d0260d..a2f42579f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -61,14 +61,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
   /**
    * The target table identify.
    */
-  private lazy val targetTableIdentify: TableIdentifier = {
-    val aliaId = mergeInto.targetTable match {
-      case SubqueryAlias(_, SubqueryAlias(tableId, _)) => tableId
-      case SubqueryAlias(tableId, _) => tableId
-      case plan => throw new IllegalArgumentException(s"Illegal plan $plan in target")
-    }
-    sparkAdapter.toTableIdentify(aliaId)
-  }
+  private lazy val targetTableIdentify: TableIdentifier = getMergeIntoTargetTableId(mergeInto)
 
   /**
    * The target table schema without hoodie meta fields.
@@ -124,7 +117,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
    */
   private lazy val target2SourcePreCombineFiled: Option[(String, Expression)] = {
     val updateActions = mergeInto.matchedActions.collect { case u: UpdateAction => u }
-    assert(updateActions.size <= 1, s"Only support one updateAction, current is: ${updateActions.size}")
+    assert(updateActions.size <= 1, s"Only one updateAction is supported currently, but found: ${updateActions.size}")
 
     val updateAction = updateActions.headOption
     HoodieOptionConfig.getPreCombineField(targetTable.storage.properties).map(preCombineField => {
@@ -151,6 +144,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 
     // Create the write parameters
     val parameters = buildMergeIntoConfig(mergeInto)
+    val sourceDF = buildSourceDF(sparkSession)
 
     if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
new file mode 100644
index 000000000..0def04973
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
+
+  test("Test Partial Update") {
+    withTempDir { tmp =>
+      // TODO: add a test case for 'mor' once partial update is supported for MOR.
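+      // Only the columns assigned in the SET clause should change; every other column must keep its current value.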
+ Seq("cow").foreach { tableType => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | _ts long + |) using hudi + |options( + | type ='$tableType', + | primaryKey = 'id', + | preCombineField = '_ts' + |) + |location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0.ts + |""".stripMargin) + checkAnswer(s"select id, name, price, _ts from $tableName")( + Seq(1, "a1", 12.0, 1001) + ) + + val tableName2 = generateTableName + spark.sql( + s""" + |create table $tableName2 ( + | id int, + | name string, + | price double + |) using hudi + |options( + | type ='$tableType', + | primaryKey = 'id' + |) + |location '${tmp.getCanonicalPath}/$tableName2' + """.stripMargin) + spark.sql(s"insert into $tableName2 values(1, 'a1', 10)") + + spark.sql( + s""" + |merge into $tableName2 t0 + |using ( select 1 as id, 'a1' as name, 12 as price) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price + |""".stripMargin) + checkAnswer(s"select id, name, price from $tableName2")( + Seq(1, "a1", 12.0) + ) + } + } + } + + test("Test MergeInto Exception") { + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | _ts long + |) using hudi + |options( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = '_ts' + |)""".stripMargin) + + checkException( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 'a1' as name, 12 as price) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price + """.stripMargin)( + "Missing specify value for the preCombineField: _ts in merge-into update action. " + + "You should add '... update set _ts = xx....' to the when-matched clause.;") + + val tableName2 = generateTableName + spark.sql( + s""" + |create table $tableName2 ( + | id int, + | name string, + | price double, + | _ts long + |) using hudi + |options( + | type = 'mor', + | primaryKey = 'id', + | preCombineField = '_ts' + |)""".stripMargin) + + checkException( + s""" + |merge into $tableName2 t0 + |using ( select 1 as id, 'a1' as name, 12 as price) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price + """.stripMargin)( + "Missing specify the value for target field: 'id' in merge into update action for MOR table. " + + "Currently we cannot support partial update for MOR, please complete all the target fields " + + "just like '...update set id = s0.id, name = s0.name ....';") + } +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 39a477b4c..e55fdf2a5 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.hudi.parser.HoodieSqlParser import org.apache.spark.sql.internal.SQLConf /** - * A sql adapter for spark2. + * The adapter for spark2. 
  */
 class Spark2Adapter extends SparkAdapter {
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
index 1ce4996f0..4c9a06b3c 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.internal.SQLConf
 
 /**
- * A sql adapter for spark3.
+ * The adapter for spark3.
  */
 class Spark3Adapter extends SparkAdapter {
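
For reference, a minimal sketch of the usage this patch enables on a COW table. The table h0 and its schema are illustrative (they mirror the tests above); note that per the new rule the preCombineField, _ts here, must still be assigned explicitly:

    // Assumes a Hudi COW table h0(id int, name string, price double, _ts long)
    // created with primaryKey = 'id' and preCombineField = '_ts'.
    // 'name' is deliberately omitted from the SET clause: the new analysis rule
    // rewrites the action with an implicit "name = t0.name", so the stored value
    // of 'name' is preserved while only 'price' and '_ts' change.
    spark.sql(
      """
        |merge into h0 t0
        |using (select 1 as id, 13 as price, 1002 as ts) s0
        |on t0.id = s0.id
        |when matched then update set price = s0.price, _ts = s0.ts
        |""".stripMargin)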