diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
similarity index 90%
rename from hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 30fdde168..a124575e0 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.hudi.catalog
+package org.apache.spark.sql.hudi

 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
@@ -76,18 +75,19 @@ trait ProvidesHoodieConfig extends Logging {

   /**
    * Build the default config for insert.
+   *
    * @return
    */
-  def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
-                              sparkSession: SparkSession,
-                              isOverwrite: Boolean,
-                              insertPartitions: Map[String, Option[String]] = Map.empty,
-                              extraOptions: Map[String, String]): Map[String, String] = {
+  def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
+                              sparkSession: SparkSession,
+                              isOverwrite: Boolean,
+                              insertPartitions: Map[String, Option[String]] = Map.empty,
+                              extraOptions: Map[String, String]): Map[String, String] = {

     if (insertPartitions.nonEmpty &&
       (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
       throw new IllegalArgumentException(s"Insert partition fields" +
-        s"[${insertPartitions.keys.mkString(" " )}]" +
+        s"[${insertPartitions.keys.mkString(" ")}]" +
         s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
     }
     val path = hoodieCatalogTable.tableLocation
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index b4013d2d0..53e4623e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -17,26 +17,17 @@

 package org.apache.spark.sql.hudi.command

-import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.MultiPartKeysValueExtractor
-import org.apache.hudi.hive.ddl.HiveSyncMode
-import org.apache.hudi.keygen.ComplexKeyGenerator
-import org.apache.hudi.sql.InsertMode
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
+import org.apache.hudi.HoodieSparkSqlWriter
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

-import scala.collection.JavaConverters._
-
 /**
  * Command for insert into hoodie table.
  */
@@ -56,7 +47,7 @@ case class InsertIntoHoodieTableCommand(
   }
 }

-object InsertIntoHoodieTableCommand extends Logging {
+object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
   /**
    * Run the insert query. We support both dynamic partition insert and static partition insert.
    * @param sparkSession The spark session.
@@ -173,112 +164,4 @@ object InsertIntoHoodieTableCommand extends Logging {
     val alignedProjects = dataProjectsWithoutMetaFields ++ partitionProjects
     Project(alignedProjects, query)
   }
-
-  /**
-   * Build the default config for insert.
-   * @return
-   */
-  private def buildHoodieInsertConfig(
-      hoodieCatalogTable: HoodieCatalogTable,
-      sparkSession: SparkSession,
-      isOverwrite: Boolean,
-      insertPartitions: Map[String, Option[String]] = Map.empty,
-      extraOptions: Map[String, String]): Map[String, String] = {
-
-    if (insertPartitions.nonEmpty &&
-      (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
-      throw new IllegalArgumentException(s"Insert partition fields" +
-        s"[${insertPartitions.keys.mkString(" " )}]" +
-        s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
-    }
-    val path = hoodieCatalogTable.tableLocation
-    val tableType = hoodieCatalogTable.tableTypeName
-    val tableConfig = hoodieCatalogTable.tableConfig
-    val tableSchema = hoodieCatalogTable.tableSchema
-
-    val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
-    val parameters = withSparkConf(sparkSession, options)()
-
-    val preCombineColumn = hoodieCatalogTable.preCombineKey.getOrElse("")
-    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-
-    val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
-    val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
-    val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
-      .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
-
-    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
-      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
-    val dropDuplicate = sparkSession.conf
-      .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
-
-    val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
-      DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
-    val isNonStrictMode = insertMode == InsertMode.NON_STRICT
-    val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
-    val hasPreCombineColumn = preCombineColumn.nonEmpty
-    val operation =
-      (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
-        case (true, _, _, false, _) =>
-          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
-        case (true, true, _, _, true) =>
-          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
-        case (true, _, true, _, _) =>
-          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
-            s" Please disable $INSERT_DROP_DUPS and try again.")
-        // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
-        case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
-        // insert overwrite table
-        case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
-        // insert overwrite partition
-        case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
-        // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
-        case (false, false, false, false, _) if hasPreCombineColumn => UPSERT_OPERATION_OPT_VAL
-        // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
-        case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
-        // for the rest case, use the insert operation
-        case _ => INSERT_OPERATION_OPT_VAL
-      }
-
-    val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
-      tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
-      // Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
-      // on reading.
-      classOf[ValidateDuplicateKeyPayload].getCanonicalName
-    } else {
-      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
-    }
-    logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")
-
-    val enableHive = isEnableHive(sparkSession)
-    withSparkConf(sparkSession, options) {
-      Map(
-        "path" -> path,
-        TABLE_TYPE.key -> tableType,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        PRECOMBINE_FIELD.key -> preCombineColumn,
-        OPERATION.key -> operation,
-        HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        PARTITIONPATH_FIELD.key -> partitionFields,
-        PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPreCombineColumn),
-        META_SYNC_ENABLED.key -> enableHive.toString,
-        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
-        HIVE_USE_JDBC.key -> "false",
-        HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
-        HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
-        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
-        HIVE_PARTITION_FIELDS.key -> partitionFields,
-        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
-        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
-        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
-    }
-  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 10c57a7fd..277f26434 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -17,25 +17,21 @@

 package org.apache.spark.sql.hudi.command

-import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.MultiPartKeysValueExtractor
-import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructField

 import scala.collection.JavaConverters._

 case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeafRunnableCommand
-  with SparkAdapterSupport {
+  with SparkAdapterSupport with ProvidesHoodieConfig {

   private val table = updateTable.table
   private val tableId = getTableIdentifier(table)
@@ -71,7 +67,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeaf
       df = df.filter(Column(updateTable.condition.get))
     }
     df = df.select(projects: _*)
-    val config = buildHoodieConfig(sparkSession)
+    val config = buildHoodieConfig(HoodieCatalogTable(sparkSession, tableId))
     df.write
       .format("hudi")
       .mode(SaveMode.Append)
@@ -82,42 +78,6 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeaf
     Seq.empty[Row]
   }

-  private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
-    val catalogProperties = hoodieCatalogTable.catalogProperties
-    val tableConfig = hoodieCatalogTable.tableConfig
-
-    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
-    assert(hoodieCatalogTable.primaryKeys.nonEmpty,
-      s"There are no primary key in table $tableId, cannot execute update operator")
-    val enableHive = isEnableHive(sparkSession)
-
-    withSparkConf(sparkSession, catalogProperties) {
-      Map(
-        "path" -> hoodieCatalogTable.tableLocation,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        PRECOMBINE_FIELD.key -> preCombineColumn,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        META_SYNC_ENABLED.key -> enableHive.toString,
-        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
-        HIVE_USE_JDBC.key -> "false",
-        HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
-        HIVE_TABLE.key -> tableId.table,
-        HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
-        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
-        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
-    }
-  }
-
   def cast(exp:Expression, field: StructField, sqlConf: SQLConf): Expression = {
     castIfNeeded(exp, field.dataType, sqlConf)
   }
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
index 0d4f1ce3f..e20f93459 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelpe
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, getTableLocation, removeMetaFields, tableExistsInPath}
-import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table, ProvidesHoodieConfig}
+import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table}
 import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index 7b7f5c7f7..3046af991 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableCommand}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}

diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
index a9bad42d6..848925aaf 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.sources.{Filter, InsertableRelation}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
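Note (not part of the patch): after this refactor, a Spark SQL command obtains its write options by mixing in the relocated org.apache.spark.sql.hudi.ProvidesHoodieConfig trait instead of keeping a private config builder. The sketch below is a hypothetical illustration of that pattern, modeled on the UpdateHoodieTableCommand hunk above; MyHoodieCommand and writeWithTableConfig are made-up names, and the one-argument buildHoodieConfig signature is assumed from its call site in this diff.

```scala
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hudi.ProvidesHoodieConfig

// Hypothetical command object; only the mixin and the buildHoodieConfig call
// reflect what this diff actually introduces.
object MyHoodieCommand extends ProvidesHoodieConfig {

  def writeWithTableConfig(sparkSession: SparkSession,
                           tableId: TableIdentifier,
                           df: DataFrame): Seq[Row] = {
    // Resolve table metadata the same way UpdateHoodieTableCommand does.
    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)

    // The trait builds the upsert option map from the catalog table's config.
    val config: Map[String, String] = buildHoodieConfig(hoodieCatalogTable)

    // Hand the options to the Hudi datasource, mirroring the df.write pattern above.
    df.write
      .format("hudi")
      .mode(SaveMode.Append)
      .options(config)
      .save()
    Seq.empty[Row]
  }
}
```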