1
0

[RFC-33] [HUDI-2429][Stacked on HUDI-2560] Support full Schema evolution for Spark (#4910)

* [HUDI-2560] introduce id_based schema to support full schema evolution.

* add test for FileBasedInternalSchemaStorageManger and rebase code

* add support for change column type and fix some test case

* fix some bugs encountered in the production env and delete useless code

* fix test error

* rebase code

* fixed some nested schema change bugs

* [HUDI-2429][Stacked On HUDI-2560]Support full schema evolution for spark

* [use dummyInternalSchema instead of null]

* add support for spark3.1.x

* remove support for spark3.1.x , sicne some compile fail

* support spark3.1.x

* rebase and prepare solve all comments

* address all comments

* rebase code

* fixed the count(*) bug

* try to get internalSchema by parser commit file/history file directly, not use metaclient which is time cost
address some comments

* fixed all comments

* fix new comments

* rebase code,fix UT failed

* fixed mistake

* rebase code ,fixed new comments

* rebase code , and prepare for address new comments

* address commits

* address new comments

* fix new issues

* control fallback original write logical
This commit is contained in:
xiarixiaoyao
2022-04-02 04:20:24 +08:00
committed by GitHub
parent 9275b8fc7e
commit 444ff496a4
89 changed files with 10352 additions and 106 deletions

View File

@@ -23,6 +23,11 @@ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConver
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession}
/**
* Implementation of [[SparkAdapter]] for Spark 3.1.x
@@ -37,4 +42,27 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer =
new HoodieSpark3_1AvroDeserializer(rootAvroType, rootCatalystType)
override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = {
if (SPARK_VERSION.startsWith("3.1")) {
val loadClassName = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]]
} else {
new Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan
}
}
}
override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
if (SPARK_VERSION.startsWith("3.1")) {
val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
} else {
None
}
}
}

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.types.DataType
/**
* ALTER TABLE ... ADD COLUMNS command, as parsed from SQL.
*/
case class HoodieAlterTableAddColumnsStatement(
tableName: Seq[String],
columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement
/**
* ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL.
*/
case class HoodieAlterTableAlterColumnStatement(
tableName: Seq[String],
column: Seq[String],
dataType: Option[DataType],
nullable: Option[Boolean],
comment: Option[String],
position: Option[ColumnPosition]) extends ParsedStatement
/**
* ALTER TABLE ... RENAME COLUMN command, as parsed from SQL.
*/
case class HoodieAlterTableRenameColumnStatement(
tableName: Seq[String],
column: Seq[String],
newName: String) extends ParsedStatement
/**
* ALTER TABLE ... DROP COLUMNS command, as parsed from SQL.
*/
case class HoodieAlterTableDropColumnsStatement(
tableName: Seq[String], columnsToDrop: Seq[Seq[String]]) extends ParsedStatement
/**
* ALTER TABLE ... SET TBLPROPERTIES command, as parsed from SQL.
*/
case class HoodieAlterTableSetPropertiesStatement(
tableName: Seq[String], properties: Map[String, String]) extends ParsedStatement
/**
* ALTER TABLE ... UNSET TBLPROPERTIES command, as parsed from SQL.
*/
case class HoodieAlterTableUnsetPropertiesStatement(
tableName: Seq[String], propertyKeys: Seq[String], ifExists: Boolean) extends ParsedStatement

View File

@@ -0,0 +1,365 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet
import java.net.URI
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.util.InternalSchemaCache
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.internal.schema.action.InternalSchemaMerger
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.execution.datasources.parquet._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
// reference ParquetFileFormat from spark project
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
// fallback to origin parquet File read
super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
} else {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
hadoopConf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
requiredSchema.json)
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
// Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
// for dataSource v1, we have no method to do project for spark physical plan.
// it's safe to do cols project here.
val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
}
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
val filePath = new Path(new URI(file.filePath))
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
file.start,
file.start + file.length,
file.length,
Array.empty,
null)
val sharedConf = broadcastedHadoopConf.value.value
// do deal with internalSchema
val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
// querySchema must be a pruned schema.
val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
val fileSchema = if (internalSchemaChangeEnabled) {
val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
} else {
// this should not happened, searchSchemaAndCache will deal with correctly.
null
}
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
Spark312HoodieParquetFileFormat.createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
} else {
Spark312HoodieParquetFileFormat.createParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive)
}
filters.map(Spark312HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get()))
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter(_))
.reduceOption(FilterApi.and)
} else {
None
}
// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
// use new conf
val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
//
// reset request schema
var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
if (internalSchemaChangeEnabled) {
val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
}
val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
int96RebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity, typeChangeInfos)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,
enableVectorizedReader = false,
datetimeRebaseMode,
int96RebaseMode)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = if (typeChangeInfos.isEmpty) {
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
} else {
// find type changed.
val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
if (typeChangeInfos.containsKey(i)) {
StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
} else f
}).toAttributes ++ partitionSchema.toAttributes
val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
if (typeChangeInfos.containsKey(i)) {
Cast(attr, typeChangeInfos.get(i).getLeft)
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
}
if (partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
}
}
}
}
}
object Spark312HoodieParquetFileFormat {
val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
private def createParquetFilters(arg: Any*): ParquetFilters = {
val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
ctor.newInstance(arg.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[ParquetFilters]
}
private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
if (fileSchema == null || querySchema == null) {
oldFilter
} else {
oldFilter match {
case eq: EqualTo =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(eq.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else eq.copy(attribute = newAttribute)
case eqs: EqualNullSafe =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(eqs.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else eqs.copy(attribute = newAttribute)
case gt: GreaterThan =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(gt.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else gt.copy(attribute = newAttribute)
case gtr: GreaterThanOrEqual =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(gtr.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else gtr.copy(attribute = newAttribute)
case lt: LessThan =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(lt.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else lt.copy(attribute = newAttribute)
case lte: LessThanOrEqual =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(lte.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else lte.copy(attribute = newAttribute)
case i: In =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(i.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else i.copy(attribute = newAttribute)
case isn: IsNull =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(isn.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else isn.copy(attribute = newAttribute)
case isnn: IsNotNull =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(isnn.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else isnn.copy(attribute = newAttribute)
case And(left, right) =>
And(rebuildFilterFromParquet(left, fileSchema, querySchema), rebuildFilterFromParquet(right, fileSchema, querySchema))
case Or(left, right) =>
Or(rebuildFilterFromParquet(left, fileSchema, querySchema), rebuildFilterFromParquet(right, fileSchema, querySchema))
case Not(child) =>
Not(rebuildFilterFromParquet(child, fileSchema, querySchema))
case ssw: StringStartsWith =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(ssw.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else ssw.copy(attribute = newAttribute)
case ses: StringEndsWith =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(ses.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else ses.copy(attribute = newAttribute)
case sc: StringContains =>
val newAttribute = InternalSchemaUtils.reBuildFilterName(sc.attribute, fileSchema, querySchema)
if (newAttribute.isEmpty) AlwaysTrue else sc.copy(attribute = newAttribute)
case AlwaysTrue =>
AlwaysTrue
case AlwaysFalse =>
AlwaysFalse
case _ =>
AlwaysTrue
}
}
}
}

View File

@@ -0,0 +1,282 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
import java.util.Locale
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableChange}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.failNullType
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.hudi.command.AlterTableCommand312
import org.apache.spark.sql.types.StructType
import scala.collection.mutable
/**
* Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
* for alter table column commands.
* TODO: we should remove this file when we support datasourceV2 for hoodie on spark3.1x
*/
case class ResolveHudiAlterTableCommand312(sparkSession: SparkSession) extends Rule[LogicalPlan] {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case add @ HoodieAlterTableAddColumnsStatement(asTable(table), cols) =>
if (isHoodieTable(table) && schemaEvolutionEnabled){
cols.foreach(c => CatalogV2Util.failNullType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
}
val newChanges = normalizeChanges(changes, table.schema)
AlterTableCommand312(table, newChanges, ColumnChangeID.ADD)
} else {
// throw back to spark
AlterTableAddColumnsStatement(add.tableName, add.columnsToAdd)
}
case a @ HoodieAlterTableAlterColumnStatement(asTable(table), _, _, _, _, _) =>
if (isHoodieTable(table) && schemaEvolutionEnabled){
a.dataType.foreach(failNullType)
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = a.nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = a.comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = a.position.map { newPosition =>
TableChange.updateColumnPosition(colName, newPosition)
}
AlterTableCommand312(table, normalizeChanges(typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange, table.schema), ColumnChangeID.UPDATE)
} else {
// throw back to spark
AlterTableAlterColumnStatement(a.tableName, a.column, a.dataType, a.nullable, a.comment, a.position)
}
case rename @ HoodieAlterTableRenameColumnStatement(asTable(table), col, newName) =>
if (isHoodieTable(table) && schemaEvolutionEnabled){
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
AlterTableCommand312(table, normalizeChanges(changes, table.schema), ColumnChangeID.UPDATE)
} else {
// throw back to spark
AlterTableRenameColumnStatement(rename.tableName, rename.column, rename.newName)
}
case drop @ HoodieAlterTableDropColumnsStatement(asTable(table), cols) =>
if (isHoodieTable(table) && schemaEvolutionEnabled) {
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
AlterTableCommand312(table, normalizeChanges(changes, table.schema), ColumnChangeID.DELETE)
} else {
// throw back to spark
AlterTableDropColumnsStatement(drop.tableName, drop.columnsToDrop)
}
case set @ HoodieAlterTableSetPropertiesStatement(asTable(table), props) =>
if (isHoodieTable(table) && schemaEvolutionEnabled) {
val changes = props.map { case (key, value) =>
TableChange.setProperty(key, value)
}.toSeq
AlterTableCommand312(table, normalizeChanges(changes, table.schema), ColumnChangeID.PROPERTY_CHANGE)
} else {
// throw back to spark
AlterTableSetPropertiesStatement(set.tableName, set.properties)
}
case unset @ HoodieAlterTableUnsetPropertiesStatement(asTable(table), keys, _) =>
if (isHoodieTable(table) && schemaEvolutionEnabled) {
val changes = keys.map(key => TableChange.removeProperty(key))
AlterTableCommand312(table, normalizeChanges(changes, table.schema), ColumnChangeID.PROPERTY_CHANGE)
} else {
// throw back to spark
AlterTableUnsetPropertiesStatement(unset.tableName, unset.propertyKeys, unset.ifExists)
}
}
private def schemaEvolutionEnabled(): Boolean = sparkSession
.sessionState.conf.getConfString(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean
private def isHoodieTable(table: CatalogTable): Boolean = table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
def normalizeChanges(changes: Seq[TableChange], schema: StructType): Seq[TableChange] = {
val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]
changes.flatMap {
case add: AddColumn =>
def addColumn(parentSchema: StructType, parentName: String, normalizedParentName: Seq[String]): TableChange = {
val fieldsAdded = colsToAdd.getOrElse(normalizedParentName, Nil)
val pos = findColumnPosition(add.position(), parentName, parentSchema, fieldsAdded)
val field = add.fieldNames().last
colsToAdd(normalizedParentName) = fieldsAdded :+ field
TableChange.addColumn(
(normalizedParentName :+ field).toArray,
add.dataType(),
add.isNullable,
add.comment,
pos)
}
val parent = add.fieldNames().init
if (parent.nonEmpty) {
// Adding a nested field, need to normalize the parent column and position
val target = schema.findNestedField(parent, includeCollections = true, conf.resolver)
if (target.isEmpty) {
// Leave unresolved. Throws error in CheckAnalysis
Some(add)
} else {
val (normalizedName, sf) = target.get
sf.dataType match {
case struct: StructType =>
Some(addColumn(struct, parent.quoted, normalizedName :+ sf.name))
case other =>
Some(add)
}
}
} else {
// Adding to the root. Just need to normalize position
Some(addColumn(schema, "root", Nil))
}
case typeChange: UpdateColumnType =>
// Hive style syntax provides the column type, even if it may not have changed
val fieldOpt = schema.findNestedField(
typeChange.fieldNames(), includeCollections = true, conf.resolver)
if (fieldOpt.isEmpty) {
// We couldn't resolve the field. Leave it to CheckAnalysis
Some(typeChange)
} else {
val (fieldNames, field) = fieldOpt.get
if (field.dataType == typeChange.newDataType()) {
// The user didn't want the field to change, so remove this change
None
} else {
Some(TableChange.updateColumnType(
(fieldNames :+ field.name).toArray, typeChange.newDataType()))
}
}
case n: UpdateColumnNullability =>
// Need to resolve column
resolveFieldNames(
schema,
n.fieldNames(),
TableChange.updateColumnNullability(_, n.nullable())).orElse(Some(n))
case position: UpdateColumnPosition =>
position.position() match {
case after: After =>
// Need to resolve column as well as position reference
val fieldOpt = schema.findNestedField(
position.fieldNames(), includeCollections = true, conf.resolver)
if (fieldOpt.isEmpty) {
Some(position)
} else {
val (normalizedPath, field) = fieldOpt.get
val targetCol = schema.findNestedField(
normalizedPath :+ after.column(), includeCollections = true, conf.resolver)
if (targetCol.isEmpty) {
// Leave unchanged to CheckAnalysis
Some(position)
} else {
Some(TableChange.updateColumnPosition(
(normalizedPath :+ field.name).toArray,
ColumnPosition.after(targetCol.get._2.name)))
}
}
case _ =>
// Need to resolve column
resolveFieldNames(
schema,
position.fieldNames(),
TableChange.updateColumnPosition(_, position.position())).orElse(Some(position))
}
case comment: UpdateColumnComment =>
resolveFieldNames(
schema,
comment.fieldNames(),
TableChange.updateColumnComment(_, comment.newComment())).orElse(Some(comment))
case rename: RenameColumn =>
resolveFieldNames(
schema,
rename.fieldNames(),
TableChange.renameColumn(_, rename.newName())).orElse(Some(rename))
case delete: DeleteColumn =>
resolveFieldNames(schema, delete.fieldNames(), TableChange.deleteColumn)
.orElse(Some(delete))
case column: ColumnChange =>
// This is informational for future developers
throw new UnsupportedOperationException(
"Please add an implementation for a column change here")
case other => Some(other)
}
}
/**
* Returns the table change if the field can be resolved, returns None if the column is not
* found. An error will be thrown in CheckAnalysis for columns that can't be resolved.
*/
private def resolveFieldNames(
schema: StructType,
fieldNames: Array[String],
copy: Array[String] => TableChange): Option[TableChange] = {
val fieldOpt = schema.findNestedField(
fieldNames, includeCollections = true, conf.resolver)
fieldOpt.map { case (path, field) => copy((path :+ field.name).toArray) }
}
private def findColumnPosition(
position: ColumnPosition,
parentName: String,
struct: StructType,
fieldsAdded: Seq[String]): ColumnPosition = {
position match {
case null => null
case after: After =>
(struct.fieldNames ++ fieldsAdded).find(n => conf.resolver(n, after.column())) match {
case Some(colName) =>
ColumnPosition.after(colName)
case None =>
throw new AnalysisException("Couldn't find the reference column for " +
s"$after at $parentName")
}
case other => other
}
}
object asTable {
def unapply(parts: Seq[String]): Option[CatalogTable] = {
val identifier = parts match {
case Seq(tblName) => TableIdentifier(tblName)
case Seq(dbName, tblName) => TableIdentifier(tblName, Some(dbName))
case _ =>
throw new AnalysisException(
s"${parts} is not a valid TableIdentifier as it has more than 2 name parts.")
}
Some(sparkSession.sessionState.catalog.getTableMetadata(identifier))
}
}
}

View File

@@ -0,0 +1,324 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceUtils}
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, Option}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
import org.apache.hudi.internal.schema.action.TableChanges
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager
import org.apache.hudi.table.HoodieSparkTable
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
// TODO: we should remove this file when we support datasourceV2 for hoodie on spark3.1x
case class AlterTableCommand312(table: CatalogTable, changes: Seq[TableChange], changeType: ColumnChangeID) extends RunnableCommand with Logging {
override def run(sparkSession: SparkSession): Seq[Row] = {
changeType match {
case ColumnChangeID.ADD => applyAddAction(sparkSession)
case ColumnChangeID.DELETE => applyDeleteAction(sparkSession)
case ColumnChangeID.UPDATE => applyUpdateAction(sparkSession)
case ColumnChangeID.PROPERTY_CHANGE if (changes.filter(_.isInstanceOf[SetProperty]).size == changes.size) =>
applyPropertySet(sparkSession)
case ColumnChangeID.PROPERTY_CHANGE if (changes.filter(_.isInstanceOf[RemoveProperty]).size == changes.size) =>
applyPropertyUnset(sparkSession)
case other => throw new RuntimeException(s"find unsupported alter command type: ${other}")
}
Seq.empty[Row]
}
def applyAddAction(sparkSession: SparkSession): Unit = {
val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession)
val addChange = TableChanges.ColumnAddChange.get(oldSchema)
changes.map(_.asInstanceOf[AddColumn]).foreach { addColumn =>
val names = addColumn.fieldNames()
val parentName = AlterTableCommand312.getParentName(names)
// add col change
val colType = SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), true, new AtomicInteger(0))
addChange.addColumns(parentName, names.last, colType, addColumn.comment())
// add position change
addColumn.position() match {
case after: TableChange.After =>
addChange.addPositionChange(names.mkString("."),
if (parentName.isEmpty) after.column() else parentName + "." + after.column(), "after")
case _: TableChange.First =>
addChange.addPositionChange(names.mkString("."), "", "first")
case _ =>
}
}
val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) {
SerDeHelper.inheritSchemas(oldSchema, "")
} else {
historySchema
}
AlterTableCommand312.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession)
logInfo("column add finished")
}
def applyDeleteAction(sparkSession: SparkSession): Unit = {
val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession)
val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema)
changes.map(_.asInstanceOf[DeleteColumn]).foreach { c =>
val originalColName = c.fieldNames().mkString(".");
AlterTableCommand312.checkSchemaChange(Seq(originalColName), table)
deleteChange.deleteColumn(originalColName)
}
val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, deleteChange)
// delete action should not change the getMaxColumnId field.
newSchema.setMaxColumnId(oldSchema.getMaxColumnId)
val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) {
SerDeHelper.inheritSchemas(oldSchema, "")
} else {
historySchema
}
AlterTableCommand312.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession)
logInfo("column delete finished")
}
def applyUpdateAction(sparkSession: SparkSession): Unit = {
val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession)
val updateChange = TableChanges.ColumnUpdateChange.get(oldSchema)
changes.foreach { change =>
change match {
case updateType: TableChange.UpdateColumnType =>
val newType = SparkInternalSchemaConverter.buildTypeFromStructType(updateType.newDataType(), true, new AtomicInteger(0))
updateChange.updateColumnType(updateType.fieldNames().mkString("."), newType)
case updateComment: TableChange.UpdateColumnComment =>
updateChange.updateColumnComment(updateComment.fieldNames().mkString("."), updateComment.newComment())
case updateName: TableChange.RenameColumn =>
val originalColName = updateName.fieldNames().mkString(".")
AlterTableCommand312.checkSchemaChange(Seq(originalColName), table)
updateChange.renameColumn(originalColName, updateName.newName())
case updateNullAbility: TableChange.UpdateColumnNullability =>
updateChange.updateColumnNullability(updateNullAbility.fieldNames().mkString("."), updateNullAbility.nullable())
case updatePosition: TableChange.UpdateColumnPosition =>
val names = updatePosition.fieldNames()
val parentName = AlterTableCommand312.getParentName(names)
updatePosition.position() match {
case after: TableChange.After =>
updateChange.addPositionChange(names.mkString("."),
if (parentName.isEmpty) after.column() else parentName + "." + after.column(), "after")
case _: TableChange.First =>
updateChange.addPositionChange(names.mkString("."), "", "first")
case _ =>
}
}
}
val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, updateChange)
val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) {
SerDeHelper.inheritSchemas(oldSchema, "")
} else {
historySchema
}
AlterTableCommand312.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession)
logInfo("column update finished")
}
// to do support unset default value to columns, and apply them to internalSchema
def applyPropertyUnset(sparkSession: SparkSession): Unit = {
val catalog = sparkSession.sessionState.catalog
val propKeys = changes.map(_.asInstanceOf[RemoveProperty]).map(_.property())
// ignore NonExist unset
propKeys.foreach { k =>
if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) {
logWarning(s"find non exist unset property: ${k} , ignore it")
}
}
val tableComment = if (propKeys.contains(TableCatalog.PROP_COMMENT)) None else table.comment
val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
val newTable = table.copy(properties = newProperties, comment = tableComment)
catalog.alterTable(newTable)
logInfo("table properties change finished")
}
// to do support set default value to columns, and apply them to internalSchema
def applyPropertySet(sparkSession: SparkSession): Unit = {
val catalog = sparkSession.sessionState.catalog
val properties = changes.map(_.asInstanceOf[SetProperty]).map(f => f.property -> f.value).toMap
// This overrides old properties and update the comment parameter of CatalogTable
// with the newly added/modified comment since CatalogTable also holds comment as its
// direct property.
val newTable = table.copy(
properties = table.properties ++ properties,
comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
catalog.alterTable(newTable)
logInfo("table properties change finished")
}
def getInternalSchemaAndHistorySchemaStr(sparkSession: SparkSession): (InternalSchema, String) = {
val path = AlterTableCommand312.getTableLocation(table, sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
val schemaUtil = new TableSchemaResolver(metaClient)
val schema = schemaUtil.getTableInternalSchemaFromCommitMetadata().orElse {
AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema)
}
val historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata.orElse("")
(schema, historySchemaStr)
}
}
object AlterTableCommand312 extends Logging {
/**
* Generate an commit with new schema to change the table's schema.
* @param internalSchema new schema after change
* @param historySchemaStr history schemas
* @param table The hoodie table.
* @param sparkSession The spark session.
*/
def commitWithSchema(internalSchema: InternalSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = {
val schema = AvroInternalSchemaConverter.convert(internalSchema, table.identifier.table)
val path = getTableLocation(table, sparkSession)
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
path, table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType)
val instantTime = HoodieActiveTimeline.createNewInstantTime
client.startCommitWithTime(instantTime, commitActionType)
val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
val timeLine = hoodieTable.getActiveTimeline
val requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime)
val metadata = new HoodieCommitMetadata
metadata.setOperationType(WriteOperationType.ALTER_SCHEMA)
timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString.getBytes(StandardCharsets.UTF_8)))
val extraMeta = new util.HashMap[String, String]()
extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(internalSchema.setSchemaId(instantTime.toLong)))
val schemaManager = new FileBasedInternalSchemaStorageManager(metaClient)
schemaManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(internalSchema, historySchemaStr))
client.commit(instantTime, jsc.emptyRDD, Option.of(extraMeta))
val existRoTable = sparkSession.catalog.tableExists(table.identifier.unquotedString + "_ro")
val existRtTable = sparkSession.catalog.tableExists(table.identifier.unquotedString + "_rt")
try {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
// try to refresh ro/rt table
if (existRoTable) sparkSession.catalog.refreshTable(table.identifier.unquotedString + "_ro")
if (existRoTable) sparkSession.catalog.refreshTable(table.identifier.unquotedString + "_rt")
} catch {
case NonFatal(e) =>
log.error(s"Exception when attempting to refresh table ${table.identifier.quotedString}", e)
}
// try to sync to hive
// drop partition field before call alter table
val fullSparkSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(internalSchema)
val dataSparkSchema = new StructType(fullSparkSchema.fields.filter(p => !table.partitionColumnNames.exists(f => sparkSession.sessionState.conf.resolver(f, p.name))))
alterTableDataSchema(sparkSession, table.identifier.database.getOrElse("default"), table.identifier.table, dataSparkSchema)
if (existRoTable) alterTableDataSchema(sparkSession, table.identifier.database.getOrElse("default"), table.identifier.table + "_ro", dataSparkSchema)
if (existRtTable) alterTableDataSchema(sparkSession, table.identifier.database.getOrElse("default"), table.identifier.table + "_rt", dataSparkSchema)
}
def alterTableDataSchema(sparkSession: SparkSession, db: String, tableName: String, dataSparkSchema: StructType): Unit = {
sparkSession.sessionState.catalog
.externalCatalog
.alterTableDataSchema(db, tableName, dataSparkSchema)
}
def getTableLocation(table: CatalogTable, sparkSession: SparkSession): String = {
val uri = if (table.tableType == CatalogTableType.MANAGED) {
Some(sparkSession.sessionState.catalog.defaultTablePath(table.identifier))
} else {
table.storage.locationUri
}
val conf = sparkSession.sessionState.newHadoopConf()
uri.map(makePathQualified(_, conf))
.map(removePlaceHolder)
.getOrElse(throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
}
private def removePlaceHolder(path: String): String = {
if (path == null || path.length == 0) {
path
} else if (path.endsWith("-__PLACEHOLDER__")) {
path.substring(0, path.length() - 16)
} else {
path
}
}
def makePathQualified(path: URI, hadoopConf: Configuration): String = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
fs.makeQualified(hadoopPath).toUri.toString
}
def getParentName(names: Array[String]): String = {
if (names.size > 1) {
names.dropRight(1).mkString(".")
} else ""
}
def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
Map(OPERATION.key -> OPERATION.defaultValue,
TABLE_TYPE.key -> TABLE_TYPE.defaultValue,
PRECOMBINE_FIELD.key -> PRECOMBINE_FIELD.defaultValue,
HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> HoodieWriteConfig.DEFAULT_WRITE_PAYLOAD_CLASS,
INSERT_DROP_DUPS.key -> INSERT_DROP_DUPS.defaultValue,
ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue,
INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue
) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
def checkSchemaChange(colNames: Seq[String], catalogTable: CatalogTable): Unit = {
val primaryKeys = catalogTable.storage.properties.getOrElse("primaryKey", catalogTable.properties.getOrElse("primaryKey", "keyid")).split(",").map(_.trim)
val preCombineKey = Seq(catalogTable.storage.properties.getOrElse("preCombineField", catalogTable.properties.getOrElse("preCombineField", "ts"))).map(_.trim)
val partitionKey = catalogTable.partitionColumnNames.map(_.trim)
val checkNames = primaryKeys ++ preCombineKey ++ partitionKey
colNames.foreach { col =>
if (checkNames.contains(col)) {
throw new UnsupportedOperationException("cannot support apply changes for primaryKey/CombineKey/partitionKey")
}
}
}
}

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parser
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser}
// TODO: we should remove this file when we support datasourceV2 for hoodie on spark3.1x
class HoodieSpark312ExtendedSqlParser(session: SparkSession, delegate: ParserInterface) extends SparkSqlParser with Logging {
override val astBuilder: SparkSqlAstBuilder = new HoodieSpark312SqlAstBuilder(session)
}

