1
0

[HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)

This commit is contained in:
Yann Byron
2021-11-03 10:04:23 +08:00
committed by GitHub
parent 1f17467f73
commit 6351e5f4d0
25 changed files with 544 additions and 207 deletions

View File

@@ -154,14 +154,12 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
if (translatedOptions(OPERATION.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
}
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
@@ -170,11 +168,9 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
new HoodieStreamingSink(
sqlContext,
translatedOptions,
optParams,
partitionColumns,
outputMode)
}

View File

@@ -17,13 +17,13 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceOptionsHelper.{allAlternatives, translateConfigurations}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
@@ -31,12 +31,13 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
@@ -51,9 +52,9 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import java.util
import java.util.Properties
import org.apache.hudi.index.SparkHoodieIndexFactory
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ListBuffer
object HoodieSparkSqlWriter {
@@ -65,7 +66,7 @@ object HoodieSparkSqlWriter {
def write(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
@@ -75,16 +76,23 @@ object HoodieSparkSqlWriter {
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
val path = optParams("path")
val basePath = new Path(path)
val sparkContext = sqlContext.sparkContext
val path = parameters.get("path")
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
val tblNameOp = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
if (path.isEmpty) {
throw new HoodieException(s"'path' must be set.")
}
val tblName = tblNameOp.trim
sparkContext.getConf.getOption("spark.serializer") match {
case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
@@ -105,12 +113,8 @@ object HoodieSparkSqlWriter {
}
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(path.get)
val instantTime = HoodieActiveTimeline.createNewInstantTime()
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt)
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(toProperties(parameters))
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
@@ -124,7 +128,7 @@ object HoodieSparkSqlWriter {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
@@ -138,7 +142,9 @@ object HoodieSparkSqlWriter {
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_NAME))
.initTable(sparkContext.hadoopConfiguration, path.get)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig
}
@@ -169,7 +175,7 @@ object HoodieSparkSqlWriter {
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path.get, tblName,
null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -200,7 +206,7 @@ object HoodieSparkSqlWriter {
}
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path.get, tblName,
null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
// Issue delete partitions
@@ -244,7 +250,7 @@ object HoodieSparkSqlWriter {
val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path.get,
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path,
tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -326,14 +332,21 @@ object HoodieSparkSqlWriter {
def bootstrap(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
val path = optParams("path")
val basePath = new Path(path)
val sparkContext = sqlContext.sparkContext
val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -349,10 +362,6 @@ object HoodieSparkSqlWriter {
schema = HoodieAvroUtils.getNullSchema.toString
}
val basePath = new Path(path)
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
// Handle various save modes
if (mode == SaveMode.Ignore && tableExists) {
@@ -381,6 +390,8 @@ object HoodieSparkSqlWriter {
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(keyGenProp)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.initTable(sparkContext.hadoopConfiguration, path)
}
@@ -401,7 +412,7 @@ object HoodieSparkSqlWriter {
df: DataFrame,
tblName: String,
basePath: Path,
path: Option[String],
path: String,
instantTime: String,
partitionColumns: String): (Boolean, common.util.Option[String]) = {
val sparkContext = sqlContext.sparkContext
@@ -424,7 +435,7 @@ object HoodieSparkSqlWriter {
throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
}
val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString)
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
@@ -699,4 +710,49 @@ object HoodieSparkSqlWriter {
null
}
}
/**
 * Checks the writer-supplied options against the configs already stored for the table
 * and fails if any of them disagree.
 *
 * For every entry in `params`, the stored value is looked up through
 * `getStringFromTableConfigWithAlternatives`, which also handles keys listed in `allAlternatives`.
 * Entries with no stored value are skipped (this includes the case where `tableConfig` is null).
 * Stored and incoming values are compared with the session's `resolver`.
 *
 * @param spark       session whose `sessionState.conf.resolver` is used for the comparison
 * @param params      options passed in by the writer
 * @param tableConfig existing table config; may be null when no table config is available
 * @throws HoodieException listing every conflicting key with its incoming and existing value
 */
private def validateTableConfig(spark: SparkSession, params: Map[String, String],
    tableConfig: HoodieTableConfig): Unit = {
  val resolver = spark.sessionState.conf.resolver
  val diffConfigs = StringBuilder.newBuilder
  params.foreach { case (key, value) =>
    val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
    if (null != existingValue && !resolver(existingValue, value)) {
      // Report the value that was actually compared. When `key` is an alternative name the
      // stored value lives under a different key, so re-reading the table config by `key`
      // would not show the value that caused the conflict.
      diffConfigs.append(s"$key:\t$value\t$existingValue\n")
    }
  }

  if (diffConfigs.nonEmpty) {
    diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
    throw new HoodieException(diffConfigs.toString.trim)
  }
}
/**
 * Produces the effective write parameters together with the matching [[HoodieConfig]].
 *
 * Steps, in order:
 *  1. apply the write defaults to `optParams` and translate the SQL options;
 *  2. if the table-level key generator key is missing but the write-level one is present,
 *     copy the write-level value to the table-level key;
 *  3. when `tableConfig` is not null, copy all of its properties on top, so stored table
 *     configs win over anything supplied by the caller.
 *
 * @param optParams   raw options supplied by the caller
 * @param tableConfig existing table config, or null when there is none
 * @return the merged immutable parameter map and the `HoodieConfig` built from it
 */
private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
    tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
  val withDefaults = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
  val translated = DataSourceWriteOptions.translateSqlOptions(withDefaults)
  val merged = mutable.Map.empty[String, String]
  merged ++= translated

  val tableKeyGenKey = HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key
  if (!merged.contains(tableKeyGenKey)) {
    // Mirror the write-level key generator class under the table-level key.
    merged.get(KEYGENERATOR_CLASS_NAME.key).foreach(keyGenClass => merged.update(tableKeyGenKey, keyGenClass))
  }

  // Stored table properties override whatever was collected so far.
  Option(tableConfig).foreach { existing =>
    existing.getProps.foreach { case (propKey, propValue) =>
      merged.update(propKey, propValue)
    }
  }

  val finalParams = merged.toMap
  (finalParams, HoodieWriterUtils.convertMapToHoodieConfig(finalParams))
}
/**
 * Reads `key` from the table config, going through `allAlternatives` first.
 *
 * If `key` is present in `allAlternatives`, the value is read under the key it maps to;
 * otherwise it is read under `key` itself.
 *
 * @param tableConfig table config to read from; may be null
 * @param key         option key, possibly an alternative name
 * @return the value returned by `tableConfig.getString`, or null when `tableConfig` is null
 */
private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieTableConfig, key: String): String = {
  tableConfig match {
    case null => null
    case existing =>
      val lookupKey: String = allAlternatives.getOrElse(key, key)
      existing.getString(lookupKey)
  }
}
}

View File

@@ -48,9 +48,12 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT.key).toInt
private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key).toLong
private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key).toBoolean
private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
private var isAsyncCompactorServiceShutdownAbnormally = false
private var isAsyncClusteringServiceShutdownAbnormally = false

View File

@@ -92,10 +92,9 @@ object HoodieWriterUtils {
* @return
*/
def getPartitionColumns(parameters: Map[String, String]): String = {
val props = new TypedProperties()
val props = new Properties()
props.putAll(parameters.asJava)
val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
HoodieSparkUtils.getPartitionColumns(keyGen, props)
HoodieSparkUtils.getPartitionColumns(props)
}
def convertMapToHoodieConfig(parameters: Map[String, String]): HoodieConfig = {

View File

@@ -120,8 +120,13 @@ object HoodieOptionConfig {
*/
def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = {
defaultTableConfig ++
options.filterKeys(k => keyTableConfigMapping.contains(k))
.map(kv => keyTableConfigMapping(kv._1) -> valueMapping.getOrElse(kv._2, kv._2))
options.map { case (k, v) =>
if (keyTableConfigMapping.contains(k)) {
keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
} else {
k -> v
}
}
}
/**

View File

@@ -41,8 +41,12 @@ import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOL
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}
import java.util.{Locale, Properties}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -90,35 +94,13 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
.setBasePath(path)
.setConf(conf)
.build()
val tableSchema = getTableSqlSchema(metaClient)
val tableSchema = getTableSqlSchema(metaClient)
// Get options from the external table and append with the options in ddl.
val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption(
metaClient.getTableConfig.getProps.asScala.toMap)
val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
var upgrateConfig = Map.empty[String, String]
// If this is a non-hive-styled partition table, disable the hive style config.
// (By default this config is enable for spark sql)
upgrateConfig = if (!isHiveStyledPartitioning(allPartitionPaths, table)) {
upgrateConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false")
} else {
upgrateConfig
}
upgrateConfig = if (!isUrlEncodeEnabled(allPartitionPaths, table)) {
upgrateConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false")
} else {
upgrateConfig
}
// Use the origin keygen to generate record key to keep the rowkey consistent with the old table for spark sql.
// See SqlKeyGenerator#getRecordKey for detail.
upgrateConfig = if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
upgrateConfig + (SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
} else {
upgrateConfig
}
val options = originTableConfig ++ upgrateConfig ++ table.storage.properties
// Get options from the external table and append with the options in ddl.
val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption(
metaClient.getTableConfig.getProps.asScala.toMap)
val extraConfig = extraTableConfig(sparkSession, isTableExists, originTableConfig)
val options = originTableConfig ++ table.storage.properties ++ extraConfig
val userSpecifiedSchema = table.schema
if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
@@ -137,7 +119,8 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
s". The associated location('$path') already exists.")
}
// Add the meta fields to the schema if this is a managed table or an empty external table.
(addMetaFields(table.schema), table.storage.properties)
val options = table.storage.properties ++ extraTableConfig(sparkSession, false)
(addMetaFields(table.schema), options)
}
val tableType = HoodieOptionConfig.getTableType(table.storage.properties)
@@ -314,6 +297,43 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
}
}
/**
 * Computes the table configs that are added on top of the user-specified options:
 * hive-style partitioning, url-encoded partitioning and the key generator class.
 *
 * For an existing table, the two partitioning flags are taken from `originTableConfig` when
 * present; otherwise they are derived from the table's partition paths via
 * `isHiveStyledPartitioning` / `isUrlEncodeEnabled`. For a new table, hive-style partitioning is
 * set to "true" and url-encoding to the default of `HoodieTableConfig.URL_ENCODE_PARTITIONING`.
 *
 * The key generator is `UuidKeyGenerator` when no primary columns are defined; otherwise the
 * origin table's key generator (passed through
 * `HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator`) if one is configured, else
 * `ComplexKeyGenerator`.
 *
 * @param sparkSession      active session, used to list the partition paths
 * @param isTableExists     whether the table already exists
 * @param originTableConfig configs read from the existing table; empty by default
 * @return the extra configs keyed by their `HoodieTableConfig` keys
 */
def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
    originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
  val hiveStyleKey = HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key
  val urlEncodeKey = HoodieTableConfig.URL_ENCODE_PARTITIONING.key
  val keyGenKey = HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key
  val collected = mutable.Map.empty[String, String]

  if (!isTableExists) {
    collected.update(hiveStyleKey, "true")
    collected.update(urlEncodeKey, HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue())
  } else {
    val partitionPaths = getAllPartitionPaths(sparkSession, table)
    // The by-name defaults are only evaluated when the origin config lacks the key.
    collected.update(hiveStyleKey, originTableConfig.getOrElse(hiveStyleKey,
      String.valueOf(isHiveStyledPartitioning(partitionPaths, table))))
    collected.update(urlEncodeKey, originTableConfig.getOrElse(urlEncodeKey,
      String.valueOf(isUrlEncodeEnabled(partitionPaths, table))))
  }

  val primaryColumns = HoodieOptionConfig.getPrimaryColumns(originTableConfig ++ table.storage.properties)
  val keyGenClassName: String =
    if (primaryColumns.isEmpty) {
      classOf[UuidKeyGenerator].getCanonicalName
    } else {
      originTableConfig.get(keyGenKey) match {
        case Some(originKeyGen) => HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originKeyGen)
        case None => classOf[ComplexKeyGenerator].getCanonicalName
      }
    }
  collected.update(keyGenKey, keyGenClassName)

  collected.toMap
}
}
object CreateHoodieTableCommand extends Logging {
@@ -342,6 +362,9 @@ object CreateHoodieTableCommand extends Logging {
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
// Save all the table config to the hoodie.properties.
val parameters = originTableConfig ++ tableOptions
val properties = new Properties()

View File

@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.ddl.HiveSyncMode
@@ -58,7 +59,12 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
val targetTable = sparkSession.sessionState.catalog
.getTableMetadata(tableId)
val path = getTableLocation(targetTable, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
assert(primaryColumns.nonEmpty,
@@ -66,13 +72,14 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
withSparkConf(sparkSession, targetTable.storage.properties) {
Map(
"path" -> path,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
TBL_NAME.key -> tableId.table,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_STYLE_PARTITIONING.key -> "true",
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
)

View File

@@ -21,12 +21,14 @@ import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.spark.internal.Logging
@@ -90,7 +92,6 @@ object InsertIntoHoodieTableCommand extends Logging {
// for insert into or insert overwrite partition we use append mode.
SaveMode.Append
}
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
val conf = sparkSession.sessionState.conf
val alignedQuery = alignOutputFields(query, table, insertPartitions, conf)
// If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
@@ -100,7 +101,7 @@ object InsertIntoHoodieTableCommand extends Logging {
val inputDF = sparkSession.createDataFrame(
Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
val success =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1
if (success) {
if (refreshTable) {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
@@ -197,20 +198,43 @@ object InsertIntoHoodieTableCommand extends Logging {
val parameters = withSparkConf(sparkSession, options)()
val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options)
val partitionFields = table.partitionColumnNames.mkString(",")
val path = getTableLocation(table, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val isTableExists = tableExistsInPath(path, conf)
val tableConfig = if (isTableExists) {
HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
.getTableConfig
} else {
null
}
val hiveStylePartitioningEnable = if (null == tableConfig || null == tableConfig.getHiveStylePartitioningEnable) {
"true"
} else {
tableConfig.getHiveStylePartitioningEnable
}
val urlEncodePartitioning = if (null == tableConfig || null == tableConfig.getUrlEncodePartitoning) {
"false"
} else {
tableConfig.getUrlEncodePartitoning
}
val keyGeneratorClassName = if (null == tableConfig || null == tableConfig.getKeyGeneratorClassName) {
if (primaryColumns.nonEmpty) {
classOf[ComplexKeyGenerator].getCanonicalName
} else {
classOf[UuidKeyGenerator].getCanonicalName
}
} else {
tableConfig.getKeyGeneratorClassName
}
val tableSchema = table.schema
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options)
val keyGenClass = if (primaryColumns.nonEmpty) {
classOf[SqlKeyGenerator].getCanonicalName
} else {
classOf[UuidKeyGenerator].getName
}
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key)
.getOrElse(INSERT_DROP_DUPS.defaultValue)
@@ -267,7 +291,9 @@ object InsertIntoHoodieTableCommand extends Logging {
TBL_NAME.key -> table.identifier.table,
PRECOMBINE_FIELD.key -> tableSchema.fields.last.name,
OPERATION.key -> operation,
KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
@@ -279,10 +305,8 @@ object InsertIntoHoodieTableCommand extends Logging {
HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> table.identifier.table,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_STYLE_PARTITIONING.key -> "true",
HIVE_PARTITION_FIELDS.key -> partitionFields,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
URL_ENCODE_PARTITIONING.key -> "true",
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> table.partitionSchema.toDDL

View File

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -34,7 +35,6 @@ import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, SerDeUtils}
import org.apache.spark.sql.types.{BooleanType, StructType}
import java.util.Base64
/**
@@ -419,7 +419,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
val targetTableDb = targetTableIdentify.database.getOrElse("default")
val targetTableName = targetTableIdentify.identifier
val path = getTableLocation(targetTable, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val options = targetTable.storage.properties
val definedPk = HoodieOptionConfig.getPrimaryColumns(options)
// TODO Currently the mergeEqualConditionKeys must be the same the primary key.
@@ -429,31 +434,30 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
}
// Enable the hive sync by default if spark have enable the hive metastore.
val enableHive = isEnableHive(sparkSession)
HoodieWriterUtils.parametersWithWriteDefaults(
withSparkConf(sparkSession, options) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
TBL_NAME.key -> targetTableName,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> targetTableDb,
HIVE_TABLE.key -> targetTableName,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_STYLE_PARTITIONING.key -> "true",
HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
URL_ENCODE_PARTITIONING.key -> "true", // enable the url decode for sql.
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
)
})
withSparkConf(sparkSession, options) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
TBL_NAME.key -> targetTableName,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> targetTableDb,
HIVE_TABLE.key -> targetTableName,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
)
}
}
}

View File

@@ -18,11 +18,13 @@
package org.apache.spark.sql.hudi.command
import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
@@ -48,7 +50,8 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
val keyGenProps = new TypedProperties()
keyGenProps.putAll(props)
keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, beforeKeyGenClassName)
val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props)
keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
} else {
None
@@ -64,7 +67,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
}
override def getRecordKey(row: Row): String = {
if (originKeyGen.isDefined && originKeyGen.get.isInstanceOf[SparkKeyGeneratorInterface]) {
if (originKeyGen.isDefined) {
originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
} else {
super.getRecordKey(row)
@@ -121,4 +124,13 @@ object SqlKeyGenerator {
val ORIGIN_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
/**
 * Resolves the class name of the key generator that should actually be instantiated.
 *
 * Looks up the value stored under [[SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME]] in `props`.
 * When a value is present it is passed through
 * `HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator` and that result is returned;
 * when the property is absent the canonical name of [[ComplexKeyGenerator]] is returned.
 *
 * @param props properties that may carry the original key generator class name
 * @return the key generator class name derived as described above
 */
def getRealKeyGenClassName(props: TypedProperties): String = {
  // getString is given a null default, so wrapping in Option turns "property absent" into None.
  val originKeyGenClassName = Option(props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null))
  originKeyGenClassName
    .map(className => HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(className))
    .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
}
}

View File

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -85,7 +86,12 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
val targetTable = sparkSession.sessionState.catalog
.getTableMetadata(tableId)
val path = getTableLocation(targetTable, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path)
.setConf(conf)
.build()
val tableConfig = metaClient.getTableConfig
val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
assert(primaryColumns.nonEmpty,
@@ -95,9 +101,11 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
Map(
"path" -> path,
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
PRECOMBINE_FIELD.key -> primaryColumns.head, //set the default preCombine field.
TBL_NAME.key -> tableId.table,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
META_SYNC_ENABLED.key -> enableHive.toString,
@@ -107,9 +115,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
HIVE_TABLE.key -> tableId.table,
HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
URL_ENCODE_PARTITIONING.key -> "true",
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_STYLE_PARTITIONING.key -> "true",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
)