
[HUDI-1842] Spark Sql Support For pre-existing Hoodie Table (#3393)

Author: pengzhiwei
Committed: 2021-08-07 19:49:26 +08:00 (via GitHub)
Parent: 70b6bd485f
Commit: 55d2e786db
19 changed files with 451 additions and 99 deletions

View File

@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
-import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}

View File

@@ -126,12 +126,9 @@ object HoodieOptionConfig {
   /**
    * Mapping the table config (loaded from the hoodie.properties) to the sql options.
-   * @param options
-   * @return
    */
   def mappingTableConfigToSqlOption(options: Map[String, String]): Map[String, String] = {
-    options.filterKeys(k => tableConfigKeyToSqlKey.contains(k))
-      .map(kv => tableConfigKeyToSqlKey(kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
+    options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
   }

   private lazy val defaultTableConfig: Map[String, String] = {
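
A note on the change above: the old mappingTableConfigToSqlOption dropped any table config key that had no SQL alias, while the new version passes unknown keys through unchanged, so properties such as the key generator class now survive the mapping. A minimal sketch of the difference (the two small maps are stand-ins for the private tableConfigKeyToSqlKey and reverseValueMapping fields, with illustrative contents):

    val tableConfigKeyToSqlKey = Map("hoodie.table.recordkey.fields" -> "primaryKey")
    val reverseValueMapping = Map("COPY_ON_WRITE" -> "cow")
    val tableConfig = Map(
      "hoodie.table.recordkey.fields" -> "id",
      "hoodie.table.keygenerator.class" -> "org.apache.hudi.keygen.SimpleKeyGenerator")

    // Old behavior: unknown keys are filtered out -> Map(primaryKey -> id)
    tableConfig.filterKeys(tableConfigKeyToSqlKey.contains)
      .map(kv => tableConfigKeyToSqlKey(kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))

    // New behavior: unknown keys fall back to themselves -> the keygenerator
    // class entry is preserved alongside Map(primaryKey -> id)
    tableConfig.map(kv =>
      tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))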

View File

@@ -24,7 +24,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -64,6 +66,16 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     }
   }

+  def getTableSqlSchema(metaClient: HoodieTableMetaClient): Option[StructType] = {
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val avroSchema = try Some(schemaResolver.getTableAvroSchema(false))
+    catch {
+      case _: Throwable => None
+    }
+    avroSchema.map(SchemaConverters.toSqlType(_).dataType
+      .asInstanceOf[StructType])
+  }
+
   private def tripAlias(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case SubqueryAlias(_, relation: LogicalPlan) =>
@@ -122,12 +134,12 @@ object HoodieSqlUtils extends SparkAdapterSupport {
    * @param spark
    * @return
    */
-  def getTableLocation(tableId: TableIdentifier, spark: SparkSession): Option[String] = {
+  def getTableLocation(tableId: TableIdentifier, spark: SparkSession): String = {
     val table = spark.sessionState.catalog.getTableMetadata(tableId)
     getTableLocation(table, spark)
   }

-  def getTableLocation(table: CatalogTable, sparkSession: SparkSession): Option[String] = {
+  def getTableLocation(table: CatalogTable, sparkSession: SparkSession): String = {
     val uri = if (table.tableType == CatalogTableType.MANAGED && isHoodieTable(table)) {
       Some(sparkSession.sessionState.catalog.defaultTablePath(table.identifier))
     } else {
@@ -136,6 +148,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     val conf = sparkSession.sessionState.newHadoopConf()
     uri.map(makePathQualified(_, conf))
       .map(removePlaceHolder)
+      .getOrElse(throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
   }

   private def removePlaceHolder(path: String): String = {
@@ -154,6 +167,16 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     fs.makeQualified(hadoopPath).toUri.toString
   }

+  /**
+   * Check if the hoodie.properties exists in the table path.
+   */
+  def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = {
+    val basePath = new Path(tablePath)
+    val fs = basePath.getFileSystem(conf)
+    val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)
+    fs.exists(metaPath)
+  }
+
   def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = {
     child match {
       case Literal(nul, NullType) => Literal(nul, dataType)
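
Taken together, the three changes in this file tighten the location contract and add the probes the analyzer needs: getTableLocation now returns the resolved path or throws, while tableExistsInPath and getTableSqlSchema let callers detect a pre-existing table and recover its schema. A hedged sketch of how a caller might combine them (the helper name resolveExistingSchema is invented for illustration):

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.hudi.HoodieSqlUtils._
    import org.apache.spark.sql.types.StructType

    def resolveExistingSchema(spark: SparkSession, tableId: TableIdentifier): Option[StructType] = {
      // Throws IllegalArgumentException now instead of returning None.
      val path = getTableLocation(tableId, spark)
      val conf = spark.sessionState.newHadoopConf()
      if (tableExistsInPath(path, conf)) {
        val metaClient = HoodieTableMetaClient.builder()
          .setBasePath(path)
          .setConf(conf)
          .build()
        getTableSqlSchema(metaClient) // Some(schema) when hoodie.properties holds one
      } else {
        None
      }
    }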

View File

@@ -21,7 +21,8 @@ import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
 import org.apache.hudi.SparkAdapterSupport
 import scala.collection.JavaConverters._
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -286,7 +287,28 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
       } else {
         l
       }

+    // Fill schema for Create Table without specify schema info
+    case c @ CreateTable(tableDesc, _, _)
+      if isHoodieTable(tableDesc) =>
+      val tablePath = getTableLocation(c.tableDesc, sparkSession)
+      if (tableExistsInPath(tablePath, sparkSession.sessionState.newHadoopConf())) {
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(sparkSession.sessionState.newHadoopConf())
+          .build()
+        val tableSchema = HoodieSqlUtils.getTableSqlSchema(metaClient).map(HoodieSqlUtils.addMetaFields)
+        if (tableSchema.isDefined && tableDesc.schema.isEmpty) {
+          // Fill the schema with the schema from the table
+          c.copy(tableDesc.copy(schema = tableSchema.get))
+        } else if (tableSchema.isDefined && tableDesc.schema != tableSchema.get) {
+          throw new AnalysisException(s"Specified schema in create table statement is not equal to the table schema." +
+            s"You should not specify the schema for an exist table: ${tableDesc.identifier} ")
+        } else {
+          c
+        }
+      } else {
+        c
+      }
     case p => p
   }
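
The practical effect of this new resolution rule: a CREATE TABLE over a path that already holds a Hudi table no longer needs a column list, because the analyzer back-fills the schema (including the _hoodie meta fields) from the table's own metadata, and it rejects a user-supplied schema that disagrees with it. A statement of this shape now resolves (illustrative SQL, mirroring the tests added later in this commit):

    spark.sql(
      """
        |create table existing_tbl using hudi
        |location '/path/to/existing/hoodie/table'
        |""".stripMargin) // no schema given: filled in from the existing table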

View File

@@ -103,7 +103,6 @@ object AlterHoodieTableAddColumnsCommand {
    */
   def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = {
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,

View File

@@ -68,7 +68,6 @@ case class AlterHoodieTableChangeColumnCommand(
     val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
       .setConf(hadoopConf).build()

View File

@@ -37,7 +37,6 @@ class AlterHoodieTableRenameCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(oldName)
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path)

View File

@@ -31,7 +31,6 @@ case class CompactionHoodieTableCommand(table: CatalogTable,
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val basePath = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession)
   }

View File

@@ -29,7 +29,6 @@ case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val basePath = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession)
   }

View File

@@ -62,7 +62,6 @@ case class CreateHoodieTableAsSelectCommand(
       }
     }
     val tablePath = getTableLocation(table, sparkSession)
-      .getOrElse(s"Missing path for table ${table.identifier}")
     val conf = sparkSession.sessionState.newHadoopConf()
     assert(CreateHoodieTableCommand.isEmptyPath(tablePath, conf),
       s"Path '$tablePath' should be empty for CTAS")

View File

@@ -19,16 +19,19 @@ package org.apache.spark.sql.hudi.command

 import scala.collection.JavaConverters._
 import java.util.{Locale, Properties}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.hudi.common.model.HoodieFileFormat
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ValidationUtils
 import org.apache.hudi.hadoop.HoodieParquetInputFormat
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.SchemaConverters
@@ -40,8 +43,7 @@ import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 import org.apache.spark.sql.hudi.HoodieOptionConfig
-import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed, tableExistsInPath, isEmptyPath}
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed, isEmptyPath}
 import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -50,8 +52,6 @@ import scala.collection.mutable

 /**
  * Command for create hoodie table.
- * @param table
- * @param ignoreIfExists
  */
 case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
   extends RunnableCommand with SparkAdapterSupport {
@@ -83,7 +83,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
     val sessionState = sparkSession.sessionState
     val tableName = table.identifier.unquotedString
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"Missing path for table ${table.identifier}")
     val conf = sparkSession.sessionState.newHadoopConf()
     val isTableExists = tableExistsInPath(path, conf)
     // Get the schema & table options
@@ -95,26 +94,44 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
         .setBasePath(path)
         .setConf(conf)
         .build()
-      val schemaResolver = new TableSchemaResolver(metaClient)
-      val avroSchema = try Some(schemaResolver.getTableAvroSchema(false))
-      catch {
-        case _: Throwable => None
-      }
-      val tableSchema = avroSchema.map(SchemaConverters.toSqlType(_).dataType
-        .asInstanceOf[StructType])
+      val tableSchema = getTableSqlSchema(metaClient)
-      // Get options from the external table and append with the options in ddl.
-      val options = HoodieOptionConfig.mappingTableConfigToSqlOption(
-        metaClient.getTableConfig.getProps.asScala.toMap) ++ table.storage.properties
+      // Get options from the external table and append with the options in ddl.
+      val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption(
+        metaClient.getTableConfig.getProps.asScala.toMap)
-      val userSpecifiedSchema = table.schema
-      if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
+      val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
+      var upgrateConfig = Map.empty[String, String]
+      // If this is a non-hive-styled partition table, disable the hive style config.
+      // (By default this config is enable for spark sql)
+      upgrateConfig = if (isNotHiveStyledPartitionTable(allPartitionPaths, table)) {
+        upgrateConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false")
+      } else {
+        upgrateConfig
+      }
+      upgrateConfig = if (isUrlEncodeDisable(allPartitionPaths, table)) {
+        upgrateConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false")
+      } else {
+        upgrateConfig
+      }
+      // Use the origin keygen to generate record key to keep the rowkey consistent with the old table for spark sql.
+      // See SqlKeyGenerator#getRecordKey for detail.
+      upgrateConfig = if (originTableConfig.contains(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key)) {
+        upgrateConfig + (SqlKeyGenerator.ORIGIN_KEYGEN_CLASS -> originTableConfig(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key))
+      } else {
+        upgrateConfig
+      }
+      val options = originTableConfig ++ upgrateConfig ++ table.storage.properties
+      val userSpecifiedSchema = table.schema
+      if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
         (addMetaFields(tableSchema.get), options)
-      } else if (userSpecifiedSchema.nonEmpty) {
+      } else if (userSpecifiedSchema.nonEmpty) {
         (addMetaFields(userSpecifiedSchema), options)
-      } else {
+      } else {
         throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
-      }
+      }
     } else {
       assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
       // SPARK-19724: the default location of a managed table should be non-existent or empty.
@@ -301,47 +318,102 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
           s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
     }
   }

+  private def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
+    val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+    val metadataConfig = {
+      val properties = new Properties()
+      properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
+      HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
+    }
+    FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
+  }
+
+  /**
+   * This method is used to compatible with the old non-hive-styled partition table.
+   * By default we enable the "hoodie.datasource.write.hive_style_partitioning"
+   * when writing data to hudi table by spark sql by default.
+   * If the exist table is a non-hive-styled partitioned table, we should
+   * disable the "hoodie.datasource.write.hive_style_partitioning" when
+   * merge or update the table. Or else, we will get an incorrect merge result
+   * as the partition path mismatch.
+   */
+  private def isNotHiveStyledPartitionTable(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
+    if (table.partitionColumnNames.nonEmpty) {
+      val isHiveStylePartitionPath = (path: String) => {
+        val fragments = path.split("/")
+        if (fragments.size != table.partitionColumnNames.size) {
+          false
+        } else {
+          fragments.zip(table.partitionColumnNames).forall {
+            case (pathFragment, partitionColumn) => pathFragment.startsWith(s"$partitionColumn=")
+          }
+        }
+      }
+      !partitionPaths.forall(isHiveStylePartitionPath)
+    } else {
+      false
+    }
+  }
+
+  /**
+   * If this table has disable the url encode, spark sql should also disable it when writing to the table.
+   */
+  private def isUrlEncodeDisable(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
+    if (table.partitionColumnNames.nonEmpty) {
+      !partitionPaths.forall(partitionPath => partitionPath.split("/").length == table.partitionColumnNames.size)
+    } else {
+      false
+    }
+  }
 }

 object CreateHoodieTableCommand extends Logging {

   /**
-   * Init the table if it is not exists.
-   * @param sparkSession
-   * @param table
-   * @return
+   * Init the hoodie.properties.
    */
   def initTableIfNeed(sparkSession: SparkSession, table: CatalogTable): Unit = {
-    val location = getTableLocation(table, sparkSession).getOrElse(
-      throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
+    val location = getTableLocation(table, sparkSession)
     val conf = sparkSession.sessionState.newHadoopConf()
-    // Init the hoodie table
-    if (!tableExistsInPath(location, conf)) {
-      val tableName = table.identifier.table
-      logInfo(s"Table $tableName is not exists, start to create the hudi table")
+    val originTableConfig = if (tableExistsInPath(location, conf)) {
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(location)
+        .setConf(conf)
+        .build()
+      metaClient.getTableConfig.getProps.asScala.toMap
+    } else {
+      Map.empty[String, String]
+    }
-      // Save all the table config to the hoodie.properties.
-      val parameters = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
-      val properties = new Properties()
+    val tableName = table.identifier.table
+    logInfo(s"Init hoodie.properties for $tableName")
+    val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key)
+    // Save all the table config to the hoodie.properties.
+    val parameters = originTableConfig ++ tableOptions
+    val properties = new Properties()
     properties.putAll(parameters.asJava)
     HoodieTableMetaClient.withPropertyBuilder()
-        .fromProperties(properties)
-        .setTableName(tableName)
-        .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
-        .setPartitionFields(table.partitionColumnNames.mkString(","))
-        .initTable(conf, location)
-    }
+      .fromProperties(properties)
+      .setTableName(tableName)
+      .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
+      .setPartitionFields(table.partitionColumnNames.mkString(","))
+      .initTable(conf, location)
   }

-  /**
-   * Check if the hoodie.properties exists in the table path.
-   */
-  def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = {
-    val basePath = new Path(tablePath)
-    val fs = basePath.getFileSystem(conf)
-    val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)
-    fs.exists(metaPath)
-  }
+  def checkTableConfigEqual(originTableConfig: Map[String, String],
+      newTableConfig: Map[String, String], configKey: String): Unit = {
+    if (originTableConfig.contains(configKey) && newTableConfig.contains(configKey)) {
+      assert(originTableConfig(configKey) == newTableConfig(configKey),
+        s"Table config: $configKey in the create table is: ${newTableConfig(configKey)}, is not the same with the value in " +
+        s"hoodie.properties, which is: ${originTableConfig(configKey)}. Please keep the same.")
+    }
+  }

   /**
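
The two partition-path checks above both reduce to simple string tests against the paths that FSUtils returns. A worked example of the same logic on sample paths (the column names and path values are illustrative):

    val partitionColumns = Seq("day", "hh")

    def isHiveStyled(path: String): Boolean = {
      val fragments = path.split("/")
      fragments.size == partitionColumns.size &&
        fragments.zip(partitionColumns).forall {
          case (fragment, column) => fragment.startsWith(s"$column=")
        }
    }

    isHiveStyled("day=2021-08-02/hh=12") // true: the spark sql defaults stay on
    isHiveStyled("2021-08-02/12")        // false: hive_style_partitioning is disabled
    // "2021/08/02/12" splits into 4 fragments against 2 partition columns, which
    // is how isUrlEncodeDisable infers that url encoding was off in the old table.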

View File

@@ -59,7 +59,6 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
     val targetTable = sparkSession.sessionState.catalog
       .getTableMetadata(tableId)
     val path = getTableLocation(targetTable, sparkSession)
-      .getOrElse(s"missing location for $tableId")
     val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)

View File

@@ -193,7 +193,6 @@ object InsertIntoHoodieTableCommand {
     val partitionFields = table.partitionColumnNames.mkString(",")
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"Missing location for table ${table.identifier}")
     val tableSchema = table.schema
     val options = table.storage.properties

View File

@@ -416,7 +416,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
     val targetTableDb = targetTableIdentify.database.getOrElse("default")
     val targetTableName = targetTableIdentify.identifier
     val path = getTableLocation(targetTable, sparkSession)
-      .getOrElse(s"missing location for $targetTableIdentify")
     val options = targetTable.storage.properties
     val definedPk = HoodieOptionConfig.getPrimaryColumns(options)

View File

@@ -18,10 +18,10 @@
 package org.apache.spark.sql.hudi.command

 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}

 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils}
 import org.apache.spark.sql.types.{StructType, TimestampType}
 import org.joda.time.format.DateTimeFormat
@@ -40,45 +40,62 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       None
     }
   }

+  // The origin key generator class for this table.
+  private lazy val originKeyGen = {
+    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS, null)
+    if (beforeKeyGenClassName != null) {
+      val keyGenProps = new TypedProperties()
+      keyGenProps.putAll(props)
+      keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS)
+      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key, beforeKeyGenClassName)
+      Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
+    } else {
+      None
+    }
+  }

-  override def getPartitionPath(record: GenericRecord): String = {
+  override def getRecordKey(record: GenericRecord): String = {
+    if (originKeyGen.isDefined) {
+      originKeyGen.get.getKey(record).getRecordKey
+    } else {
+      super.getRecordKey(record)
+    }
+  }

+  override def getPartitionPath(record: GenericRecord) = {
     val partitionPath = super.getPartitionPath(record)
     if (partitionSchema.isDefined) {
-      // we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
-      // by default for sql.
       val partitionFragments = partitionPath.split(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
-      assert(partitionFragments.size == partitionSchema.get.size)
+      // If it is a table which is not write by spark sql before and the url encode has disabled,
+      // the partition path level may not equal to the partition schema size. Just return the partitionPath
+      // in this case.
+      if (partitionFragments.size != partitionSchema.get.size) {
+        partitionPath
+      } else {
-      partitionFragments.zip(partitionSchema.get.fields).map {
-        case (partitionValue, partitionField) =>
-          val hiveStylePrefix = s"${partitionField.name}="
-          val isHiveStyle = partitionValue.startsWith(hiveStylePrefix)
-          val _partitionValue = if (isHiveStyle) partitionValue.substring(hiveStylePrefix.length) else partitionValue
+        partitionFragments.zip(partitionSchema.get.fields).map {
+          case (partitionValue, partitionField) =>
+            val hiveStylePrefix = s"${partitionField.name}="
+            val isHiveStyle = partitionValue.startsWith(hiveStylePrefix)
+            val _partitionValue = if (isHiveStyle) {
+              partitionValue.substring(hiveStylePrefix.length)
+            } else {
+              partitionValue
+            }
-          partitionField.dataType match {
-            case TimestampType =>
-              val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
-              val timestampFormat = PartitionPathEncodeUtils.escapePathName(
-                SqlKeyGenerator.timestampTimeFormat.print(timeMs))
-              if (isHiveStyle) {
-                s"$hiveStylePrefix$timestampFormat"
-              } else {
-                timestampFormat
-              }
-            case _=> partitionValue
-          }
-      }.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
-    } else {
-      partitionPath
-    }
+            partitionField.dataType match {
+              case TimestampType =>
+                val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+                val timestampFormat = PartitionPathEncodeUtils.escapePathName(
+                  SqlKeyGenerator.timestampTimeFormat.print(timeMs))
+                if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
+              case _ => partitionValue
+            }
+        }.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
+      }
+    } else partitionPath
   }
 }

 object SqlKeyGenerator {
   val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
+  val ORIGIN_KEYGEN_CLASS = "hoodie.sql.origin.keygen.class"
   private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
 }
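
The ORIGIN_KEYGEN_CLASS hook above is what keeps record keys stable across the upgrade: when CreateHoodieTableCommand finds a key generator class in the old table's hoodie.properties, SqlKeyGenerator delegates getRecordKey to it instead of applying ComplexKeyGenerator's "field:value" encoding. A hedged sketch of the handoff (property values are illustrative):

    import org.apache.hudi.common.config.TypedProperties

    val props = new TypedProperties()
    props.put("hoodie.datasource.write.recordkey.field", "id")
    props.put("hoodie.datasource.write.partitionpath.field", "dt")
    // Carried over from the old table's hoodie.properties by the create path:
    props.put(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")

    val keyGen = new SqlKeyGenerator(props)
    // getRecordKey now yields "1" via the original SimpleKeyGenerator rather
    // than ComplexKeyGenerator's "id:1", which is exactly what the tests below assert.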

View File

@@ -35,7 +35,6 @@ class TruncateHoodieTableCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val table = sparkSession.sessionState.catalog.getTableMetadata(tableName)
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     // If we have not specified the partition, truncate will delete all the
     // data in the table path include the hoodi.properties. In this case we

View File

@@ -85,7 +85,6 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
     val targetTable = sparkSession.sessionState.catalog
       .getTableMetadata(tableId)
     val path = getTableLocation(targetTable, sparkSession)
-      .getOrElse(s"missing location for $tableId")
     val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)

View File

@@ -17,9 +17,15 @@
 package org.apache.spark.sql.hudi

+import org.apache.hudi.DataSourceWriteOptions._
+import scala.collection.JavaConverters._
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
+import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
@@ -272,4 +278,231 @@ class TestCreateTable extends TestHoodieSqlBase {
       )
     }
   }
+
+  test("Test Create Table From Exist Hoodie Table") {
+    withTempDir { tmp =>
+      Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        import spark.implicits._
+        val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")
+        // Write a table by spark dataframe.
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TABLE_NAME.key, tableName)
+          .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "dt")
+          .option(KEYGENERATOR_CLASS.key, classOf[SimpleKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+        // Create a table over the exist old table.
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             | options (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+             |partitioned by (dt)
+             |location '$tablePath'
+             |""".stripMargin)
+        checkAnswer(s"select id, name, value, ts, dt from $tableName")(
+          Seq(1, "a1", 10, 1000, partitionValue)
+        )
+        // Check the missing properties for spark sql
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        val properties = metaClient.getTableConfig.getProps.asScala.toMap
+        assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key))
+        assertResult("dt")(properties(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key))
+        assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key))
+
+        // Test insert into
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+          Seq("1", partitionValue, 1, "a1", 10, 1000, partitionValue),
+          Seq("2", partitionValue, 2, "a2", 10, 1000, partitionValue)
+        )
+        // Test merge into
+        spark.sql(
+          s"""
+             |merge into $tableName h0
+             |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts, '$partitionValue' as dt) s0
+             |on h0.id = s0.id
+             |when matched then update set *
+             |""".stripMargin)
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+          Seq("1", partitionValue, 1, "a1", 11, 1001, partitionValue),
+          Seq("2", partitionValue, 2, "a2", 10, 1000, partitionValue)
+        )
+        // Test update
+        spark.sql(s"update $tableName set value = value + 1 where id = 2")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+          Seq("1", partitionValue, 1, "a1", 11, 1001, partitionValue),
+          Seq("2", partitionValue, 2, "a2", 11, 1000, partitionValue)
+        )
+        // Test delete
+        spark.sql(s"delete from $tableName where id = 1")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+          Seq("2", partitionValue, 2, "a2", 11, 1000, partitionValue)
+        )
+      }
+    }
+  }
+
+  test("Test Create Table From Exist Hoodie Table For Multi-Level Partitioned Table") {
+    withTempDir { tmp =>
+      Seq("2021-08-02", "2021/08/02").foreach { day =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        import spark.implicits._
+        val df = Seq((1, "a1", 10, 1000, day, 12)).toDF("id", "name", "value", "ts", "day", "hh")
+        // Write a table by spark dataframe.
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TABLE_NAME.key, tableName)
+          .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "day,hh")
+          .option(KEYGENERATOR_CLASS.key, classOf[ComplexKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+        // Create a table over the exist old table.
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             | options (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+             |partitioned by (day, hh)
+             |location '$tablePath'
+             |""".stripMargin)
+        checkAnswer(s"select id, name, value, ts, day, hh from $tableName")(
+          Seq(1, "a1", 10, 1000, day, 12)
+        )
+        // Check the missing properties for spark sql
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        val properties = metaClient.getTableConfig.getProps.asScala.toMap
+        assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key))
+        assertResult("day,hh")(properties(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key))
+        assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key))
+
+        // Test insert into
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$day', 12)")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
+          Seq("id:1", s"$day/12", 1, "a1", 10, 1000, day, 12),
+          Seq("id:2", s"$day/12", 2, "a2", 10, 1000, day, 12)
+        )
+        // Test merge into
+        spark.sql(
+          s"""
+             |merge into $tableName h0
+             |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts, '$day' as day, 12 as hh) s0
+             |on h0.id = s0.id
+             |when matched then update set *
+             |""".stripMargin)
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
+          Seq("id:1", s"$day/12", 1, "a1", 11, 1001, day, 12),
+          Seq("id:2", s"$day/12", 2, "a2", 10, 1000, day, 12)
+        )
+        // Test update
+        spark.sql(s"update $tableName set value = value + 1 where id = 2")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
+          Seq("id:1", s"$day/12", 1, "a1", 11, 1001, day, 12),
+          Seq("id:2", s"$day/12", 2, "a2", 11, 1000, day, 12)
+        )
+        // Test delete
+        spark.sql(s"delete from $tableName where id = 1")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
+          Seq("id:2", s"$day/12", 2, "a2", 11, 1000, day, 12)
+        )
+      }
+    }
+  }
+
+  test("Test Create Table From Exist Hoodie Table For None Partitioned Table") {
+    withTempDir{tmp =>
+      // Write a table by spark dataframe.
+      val tableName = generateTableName
+      import spark.implicits._
+      val df = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "ts")
+      df.write.format("hudi")
+        .option(HoodieWriteConfig.TABLE_NAME.key, tableName)
+        .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+        .option(RECORDKEY_FIELD.key, "id")
+        .option(PRECOMBINE_FIELD.key, "ts")
+        .option(PARTITIONPATH_FIELD.key, "")
+        .option(KEYGENERATOR_CLASS.key, classOf[NonpartitionedKeyGenerator].getName)
+        .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1")
+        .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1")
+        .mode(SaveMode.Overwrite)
+        .save(tmp.getCanonicalPath)
+      // Create a table over the exist old table.
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           | options (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           |)
+           |location '${tmp.getCanonicalPath}'
+           |""".stripMargin)
+      checkAnswer(s"select id, name, value, ts from $tableName")(
+        Seq(1, "a1", 10, 1000)
+      )
+      // Check the missing properties for spark sql
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tmp.getCanonicalPath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val properties = metaClient.getTableConfig.getProps.asScala.toMap
+      assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key))
+      assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key))
+
+      // Test insert into
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts from $tableName order by id")(
+        Seq("1", "", 1, "a1", 10, 1000),
+        Seq("2", "", 2, "a2", 10, 1000)
+      )
+      // Test merge into
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts) s0
+           |on h0.id = s0.id
+           |when matched then update set *
+           |""".stripMargin)
+      checkAnswer(s"select id, name, value, ts from $tableName order by id")(
+        Seq(1, "a1", 11, 1001),
+        Seq(2, "a2", 10, 1000)
+      )
+      // Test update
+      spark.sql(s"update $tableName set value = value + 1 where id = 2")
+      checkAnswer(s"select id, name, value, ts from $tableName order by id")(
+        Seq(1, "a1", 11, 1001),
+        Seq(2, "a2", 11, 1000)
+      )
+      // Test delete
+      spark.sql(s"delete from $tableName where id = 1")
+      checkAnswer(s"select id, name, value, ts from $tableName order by id")(
+        Seq(2, "a2", 11, 1000)
+      )
+    }
+  }
 }

View File

@@ -126,9 +126,9 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
     checkException(
       s"""
          |merge into $tableName2 t0
-         |using ( select 1 as id, 'a1' as name, 12 as price) s0
+         |using ( select 1 as id, 'a1' as name, 12 as price, 1000 as ts) s0
          |on t0.id = s0.id
-         |when matched then update set price = s0.price
+         |when matched then update set price = s0.price, _ts = s0.ts
       """.stripMargin)(
       "Missing specify the value for target field: 'id' in merge into update action for MOR table. " +
       "Currently we cannot support partial update for MOR, please complete all the target fields " +