1
0

[HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS performance gap (#6213)

This commit is contained in:
Alexey Kudinkin
2022-07-28 15:36:03 -07:00
committed by GitHub
parent 70b5cf6dab
commit cfd0c1ee34
14 changed files with 382 additions and 189 deletions

View File

@@ -19,12 +19,33 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
trait HoodieCatalystPlansUtils {
/**
* Resolves output of the provided [[query]] against the [[expected]] list of [[Attribute]],
* and returns new (reshaped) instance of the [[LogicalPlan]]
*
* @param tableName used purely for more human-readable error output (if any)
* @param expected list of attributes output of the query has to adhere to
* @param query query whose output has to be reshaped
* @param byName whether the matching should occur by-name or positionally
* @param conf instance of [[SQLConf]]
* @return [[LogicalPlan]] which output is aligned to match to that of [[expected]]
*/
def resolveOutputColumns(tableName: String,
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean,
conf: SQLConf): LogicalPlan
/**
* Instantiates an [[Explain]] command
*/
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan
/**

View File

@@ -316,6 +316,9 @@ public class TableSchemaResolver {
* @param oldSchema Older schema to check.
* @param newSchema Newer schema to check.
* @return True if the schema validation is successful
*
* TODO revisit this method: it's implemented incorrectly as it might be applying different criteria
* to top-level record and nested record (for ex, if that nested record is contained w/in an array)
*/
public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
@@ -366,13 +369,31 @@ public class TableSchemaResolver {
return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
}
/**
* Returns table's latest Avro {@link Schema} iff table is non-empty (ie there's at least
* a single commit)
*
* This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fallback
* to use table's schema used at creation
*/
public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception {
if (metaClient.isTimelineNonEmpty()) {
return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty()));
}
return Option.empty();
}
/**
* Get latest schema either from incoming schema or table schema.
* @param writeSchema incoming batch's write schema.
* @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise.
* @param converterFn converter function to be called over table schema (to add namespace may be). Each caller can decide if any conversion is required.
* @return the latest schema.
*
* @deprecated will be removed (HUDI-4472)
*/
@Deprecated
public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace,
Function1<Schema, Schema> converterFn) {
Schema latestSchema = writeSchema;

View File

@@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
@@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, StringUtils}
import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
@@ -72,8 +72,7 @@ object HoodieSparkSqlWriter {
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty
)
asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty)
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
@@ -241,39 +240,49 @@ object HoodieSparkSqlWriter {
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema)
var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
&& internalSchemaOpt.isEmpty) {
// force apply full schema evolution.
internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema))
}
if (reconcileSchema) {
schema = lastestSchema
}
if (internalSchemaOpt.isDefined) {
// Apply schema evolution.
val mergedSparkSchema = if (!reconcileSchema) {
AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema))
} else {
// Auto merge write schema and read schema.
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get)
AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, lastestSchema.getName))
}
schema = AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, structName, nameSpace)
}
if (reconcileSchema && internalSchemaOpt.isEmpty) {
schema = lastestSchema
}
validateSchemaForHoodieIsDeleted(schema)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
// TODO(HUDI-4472) revisit and simplify schema handling
val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
val writerSchema: Schema =
if (reconcileSchema) {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
}
if (internalSchemaOpt.isDefined) {
// Apply schema evolution, by auto-merging write schema and read schema
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
} else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
// In case schema reconciliation is enabled and source and latest table schemas
// are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]], then we will
// pick latest table's schema as the writer's schema
latestTableSchema
} else {
// Otherwise fallback to original source's schema
sourceSchema
}
} else {
// In case reconciliation is disabled, we still have to do nullability attributes
// (minor) reconciliation, making sure schema of the incoming batch is in-line with
// the data already committed in the table
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
}
validateSchemaForHoodieIsDeleted(writerSchema)
sparkContext.getConf.registerAvroSchemas(writerSchema)
log.info(s"Registered avro schema : ${writerSchema.toString(true)}")
// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
org.apache.hudi.common.util.Option.of(schema))
org.apache.hudi.common.util.Option.of(writerSchema))
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
@@ -295,10 +304,10 @@ object HoodieSparkSqlWriter {
hoodieRecord
}).toJavaRDD()
val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path,
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -388,14 +397,18 @@ object HoodieSparkSqlWriter {
* @param schema incoming record's schema.
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, schema: Schema): Schema = {
var latestSchema: Schema = schema
def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[Schema] = {
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
val tableMetaClient = HoodieTableMetaClient.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(basePath.toString)
.build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null)
toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
} else {
None
}
latestSchema
}
def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: SparkContext, df: Dataset[Row],

