[HUDI-4315] Do not throw exception in BaseSpark3Adapter#toTableIdentifier (#5957)
This commit is contained in:
@@ -18,6 +18,7 @@
|
|||||||
package org.apache.spark.sql.hudi
|
package org.apache.spark.sql.hudi
|
||||||
|
|
||||||
import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
|
import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
|
||||||
|
import org.apache.hudi.HoodieSparkUtils
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.exception.HoodieDuplicateKeyException
|
import org.apache.hudi.exception.HoodieDuplicateKeyException
|
||||||
@@ -696,4 +697,47 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verifies INSERT INTO against a table addressed by a three-part, catalog-qualified
// name (`spark_catalog.default.<generated name>`), once per format in Seq("hudi", "parquet").
// For each format it creates a table partitioned by `dt` at a temp-dir location, inserts one
// row via a dynamic partition and one via a static partition, and checks the selected rows
// after each insert.
// NOTE(review): presumably this guards the catalog-qualified identifier path touched by the
// accompanying adapter change — confirm against the commit description.
test("Test Insert Into With Catalog Identifier for spark >= 3.2.0") {
|
||||||
|
Seq("hudi", "parquet").foreach { format =>
|
||||||
|
withTempDir { tmp =>
|
||||||
|
val tableName = s"spark_catalog.default.$generateTableName"
|
||||||
|
// Create a partitioned table
|
||||||
|
// The whole scenario is gated on HoodieSparkUtils.gteqSpark3_2; when that is false the
// iteration creates the temp dir and table name but executes no SQL and asserts nothing.
if (HoodieSparkUtils.gteqSpark3_2) {
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName (
|
||||||
|
| id int,
|
||||||
|
| name string,
|
||||||
|
| price double,
|
||||||
|
| ts long,
|
||||||
|
| dt string
|
||||||
|
|) using $format
|
||||||
|
| tblproperties (primaryKey = 'id')
|
||||||
|
| partitioned by (dt)
|
||||||
|
| location '${tmp.getCanonicalPath}'
|
||||||
|
""".stripMargin)
|
||||||
|
// Insert into dynamic partition
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
| insert into $tableName
|
||||||
|
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt
|
||||||
|
""".stripMargin)
|
||||||
|
// After the first insert exactly one row (id = 1) is expected.
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
|
||||||
|
Seq(1, "a1", 10.0, 1000, "2021-01-05")
|
||||||
|
)
|
||||||
|
// Insert into static partition
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
| insert into $tableName partition(dt = '2021-01-05')
|
||||||
|
| select 2 as id, 'a2' as name, 10 as price, 1000 as ts
|
||||||
|
""".stripMargin)
|
||||||
|
// The static-partition insert supplies dt through the partition clause, so both rows
// are expected to carry dt = '2021-01-05'.
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
|
||||||
|
Seq(1, "a1", 10.0, 1000, "2021-01-05"),
|
||||||
|
Seq(2, "a2", 10.0, 1000, "2021-01-05")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,8 +19,9 @@ package org.apache.spark.sql.adapter
|
|||||||
|
|
||||||
import org.apache.hudi.Spark3RowSerDe
|
import org.apache.hudi.Spark3RowSerDe
|
||||||
import org.apache.hudi.client.utils.SparkRowSerDe
|
import org.apache.hudi.client.utils.SparkRowSerDe
|
||||||
import org.apache.spark.SPARK_VERSION
|
|
||||||
import org.apache.hudi.spark3.internal.ReflectUtil
|
import org.apache.hudi.spark3.internal.ReflectUtil
|
||||||
|
import org.apache.spark.SPARK_VERSION
|
||||||
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
|
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
|
||||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||||
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
|
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
|
||||||
@@ -28,21 +29,21 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredica
|
|||||||
import org.apache.spark.sql.catalyst.parser.ParserInterface
|
import org.apache.spark.sql.catalyst.parser.ParserInterface
|
||||||
import org.apache.spark.sql.catalyst.plans.JoinType
|
import org.apache.spark.sql.catalyst.plans.JoinType
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
|
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
|
||||||
import org.apache.spark.sql.catalyst.rules.Rule
|
|
||||||
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
|
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
|
||||||
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
|
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
|
||||||
import org.apache.spark.sql.connector.catalog.Table
|
import org.apache.spark.sql.connector.catalog.Table
|
||||||
import org.apache.spark.sql.execution.datasources._
|
import org.apache.spark.sql.execution.datasources._
|
||||||
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
|
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
|
||||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
|
||||||
import org.apache.spark.sql.hudi.SparkAdapter
|
import org.apache.spark.sql.hudi.SparkAdapter
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.{Row, SparkSession}
|
import org.apache.spark.sql.{Row, SparkSession}
|
||||||
|
|
||||||
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base implementation of [[SparkAdapter]] for Spark 3.x branch
|
* Base implementation of [[SparkAdapter]] for Spark 3.x branch
|
||||||
*/
|
*/
|
||||||
abstract class BaseSpark3Adapter extends SparkAdapter {
|
abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
|
||||||
|
|
||||||
override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
|
override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
|
||||||
new Spark3RowSerDe(encoder)
|
new Spark3RowSerDe(encoder)
|
||||||
@@ -115,7 +116,13 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
|
|||||||
unfoldSubqueryAliases(table) match {
|
unfoldSubqueryAliases(table) match {
|
||||||
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
|
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
|
||||||
case relation: UnresolvedRelation =>
|
case relation: UnresolvedRelation =>
|
||||||
isHoodieTable(toTableIdentifier(relation), spark)
|
try {
|
||||||
|
isHoodieTable(toTableIdentifier(relation), spark)
|
||||||
|
} catch {
|
||||||
|
case NonFatal(e) =>
|
||||||
|
logWarning("Failed to determine whether the table is a hoodie table", e)
|
||||||
|
false
|
||||||
|
}
|
||||||
case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
|
case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
|
||||||
case _=> false
|
case _=> false
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user