View File

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parser
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SparkSqlAstBuilder
// TODO: we should remove this file when we support datasourceV2 for hoodie on spark3.1x
class HoodieSpark312SqlAstBuilder(sparkSession: SparkSession) extends SparkSqlAstBuilder {
/**
* Parse a [[AlterTableAlterColumnStatement]] command to alter a column's property.
*
* For example:
* {{{
* ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
* ALTER TABLE table1 ALTER COLUMN a.b.c SET NOT NULL
* ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL
* ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
* ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
* ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
* }}}
*/
override def visitAlterTableAlterColumn(ctx: AlterTableAlterColumnContext): LogicalPlan = withOrigin(ctx) {
val alter = super.visitAlterTableAlterColumn(ctx).asInstanceOf[AlterTableAlterColumnStatement]
HoodieAlterTableAlterColumnStatement(alter.tableName, alter.column, alter.dataType, alter.nullable, alter.comment, alter.position)
}
/**
* Parse a [[org.apache.spark.sql.catalyst.plans.logical.AlterTableAddColumnsStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1
* ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
* }}}
*/
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val add = super.visitAddTableColumns(ctx).asInstanceOf[AlterTableAddColumnsStatement]
HoodieAlterTableAddColumnsStatement(add.tableName, add.columnsToAdd)
}
/**
* Parse a [[org.apache.spark.sql.catalyst.plans.logical.AlterTableRenameColumnStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1 RENAME COLUMN a.b.c TO x
* }}}
*/
override def visitRenameTableColumn(
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
val rename = super.visitRenameTableColumn(ctx).asInstanceOf[AlterTableRenameColumnStatement]
HoodieAlterTableRenameColumnStatement(rename.tableName, rename.column, rename.newName)
}
/**
* Parse a [[AlterTableDropColumnsStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1 DROP COLUMN a.b.c
* ALTER TABLE table1 DROP COLUMNS a.b.c, x, y
* }}}
*/
override def visitDropTableColumns(
ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val drop = super.visitDropTableColumns(ctx).asInstanceOf[AlterTableDropColumnsStatement]
HoodieAlterTableDropColumnsStatement(drop.tableName, drop.columnsToDrop)
}
/**
* Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetPropertiesStatement]] commands.
*
* For example:
* {{{
* ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value');
* ALTER VIEW view SET TBLPROPERTIES ('table_property' = 'property_value');
* }}}
*/
override def visitSetTableProperties(
ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
val set = super.visitSetTableProperties(ctx)
set match {
case s: AlterTableSetPropertiesStatement => HoodieAlterTableSetPropertiesStatement(s.tableName, s.properties)
case other => other
}
}
/**
* Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetPropertiesStatement]] commands.
*
* For example:
* {{{
* ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* ALTER VIEW view UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* }}}
*/
override def visitUnsetTableProperties(
ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
val unset = super.visitUnsetTableProperties(ctx)
unset match {
case us: AlterTableUnsetPropertiesStatement => HoodieAlterTableUnsetPropertiesStatement(us.tableName, us.propertyKeys, us.ifExists)
case other => other
}
}
}