View File

@@ -317,8 +317,8 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = {
child match {
case Literal(nul, NullType) => Literal(nul, dataType)
case _ => if (child.dataType != dataType)
Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child
case expr if child.dataType != dataType => Cast(expr, dataType, Option(conf.sessionLocalTimeZone))
case _ => child
}
}

View File

@@ -17,27 +17,39 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal}
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql._
/**
* Command for insert into hoodie table.
* Command for insert into Hudi table.
*
* This is correspondent to Spark's native [[InsertIntoStatement]]
*
* @param logicalRelation the [[LogicalRelation]] representing the table to be writing into.
* @param query the logical plan representing data to be written
* @param partitionSpec a map from the partition key to the partition value (optional).
* If the value is missing, dynamic partition insert will be performed.
* As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS` would have
* Map('a' -> Some('1'), 'b' -> Some('2')),
* and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
* would have Map('a' -> Some('1'), 'b' -> None).
* @param overwrite overwrite existing table or partitions.
*/
case class InsertIntoHoodieTableCommand(
logicalRelation: LogicalRelation,
query: LogicalPlan,
partition: Map[String, Option[String]],
overwrite: Boolean)
case class InsertIntoHoodieTableCommand(logicalRelation: LogicalRelation,
query: LogicalPlan,
partitionSpec: Map[String, Option[String]],
overwrite: Boolean)
extends HoodieLeafRunnableCommand {
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -45,18 +57,19 @@ case class InsertIntoHoodieTableCommand(
assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")
val table = logicalRelation.catalogTable.get
InsertIntoHoodieTableCommand.run(sparkSession, table, query, partition, overwrite)
InsertIntoHoodieTableCommand.run(sparkSession, table, query, partitionSpec, overwrite)
Seq.empty[Row]
}
}
object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig with SparkAdapterSupport {
/**
* Run the insert query. We support both dynamic partition insert and static partition insert.
* @param sparkSession The spark session.
* @param table The insert table.
* @param query The insert query.
* @param insertPartitions The specified insert partition map.
* @param partitionSpec The specified insert partition map.
* e.g. "insert into h(dt = '2021') select id, name from src"
* "dt" is the key in the map and "2021" is the partition value. If the
* partition value has not specified(in the case of dynamic partition)
@@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
* @param extraOptions Extra options for insert.
*/
def run(sparkSession: SparkSession,
table: CatalogTable,
query: LogicalPlan,
insertPartitions: Map[String, Option[String]],
overwrite: Boolean,
refreshTable: Boolean = true,
extraOptions: Map[String, String] = Map.empty): Boolean = {
table: CatalogTable,
query: LogicalPlan,
partitionSpec: Map[String, Option[String]],
overwrite: Boolean,
refreshTable: Boolean = true,
extraOptions: Map[String, String] = Map.empty): Boolean = {
val catalogTable = new HoodieCatalogTable(sparkSession, table)
val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions)
val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, insertPartitions, extraOptions)
val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
// insert overwrite non-partition table
// NOTE: In case of partitioned table we override specified "overwrite" parameter
// to instead append to the dataset
val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
SaveMode.Overwrite
} else {
// for insert into or insert overwrite partition we use append mode.
SaveMode.Append
}
val conf = sparkSession.sessionState.conf
val alignedQuery = alignOutputFields(query, hoodieCatalogTable, insertPartitions, conf)
// If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
// The nullable attribute of fields will lost.
// In order to pass the nullable attribute to the inputDF, we specify the schema
// of the rdd.
val inputDF = sparkSession.createDataFrame(
Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
val success =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1
if (success) {
if (refreshTable) {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
}
true
} else {
false
val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))
if (success && refreshTable) {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
}
success
}
/**
* Aligned the type and name of query's output fields with the result table's fields.
* @param query The insert query which to aligned.
* @param hoodieCatalogTable The result hoodie catalog table.
* @param insertPartitions The insert partition map.
* @param conf The SQLConf.
* @return
* Align provided [[query]]'s output with the expected [[catalogTable]] schema by
*
* <ul>
* <li>Performing type coercion (casting corresponding outputs, where needed)</li>
* <li>Adding aliases (matching column names) to corresponding outputs </li>
* </ul>
*
* @param query target query whose output is to be inserted
* @param catalogTable catalog table
* @param partitionsSpec partition spec specifying static/dynamic partition values
* @param conf Spark's [[SQLConf]]
*/
private def alignOutputFields(
query: LogicalPlan,
hoodieCatalogTable: HoodieCatalogTable,
insertPartitions: Map[String, Option[String]],
conf: SQLConf): LogicalPlan = {
private def alignQueryOutput(query: LogicalPlan,
catalogTable: HoodieCatalogTable,
partitionsSpec: Map[String, Option[String]],
conf: SQLConf): LogicalPlan = {
val targetPartitionSchema = hoodieCatalogTable.partitionSchema
val targetPartitionSchema = catalogTable.partitionSchema
val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get)
assert(staticPartitionValues.isEmpty ||
insertPartitions.size == targetPartitionSchema.size,
s"Required partition columns is: ${targetPartitionSchema.json}, Current input partitions " +
s"is: ${staticPartitionValues.mkString("," + "")}")
validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
// Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway)
val cleanedQuery = stripMetaFields(query)
// To validate and align properly output of the query, we simply filter out partition columns with already
// provided static values from the table's schema
//
// NOTE: This is a crucial step: since coercion might rely on either of a) name-based or b) positional-based
// matching it's important to strip out partition columns, having static values provided in the partition spec,
// since such columns wouldn't be otherwise specified w/in the query itself and therefore couldn't be matched
// positionally for example
val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name))
val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf)
val queryOutputWithoutMetaFields = removeMetaFields(query.output)
assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
== hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
s"Current select columns(including static partition column) count: " +
s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size}columns: " +
s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf)
val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)))
val dataProjectsWithoutMetaFields = getTableFieldsAlias(queryOutputWithoutMetaFields,
dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
val partitionProjects = targetPartitionSchema.fields.filter(f => staticPartitionValues.contains(f.name))
.map(f => {
val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
s"Missing static partition value for: ${f.name}")
val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf)
Alias(castAttr, f.name)()
})
Project(dataProjectsWithoutMetaFields ++ partitionProjects, query)
Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, coercedQueryOutput)
}
private def getTableFieldsAlias(
queryOutputWithoutMetaFields: Seq[Attribute],
schemaWithoutMetaFields: Seq[StructField],
conf: SQLConf): Seq[Alias] = {
queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case (dataAttr, dataField) =>
val targetAttrOption = if (dataAttr.name.startsWith("col")) {
None
} else {
queryOutputWithoutMetaFields.find(_.name.equals(dataField.name))
}
val targetAttr = targetAttrOption.getOrElse(dataAttr)
val castAttr = castIfNeeded(targetAttr.withNullability(dataField.nullable),
dataField.dataType, conf)
Alias(castAttr, dataField.name)()
private def coerceQueryOutputColumns(expectedSchema: StructType,
query: LogicalPlan,
catalogTable: HoodieCatalogTable,
conf: SQLConf): LogicalPlan = {
val planUtils = sparkAdapter.getCatalystPlanUtils
try {
planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf)
} catch {
// NOTE: In case matching by name didn't match the query output, we will attempt positional matching
case ae: AnalysisException if ae.getMessage().startsWith("Cannot write incompatible data to table") =>
planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf)
}
}
private def validate(queryOutputSchema: StructType, partitionsSpec: Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = {
// Validate that partition-spec has proper format (it could be empty if all of the partition values are dynamic,
// ie there are no static partition-values specified)
if (partitionsSpec.nonEmpty && partitionsSpec.size != catalogTable.partitionSchema.size) {
throw new HoodieException(s"Required partition schema is: ${catalogTable.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " +
s"partition spec is: ${partitionsSpec.mkString("[", ", ", "]")}")
}
val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ staticPartitionValues.keys.map(StructField(_, StringType)))
// Assert that query provides all the required columns
if (!conforms(fullQueryOutputSchema, catalogTable.tableSchemaWithoutMetaFields)) {
throw new HoodieException(s"Expected table's schema: ${catalogTable.tableSchemaWithoutMetaFields.fields.mkString("[", ", ", "]")}, " +
s"query's output (including static partition values): ${fullQueryOutputSchema.fields.mkString("[", ", ", "]")}")
}
}
private def createStaticPartitionValuesExpressions(staticPartitionValues: Map[String, String],
partitionSchema: StructType,
conf: SQLConf): Seq[NamedExpression] = {
partitionSchema.fields
.filter(pf => staticPartitionValues.contains(pf.name))
.map(pf => {
val staticPartitionValue = staticPartitionValues(pf.name)
val castExpr = castIfNeeded(Literal.create(staticPartitionValue), pf.dataType, conf)
Alias(castExpr, pf.name)()
})
}
private def conforms(sourceSchema: StructType, targetSchema: StructType): Boolean = {
if (sourceSchema.fields.length != targetSchema.fields.length) {
false
} else {
targetSchema.fields.zip(sourceSchema).forall {
case (targetColumn, sourceColumn) =>
// Make sure we can cast source column to the target column type
Cast.canCast(sourceColumn.dataType, targetColumn.dataType)
}
}
}
def stripMetaFields(query: LogicalPlan): LogicalPlan = {
val filteredOutput = query.output.filterNot(attr => isMetaField(attr.name))
if (filteredOutput == query.output) {
query
} else {
Project(filteredOutput, query)
}
}
private def filterStaticPartitionValues(partitionsSpec: Map[String, Option[String]]): Map[String, String] =
partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get)
}

View File

@@ -145,11 +145,23 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
assertResult(true)(hasException)
}
protected def removeQuotes(value: Any): Any = {
def dropTypeLiteralPrefix(value: Any): Any = {
value match {
case s: String => s.stripPrefix("'").stripSuffix("'")
case _=> value
case s: String =>
s.stripPrefix("DATE").stripPrefix("TIMESTAMP").stripPrefix("X")
case _ => value
}
}
protected def extractRawValue(value: Any): Any = {
value match {
case s: String =>
// We need to strip out data-type prefixes like "DATE", "TIMESTAMP"
dropTypeLiteralPrefix(s)
.asInstanceOf[String]
.stripPrefix("'")
.stripSuffix("'")
case _ => value
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils.isSpark2
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode
@@ -93,11 +94,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
spark.sql(
s"""
|insert into $tableName
|values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000)
""".stripMargin)
if (isSpark2) {
spark.sql(
s"""
|insert into $tableName
|values (1, 'a1', cast(10.0 as double), 1000), (2, 'a2', cast(20.0 as double), 1000), (3, 'a2', cast(30.0 as double), 1000)
|""".stripMargin)
} else {
spark.sql(
s"""
|insert into $tableName
|values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000)
|""".stripMargin)
}
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 20.0, 1000),
@@ -132,11 +142,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
spark.sql(
s"""
|insert into $ptTableName
|values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022")
""".stripMargin)
if (isSpark2) {
spark.sql(
s"""
|insert into $ptTableName
|values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 1000, "2022")
|""".stripMargin)
} else {
spark.sql(
s"""
|insert into $ptTableName
|values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022")
|""".stripMargin)
}
checkAnswer(s"select id, name, price, ts, pt from $ptTableName")(
Seq(1, "a1", 10.0, 1000, "2021"),
Seq(2, "a2", 20.0, 1000, "2021"),

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.internal.SQLConf
import java.io.File
@@ -396,8 +397,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
("string", "'1000'"),
("int", 1000),
("bigint", 10000),
("timestamp", "'2021-05-20 00:00:00'"),
("date", "'2021-05-20'")
("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"),
("date", "DATE'2021-05-20'")
)
typeAndValue.foreach { case (partitionType, partitionValue) =>
val tableName = generateTableName
@@ -409,8 +410,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
test("Test TimestampType Partition Column With Consistent Logical Timestamp Enabled") {
withTempDir { tmp =>
val typeAndValue = Seq(
("timestamp", "'2021-05-20 00:00:00'"),
("date", "'2021-05-20'")
("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"),
("date", "DATE'2021-05-20'")
)
typeAndValue.foreach { case (partitionType, partitionValue) =>
val tableName = generateTableName
@@ -433,11 +434,12 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10")
// NOTE: We have to drop type-literal prefix since Spark doesn't parse type literals appropriately
spark.sql(s"insert into $tableName partition(dt = ${dropTypeLiteralPrefix(partitionValue)}) select 1, 'a1', 10")
spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")(
Seq(1, "a1", 10, removeQuotes(partitionValue).toString),
Seq(2, "a2", 10, removeQuotes(partitionValue).toString)
Seq(1, "a1", 10, extractRawValue(partitionValue).toString),
Seq(2, "a2", 10, extractRawValue(partitionValue).toString)
)
}
@@ -481,14 +483,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
""".stripMargin)
checkException(s"insert into $tableName partition(dt = '2021-06-20')" +
s" select 1, 'a1', 10, '2021-06-20'") (
"assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
" count: 5columns: (1,a1,10,2021-06-20,dt)"
checkException(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'") (
"Expected table's schema: " +
"[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
"query's output (including static partition values): " +
"[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false), StructField(dt,StringType,true)]"
)
checkException(s"insert into $tableName select 1, 'a1', 10")(
"assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
" count: 3columns: (1,a1,10)"
"Expected table's schema: " +
"[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
"query's output (including static partition values): " +
"[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false)]"
)
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql("set hoodie.sql.insert.mode = strict")

View File

@@ -908,7 +908,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
| when not matched then insert *
|""".stripMargin)
checkAnswer(s"select id, name, cast(value as string), ts from $tableName")(
Seq(1, "a1", removeQuotes(dataValue), 1000)
Seq(1, "a1", extractRawValue(dataValue), 1000)
)
}
}

View File

@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.HoodieSparkUtils.isSpark2
import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
class TestShowPartitions extends HoodieSparkSqlTestBase {
@@ -84,11 +85,22 @@ class TestShowPartitions extends HoodieSparkSqlTestBase {
checkAnswer(s"show partitions $tableName partition(dt='2021-01-02')")(Seq("dt=2021-01-02"))
// Insert into null partition
spark.sql(
s"""
| insert into $tableName
| select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt
if (isSpark2) {
// Spark 2 isn't able to convert NullType to any other type w/ appropriate nullability, so
// explicit cast is required
spark.sql(
s"""
| insert into $tableName
| select 3 as id, 'a3' as name, 10 as price, 1000 as ts, cast(null as string) as dt
""".stripMargin)
} else {
spark.sql(
s"""
| insert into $tableName
| select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt
""".stripMargin)
}
checkAnswer(s"show partitions $tableName")(
Seq("dt=2021-01-01"), Seq("dt=2021-01-02"), Seq("dt=%s".format(DEFAULT_PARTITION_PATH))
)

View File

@@ -55,11 +55,11 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(
s"""
| insert into $tableName values
| (1,1,11,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25 12:01:01',true,'a01','2021-12-25'),
| (2,2,12,100002,102.02,1002.0002,100002.0002,'a000002','2021-12-25','2021-12-25 12:02:02',true,'a02','2021-12-25'),
| (3,3,13,100003,103.03,1003.0003,100003.0003,'a000003','2021-12-25','2021-12-25 12:03:03',false,'a03','2021-12-25'),
| (4,4,14,100004,104.04,1004.0004,100004.0004,'a000004','2021-12-26','2021-12-26 12:04:04',true,'a04','2021-12-26'),
| (5,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26 12:05:05',false,'a05','2021-12-26')
| (1,1,11,100001,101.01,1001.0001,100001.0001,'a000001',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:01:01',true,X'a01',TIMESTAMP'2021-12-25'),
| (2,2,12,100002,102.02,1002.0002,100002.0002,'a000002',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:02:02',true,X'a02',TIMESTAMP'2021-12-25'),
| (3,3,13,100003,103.03,1003.0003,100003.0003,'a000003',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:03:03',false,X'a03',TIMESTAMP'2021-12-25'),
| (4,4,14,100004,104.04,1004.0004,100004.0004,'a000004',DATE'2021-12-26',TIMESTAMP'2021-12-26 12:04:04',true,X'a04',TIMESTAMP'2021-12-26'),
| (5,5,15,100005,105.05,1005.0005,100005.0005,'a000005',DATE'2021-12-26',TIMESTAMP'2021-12-26 12:05:05',false,X'a05',TIMESTAMP'2021-12-26')
|""".stripMargin)
}
@@ -70,6 +70,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
// NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
// and are disallowed now by default in Spark 3.x
spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
createAndPreparePartitionTable(spark, tableName, tablePath, tableType)
// date -> string -> date
spark.sql(s"alter table $tableName alter column col6 type String")
@@ -138,6 +141,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
// NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
// and are disallowed now by default in Spark 3.x
spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
createAndPreparePartitionTable(spark, tableName, tablePath, tableType)
// float -> double -> decimal -> String
spark.sql(s"alter table $tableName alter column col2 type double")
@@ -172,6 +178,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
// NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
// and are disallowed now by default in Spark 3.x
spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
createAndPreparePartitionTable(spark, tableName, tablePath, tableType)
// test set properties
@@ -402,7 +411,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(s"alter table $tableName alter column members.value.a first")
spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStruct', 29, 100), 1000)")
spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStruct', 29, 100), 1000)")
// rename column
spark.sql(s"alter table ${tableName} rename column user to userx")
@@ -424,7 +433,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
checkAnswer(spark.sql(s"select name, userx.name, userx.score from ${tableName}").collect())(Seq(null, null, null))
// insert again
spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000)")
spark.sql(s"insert into ${tableName} values(2 , map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000)")
// check again
checkAnswer(spark.sql(s"select name, userx.name as uxname, userx.score as uxs from ${tableName} order by id").collect())(
@@ -440,9 +449,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
Seq(291, 2, "jacknew"))
// test map value type change
spark.sql(s"alter table ${tableName} add columns(mxp map<String, int>)")
spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 9))")
spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 9))")
spark.sql(s"alter table ${tableName} alter column mxp.value type double")
spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
spark.sql(s"select * from $tableName").show(false)
checkAnswer(spark.sql(s"select mxp from ${tableName} order by id").collect())(
Seq(null),
@@ -453,7 +462,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(s"alter table ${tableName} rename column userx to us")
spark.sql(s"alter table ${tableName} rename column us.age to age1")
spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").show()
checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())(
Seq(null, 29),

View File

@@ -17,6 +17,8 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.HoodieSparkUtils.isSpark2
class TestUpdateTable extends HoodieSparkSqlTestBase {
test("Test Update Table") {
@@ -84,7 +86,12 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000)")
if (isSpark2) {
spark.sql(s"insert into $tableName values (1, 'a1', cast(10.0 as double), 1000), (2, 'a2', cast(20.0 as double), 1000)")
} else {
spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000)")
}
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 20.0, 1000)
@@ -119,11 +126,20 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
spark.sql(
s"""
|insert into $ptTableName
|values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022")
""".stripMargin)
if (isSpark2) {
spark.sql(
s"""
|insert into $ptTableName
|values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 1000, "2022")
|""".stripMargin)
} else {
spark.sql(
s"""
|insert into $ptTableName
|values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022")
|""".stripMargin)
}
checkAnswer(s"select id, name, price, ts, pt from $ptTableName")(
Seq(1, "a1", 10.0, 1000, "2021"),
Seq(2, "a2", 20.0, 1000, "2021"),

View File

@@ -18,14 +18,22 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.internal.SQLConf
object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
def resolveOutputColumns(tableName: String,
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean,
conf: SQLConf): LogicalPlan =
SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, expected, query, byName)
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
ExplainCommand(plan, extended = extended)

View File

@@ -18,17 +18,25 @@
package org.apache.spark.sql
import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
import org.apache.spark.sql.internal.SQLConf
abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
def resolveOutputColumns(tableName: String,
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean,
conf: SQLConf): LogicalPlan =
TableOutputResolver.resolveOutputColumns(tableName, expected, query, byName, conf)
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode)