[HUDI-2255] Refactor Datasource options (#3373)
Co-authored-by: Wenning Ding <wenningd@amazon.com>
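This refactor drops the _OPT_KEY suffix from the Spark datasource option constants (for example OPERATION_OPT_KEY becomes OPERATION, QUERY_TYPE_OPT_KEY becomes QUERY_TYPE), and every call site in the diff below switches to the shorter names. Below is a minimal write-side sketch of what caller code looks like after the rename; the table name, base path, and column names are hypothetical and not part of this patch.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}

object RenamedWriteOptionsExample {
  def write(spark: SparkSession): Unit = {
    // Hypothetical input frame with a record key, ordering field, and partition column.
    val df = spark.range(0, 10).selectExpr(
      "cast(id as string) as uuid",
      "current_timestamp() as ts",
      "'2021/08/01' as partitionpath")

    df.write.format("hudi")
      // Previously DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key(), etc.
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "uuid")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), "COPY_ON_WRITE")
      .option(HoodieWriteConfig.TABLE_NAME.key(), "hudi_trips")
      .mode(SaveMode.Append)
      .save("/tmp/hudi_trips")
  }
}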
@@ -77,7 +77,7 @@ public class HoodieDatasetBulkInsertHelper {
 TypedProperties properties = new TypedProperties();
 properties.putAll(config.getProps());
-String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key());
+String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS().key());
 BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
 StructType structTypeForUDF = rows.schema();
@@ -20,7 +20,7 @@ package org.apache.hudi
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -75,9 +75,9 @@ class DefaultSource extends RelationProvider
 val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
 val path = parameters.get("path")
-val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY.key)
+val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS.key)
 if (path.isEmpty && readPathsStr.isEmpty) {
-throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.")
+throw new HoodieException(s"'path' or '$READ_PATHS' or both must be specified.")
 }
 val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
@@ -89,7 +89,7 @@ class DefaultSource extends RelationProvider
 val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX.key)
 .map(_.toBoolean).getOrElse(ENABLE_HOODIE_FILE_INDEX.defaultValue)
 val useHoodieFileIndex = enableFileIndex && path.isDefined && !path.get.contains("*") &&
-!parameters.contains(DataSourceReadOptions.READ_PATHS_OPT_KEY.key)
+!parameters.contains(DataSourceReadOptions.READ_PATHS.key)
 val globPaths = if (useHoodieFileIndex) {
 None
 } else {
@@ -106,7 +106,7 @@ class DefaultSource extends RelationProvider
 val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
 val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
 val tableType = metaClient.getTableType
-val queryType = parameters(QUERY_TYPE_OPT_KEY.key)
+val queryType = parameters(QUERY_TYPE.key)
 log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
@@ -159,7 +159,7 @@ class DefaultSource extends RelationProvider
 val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
 val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
-if (translatedOptions(OPERATION_OPT_KEY.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
+if (translatedOptions(OPERATION.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
 HoodieSparkSqlWriter.bootstrap(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
 } else {
 HoodieSparkSqlWriter.write(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
@@ -21,7 +21,7 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
@@ -77,7 +77,7 @@ case class HoodieFileIndex(
 @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
-private val queryType = options(QUERY_TYPE_OPT_KEY.key())
+private val queryType = options(QUERY_TYPE.key())
 private val tableType = metaClient.getTableType
@@ -370,7 +370,7 @@ case class HoodieFileIndex(
 } else { // If partitionSeqs.length == partitionSchema.fields.length
 // Append partition name to the partition value if the
-// HIVE_STYLE_PARTITIONING_OPT_KEY is disable.
+// HIVE_STYLE_PARTITIONING is disable.
 // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
 val partitionWithName =
 partitionFragments.zip(partitionSchema).map {
@@ -85,17 +85,17 @@ object HoodieSparkSqlWriter {
 case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
 case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
 }
-val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE_OPT_KEY))
-var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION_OPT_KEY))
-// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
-// Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly
+val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
+var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
+// Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
 // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
-if (hoodieConfig.getBoolean(INSERT_DROP_DUPS_OPT_KEY) &&
+if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
 operation == WriteOperationType.UPSERT) {
 log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
-s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " +
-s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL")
+s"when $INSERT_DROP_DUPS is set to be true, " +
+s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
 operation = WriteOperationType.INSERT
 }
@@ -119,7 +119,7 @@ object HoodieSparkSqlWriter {
 val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)
 val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
 val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
-val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
+val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
 val populateMetaFields = parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean
 val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
@@ -128,12 +128,12 @@ object HoodieSparkSqlWriter {
 .setRecordKeyFields(recordKeyFields)
 .setBaseFileFormat(baseFileFormat)
 .setArchiveLogFolder(archiveLogFolder)
-.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
-.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS))
+.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
 .setPartitionFields(partitionColumns)
 .setPopulateMetaFields(populateMetaFields)
-.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD_OPT_KEY))
-.setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_OPT_KEY))
+.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
+.setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS))
 .initTable(sparkContext.hadoopConfiguration, path.get)
 tableConfig = tableMetaClient.getTableConfig
 }
@@ -142,7 +142,7 @@ object HoodieSparkSqlWriter {
 // short-circuit if bulk_insert via row is enabled.
 // scalastyle:off
-if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER_OPT_KEY) &&
+if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) &&
 operation == WriteOperationType.BULK_INSERT) {
 val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
 basePath, path, instantTime, parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
@@ -164,16 +164,16 @@ object HoodieSparkSqlWriter {
 // Convert to RDD[HoodieRecord]
 val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
 val hoodieAllIncomingRecords = genericRecords.map(gr => {
 val hoodieRecord = if (shouldCombine) {
-val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD_OPT_KEY), false)
+val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
 .asInstanceOf[Comparable[_]]
 DataSourceUtils.createHoodieRecord(gr,
 orderingVal, keyGenerator.getKey(gr),
-hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
+hoodieConfig.getString(PAYLOAD_CLASS))
 } else {
-DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
+DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
 }
 hoodieRecord
 }).toJavaRDD()
@@ -192,7 +192,7 @@ object HoodieSparkSqlWriter {
 }
 val hoodieRecords =
-if (hoodieConfig.getBoolean(INSERT_DROP_DUPS_OPT_KEY)) {
+if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
 DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
 } else {
 hoodieAllIncomingRecords
@@ -256,7 +256,7 @@ object HoodieSparkSqlWriter {
 val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
 val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
 val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TABLE_NAME, s"'${HoodieWriteConfig.TABLE_NAME.key}' must be set.")
-val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE_OPT_KEY)
+val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
 val bootstrapBasePath = hoodieConfig.getStringOrThrow(BOOTSTRAP_BASE_PATH_PROP,
 s"'${BOOTSTRAP_BASE_PATH_PROP.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
 " operation'")
@@ -286,7 +286,7 @@ object HoodieSparkSqlWriter {
 if (!tableExists) {
 val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
 val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
-val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
+val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
 val keyGenProp = hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS)
 val populateMetaFields = parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean
@@ -295,8 +295,8 @@ object HoodieSparkSqlWriter {
 .setTableName(tableName)
 .setRecordKeyFields(recordKeyFields)
 .setArchiveLogFolder(archiveLogFolder)
-.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_OPT_KEY))
-.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS))
+.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
 .setBootstrapIndexClass(bootstrapIndexClass)
 .setBootstrapBasePath(bootstrapBasePath)
 .setPartitionFields(partitionColumns)
@@ -334,7 +334,7 @@ object HoodieSparkSqlWriter {
 val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
 sparkContext.getConf.registerAvroSchemas(schema)
 log.info(s"Registered avro schema : ${schema.toString(true)}")
-if (parameters(INSERT_DROP_DUPS_OPT_KEY.key).toBoolean) {
+if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
 throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
 }
 val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA.key, schema.toString)
@@ -382,8 +382,8 @@ object HoodieSparkSqlWriter {
 + " To use row writer please switch to spark 2 or spark 3")
 }
 val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(params)
-val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED_OPT_KEY).toBoolean
-val metaSyncEnabled = hoodieConfig.getStringOrDefault(META_SYNC_ENABLED_OPT_KEY).toBoolean
+val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED).toBoolean
+val metaSyncEnabled = hoodieConfig.getStringOrDefault(META_SYNC_ENABLED).toBoolean
 val syncHiveSuccess =
 if (hiveSyncEnabled || metaSyncEnabled) {
 metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
@@ -439,26 +439,26 @@ object HoodieSparkSqlWriter {
 private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig, sqlConf: SQLConf): HiveSyncConfig = {
 val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
 hiveSyncConfig.basePath = basePath.toString
-hiveSyncConfig.baseFileFormat = hoodieConfig.getString(HIVE_BASE_FILE_FORMAT_OPT_KEY)
+hiveSyncConfig.baseFileFormat = hoodieConfig.getString(HIVE_BASE_FILE_FORMAT)
 hiveSyncConfig.usePreApacheInputFormat =
-hoodieConfig.getStringOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).toBoolean
-hiveSyncConfig.databaseName = hoodieConfig.getString(HIVE_DATABASE_OPT_KEY)
-hiveSyncConfig.tableName = hoodieConfig.getString(HIVE_TABLE_OPT_KEY)
-hiveSyncConfig.hiveUser = hoodieConfig.getString(HIVE_USER_OPT_KEY)
-hiveSyncConfig.hivePass = hoodieConfig.getString(HIVE_PASS_OPT_KEY)
-hiveSyncConfig.jdbcUrl = hoodieConfig.getString(HIVE_URL_OPT_KEY)
+hoodieConfig.getStringOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT).toBoolean
+hiveSyncConfig.databaseName = hoodieConfig.getString(HIVE_DATABASE)
+hiveSyncConfig.tableName = hoodieConfig.getString(HIVE_TABLE)
+hiveSyncConfig.hiveUser = hoodieConfig.getString(HIVE_USER)
+hiveSyncConfig.hivePass = hoodieConfig.getString(HIVE_PASS)
+hiveSyncConfig.jdbcUrl = hoodieConfig.getString(HIVE_URL)
 hiveSyncConfig.skipROSuffix = hoodieConfig.getStringOrDefault(HIVE_SKIP_RO_SUFFIX,
 DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.defaultValue).toBoolean
 hiveSyncConfig.partitionFields =
-ListBuffer(hoodieConfig.getString(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
-hiveSyncConfig.partitionValueExtractorClass = hoodieConfig.getString(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
-hiveSyncConfig.useJdbc = hoodieConfig.getBoolean(HIVE_USE_JDBC_OPT_KEY)
+ListBuffer(hoodieConfig.getString(HIVE_PARTITION_FIELDS).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
+hiveSyncConfig.partitionValueExtractorClass = hoodieConfig.getString(HIVE_PARTITION_EXTRACTOR_CLASS)
+hiveSyncConfig.useJdbc = hoodieConfig.getBoolean(HIVE_USE_JDBC)
 hiveSyncConfig.useFileListingFromMetadata = hoodieConfig.getBoolean(HoodieMetadataConfig.METADATA_ENABLE_PROP)
 hiveSyncConfig.verifyMetadataFileListing = hoodieConfig.getBoolean(HoodieMetadataConfig.METADATA_VALIDATE_PROP)
-hiveSyncConfig.ignoreExceptions = hoodieConfig.getStringOrDefault(HIVE_IGNORE_EXCEPTIONS_OPT_KEY).toBoolean
+hiveSyncConfig.ignoreExceptions = hoodieConfig.getStringOrDefault(HIVE_IGNORE_EXCEPTIONS).toBoolean
 hiveSyncConfig.supportTimestamp = hoodieConfig.getStringOrDefault(HIVE_SUPPORT_TIMESTAMP).toBoolean
-hiveSyncConfig.autoCreateDatabase = hoodieConfig.getStringOrDefault(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).toBoolean
-hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean
+hiveSyncConfig.autoCreateDatabase = hoodieConfig.getStringOrDefault(HIVE_AUTO_CREATE_DATABASE).toBoolean
+hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING).toBoolean
 hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt
 hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
@@ -472,8 +472,8 @@ object HoodieSparkSqlWriter {
 private def metaSync(spark: SparkSession, hoodieConfig: HoodieConfig, basePath: Path,
 schema: StructType): Boolean = {
-val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED_OPT_KEY).toBoolean
-var metaSyncEnabled = hoodieConfig.getStringOrDefault(META_SYNC_ENABLED_OPT_KEY).toBoolean
+val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED).toBoolean
+var metaSyncEnabled = hoodieConfig.getStringOrDefault(META_SYNC_ENABLED).toBoolean
 var syncClientToolClassSet = scala.collection.mutable.Set[String]()
 hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
@@ -488,7 +488,7 @@ object HoodieSparkSqlWriter {
 syncClientToolClassSet.foreach(impl => {
 val syncSuccess = impl.trim match {
 case "org.apache.hudi.hive.HiveSyncTool" => {
-log.info("Syncing to Hive Metastore (URL: " + hoodieConfig.getString(HIVE_URL_OPT_KEY) + ")")
+log.info("Syncing to Hive Metastore (URL: " + hoodieConfig.getString(HIVE_URL) + ")")
 syncHive(basePath, fs, hoodieConfig, spark.sessionState.conf)
 true
 }
@@ -524,7 +524,7 @@ object HoodieSparkSqlWriter {
 if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
 log.info("Proceeding to commit the write.")
 val metaMap = parameters.filter(kv =>
-kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY.key)))
+kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
 val commitSuccess =
 client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
 common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))),
@@ -589,7 +589,7 @@ object HoodieSparkSqlWriter {
 parameters: Map[String, String], configuration: Configuration) : Boolean = {
 log.info(s"Config.inlineCompactionEnabled ? ${client.getConfig.inlineCompactionEnabled}")
 if (asyncCompactionTriggerFnDefined && !client.getConfig.inlineCompactionEnabled
-&& parameters.get(ASYNC_COMPACT_ENABLE_OPT_KEY.key).exists(r => r.toBoolean)) {
+&& parameters.get(ASYNC_COMPACT_ENABLE.key).exists(r => r.toBoolean)) {
 tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
 } else {
 false
@@ -600,7 +600,7 @@ object HoodieSparkSqlWriter {
 parameters: Map[String, String]) : Boolean = {
 log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
 asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled &&
-parameters.get(ASYNC_CLUSTERING_ENABLE_OPT_KEY.key).exists(r => r.toBoolean)
+parameters.get(ASYNC_CLUSTERING_ENABLE.key).exists(r => r.toBoolean)
 }
 private def getHoodieTableConfig(sparkContext: SparkContext,
@@ -48,9 +48,9 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
-private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT_OPT_KEY.key).toInt
-private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY.key).toLong
-private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key).toBoolean
+private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT.key).toInt
+private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key).toLong
+private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key).toBoolean
 private var isAsyncCompactorServiceShutdownAbnormally = false
 private var isAsyncClusteringServiceShutdownAbnormally = false
@@ -113,7 +113,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 log.error(s"Micro batch id=$batchId threw following exception: ", e)
 if (ignoreFailedBatch) {
 log.info(s"Ignore the exception and move on streaming as per " +
-s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key} configuration")
+s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
 Success((true, None, None))
 } else {
 if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
@@ -127,7 +127,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 }))
 if (ignoreFailedBatch) {
 log.info(s"Ignore the errors and move on streaming as per " +
-s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key} configuration")
+s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
 Success((true, None, None))
 } else {
 if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
@@ -46,39 +46,39 @@ object HoodieWriterUtils {
 * @return
 */
 def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
-Map(OPERATION_OPT_KEY.key -> OPERATION_OPT_KEY.defaultValue,
-TABLE_TYPE_OPT_KEY.key -> TABLE_TYPE_OPT_KEY.defaultValue,
-PRECOMBINE_FIELD_OPT_KEY.key -> PRECOMBINE_FIELD_OPT_KEY.defaultValue,
-PAYLOAD_CLASS_OPT_KEY.key -> PAYLOAD_CLASS_OPT_KEY.defaultValue,
-RECORDKEY_FIELD_OPT_KEY.key -> RECORDKEY_FIELD_OPT_KEY.defaultValue,
-PARTITIONPATH_FIELD_OPT_KEY.key -> PARTITIONPATH_FIELD_OPT_KEY.defaultValue,
-KEYGENERATOR_CLASS_OPT_KEY.key -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+Map(OPERATION.key -> OPERATION.defaultValue,
+TABLE_TYPE.key -> TABLE_TYPE.defaultValue,
+PRECOMBINE_FIELD.key -> PRECOMBINE_FIELD.defaultValue,
+PAYLOAD_CLASS.key -> PAYLOAD_CLASS.defaultValue,
+RECORDKEY_FIELD.key -> RECORDKEY_FIELD.defaultValue,
+PARTITIONPATH_FIELD.key -> PARTITIONPATH_FIELD.defaultValue,
+KEYGENERATOR_CLASS.key -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
 METADATA_ENABLE_PROP.key -> METADATA_ENABLE_PROP.defaultValue.toString,
 METADATA_VALIDATE_PROP.key -> METADATA_VALIDATE_PROP.defaultValue.toString,
-COMMIT_METADATA_KEYPREFIX_OPT_KEY.key -> COMMIT_METADATA_KEYPREFIX_OPT_KEY.defaultValue,
-INSERT_DROP_DUPS_OPT_KEY.key -> INSERT_DROP_DUPS_OPT_KEY.defaultValue,
-STREAMING_RETRY_CNT_OPT_KEY.key -> STREAMING_RETRY_CNT_OPT_KEY.defaultValue,
-STREAMING_RETRY_INTERVAL_MS_OPT_KEY.key -> STREAMING_RETRY_INTERVAL_MS_OPT_KEY.defaultValue,
-STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key -> STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.defaultValue,
+COMMIT_METADATA_KEYPREFIX.key -> COMMIT_METADATA_KEYPREFIX.defaultValue,
+INSERT_DROP_DUPS.key -> INSERT_DROP_DUPS.defaultValue,
+STREAMING_RETRY_CNT.key -> STREAMING_RETRY_CNT.defaultValue,
+STREAMING_RETRY_INTERVAL_MS.key -> STREAMING_RETRY_INTERVAL_MS.defaultValue,
+STREAMING_IGNORE_FAILED_BATCH.key -> STREAMING_IGNORE_FAILED_BATCH.defaultValue,
 META_SYNC_CLIENT_TOOL_CLASS.key -> META_SYNC_CLIENT_TOOL_CLASS.defaultValue,
-HIVE_SYNC_ENABLED_OPT_KEY.key -> HIVE_SYNC_ENABLED_OPT_KEY.defaultValue,
-META_SYNC_ENABLED_OPT_KEY.key -> META_SYNC_ENABLED_OPT_KEY.defaultValue,
-HIVE_DATABASE_OPT_KEY.key -> HIVE_DATABASE_OPT_KEY.defaultValue,
-HIVE_TABLE_OPT_KEY.key -> HIVE_TABLE_OPT_KEY.defaultValue,
-HIVE_BASE_FILE_FORMAT_OPT_KEY.key -> HIVE_BASE_FILE_FORMAT_OPT_KEY.defaultValue,
-HIVE_USER_OPT_KEY.key -> HIVE_USER_OPT_KEY.defaultValue,
-HIVE_PASS_OPT_KEY.key -> HIVE_PASS_OPT_KEY.defaultValue,
-HIVE_URL_OPT_KEY.key -> HIVE_URL_OPT_KEY.defaultValue,
-HIVE_PARTITION_FIELDS_OPT_KEY.key -> HIVE_PARTITION_FIELDS_OPT_KEY.defaultValue,
-HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key -> HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.defaultValue,
-HIVE_STYLE_PARTITIONING_OPT_KEY.key -> HIVE_STYLE_PARTITIONING_OPT_KEY.defaultValue,
-HIVE_USE_JDBC_OPT_KEY.key -> HIVE_USE_JDBC_OPT_KEY.defaultValue,
+HIVE_SYNC_ENABLED.key -> HIVE_SYNC_ENABLED.defaultValue,
+META_SYNC_ENABLED.key -> META_SYNC_ENABLED.defaultValue,
+HIVE_DATABASE.key -> HIVE_DATABASE.defaultValue,
+HIVE_TABLE.key -> HIVE_TABLE.defaultValue,
+HIVE_BASE_FILE_FORMAT.key -> HIVE_BASE_FILE_FORMAT.defaultValue,
+HIVE_USER.key -> HIVE_USER.defaultValue,
+HIVE_PASS.key -> HIVE_PASS.defaultValue,
+HIVE_URL.key -> HIVE_URL.defaultValue,
+HIVE_PARTITION_FIELDS.key -> HIVE_PARTITION_FIELDS.defaultValue,
+HIVE_PARTITION_EXTRACTOR_CLASS.key -> HIVE_PARTITION_EXTRACTOR_CLASS.defaultValue,
+HIVE_STYLE_PARTITIONING.key -> HIVE_STYLE_PARTITIONING.defaultValue,
+HIVE_USE_JDBC.key -> HIVE_USE_JDBC.defaultValue,
 HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
 HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
-ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue,
-INLINE_CLUSTERING_ENABLE_OPT_KEY.key -> INLINE_CLUSTERING_ENABLE_OPT_KEY.defaultValue,
-ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> ASYNC_CLUSTERING_ENABLE_OPT_KEY.defaultValue,
-ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue
+ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue,
+INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
+ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue,
+ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue
 ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
 }
@@ -61,22 +61,22 @@ class IncrementalRelation(val sqlContext: SQLContext,
 if (commitTimeline.empty()) {
 throw new HoodieException("No instants to incrementally pull")
 }
-if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key)) {
+if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME.key)) {
 throw new HoodieException(s"Specify the begin instant time to pull from using " +
-s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key}")
+s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
 }
 if (!metaClient.getTableConfig.populateMetaFields()) {
 throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
 }
-val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY.key,
-DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY.defaultValue).toBoolean
+val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.key,
+DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.defaultValue).toBoolean
 private val lastInstant = commitTimeline.lastInstant().get()
 private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
-optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key),
-optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key(), lastInstant.getTimestamp))
+optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
+optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp))
 private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
 // use schema from a file produced in the end/latest instant
@@ -93,8 +93,8 @@ class IncrementalRelation(val sqlContext: SQLContext,
 StructType(skeletonSchema.fields ++ dataSchema.fields)
 }
-private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY.key,
-DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY.defaultValue).split(",").filter(!_.isEmpty)
+private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,
+DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.defaultValue).split(",").filter(!_.isEmpty)
 override def schema: StructType = usedSchema
@@ -137,10 +137,10 @@ class IncrementalRelation(val sqlContext: SQLContext,
 }
 val pathGlobPattern = optParams.getOrElse(
-DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY.key,
-DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY.defaultValue)
+DataSourceReadOptions.INCR_PATH_GLOB.key,
+DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
 val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
-if(!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY.defaultValue)) {
+if(!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
 val globMatcher = new GlobPattern("*" + pathGlobPattern)
 (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
 metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
@@ -163,7 +163,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
 df = sqlContext.sparkSession.read
 .format("hudi")
 .schema(usedSchema)
-.option(DataSourceReadOptions.READ_PATHS_OPT_KEY.key, filteredMetaBootstrapFullPaths.mkString(","))
+.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
 .load()
 }
@@ -55,9 +55,9 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
 if (commitTimeline.empty()) {
 throw new HoodieException("No instants to incrementally pull")
 }
-if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key)) {
+if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME.key)) {
 throw new HoodieException(s"Specify the begin instant time to pull from using " +
-s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key}")
+s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
 }
 if (!metaClient.getTableConfig.populateMetaFields()) {
 throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
@@ -65,12 +65,12 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
 private val lastInstant = commitTimeline.lastInstant().get()
 private val mergeType = optParams.getOrElse(
-DataSourceReadOptions.REALTIME_MERGE_OPT_KEY.key,
-DataSourceReadOptions.REALTIME_MERGE_OPT_KEY.defaultValue)
+DataSourceReadOptions.REALTIME_MERGE.key,
+DataSourceReadOptions.REALTIME_MERGE.defaultValue)
 private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
-optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key),
-optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, lastInstant.getTimestamp))
+optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
+optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp))
 log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
 private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
 private val schemaUtil = new TableSchemaResolver(metaClient)
@@ -182,10 +182,10 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
 // Filter files based on user defined glob pattern
 val pathGlobPattern = optParams.getOrElse(
-DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY.key,
-DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY.defaultValue)
+DataSourceReadOptions.INCR_PATH_GLOB.key,
+DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
 val filteredFileGroup = if(!pathGlobPattern
-.equals(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY.defaultValue)) {
+.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
 val globMatcher = new GlobPattern("*" + pathGlobPattern)
 fileGroup.filter(f => {
 if (f.getLatestFileSlice.get().getBaseFile.isPresent) {
@@ -76,8 +76,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
 private lazy val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
 private val mergeType = optParams.getOrElse(
-DataSourceReadOptions.REALTIME_MERGE_OPT_KEY.key,
-DataSourceReadOptions.REALTIME_MERGE_OPT_KEY.defaultValue)
+DataSourceReadOptions.REALTIME_MERGE.key,
+DataSourceReadOptions.REALTIME_MERGE.defaultValue)
 private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
 private val preCombineField = {
 val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
@@ -234,8 +234,8 @@ object MergeOnReadSnapshotRelation {
 // .So we should encode the file path here. Otherwise, there is a FileNotException throw
 // out.
 // For example, If the "pt" is the partition path field, and "pt" = "2021/02/02", If
-// we enable the URL_ENCODE_PARTITIONING_OPT_KEY and write data to hudi table.The data
-// path in the table will just like "/basePath/2021%2F02%2F02/xxxx.parquet". When we read
+// we enable the URL_ENCODE_PARTITIONING and write data to hudi table.The data path
+// in the table will just like "/basePath/2021%2F02%2F02/xxxx.parquet". When we read
 // data from the table, if there are no encode for the file path,
 // ParquetFileFormat#buildReaderWithPartitionValues will decode it to
 // "/basePath/2021/02/02/xxxx.parquet" witch will result to a FileNotException.
@@ -41,26 +41,26 @@ object HoodieOptionConfig {
 val SQL_KEY_TABLE_PRIMARY_KEY: HoodieOption[String] = buildConf()
 .withSqlKey("primaryKey")
-.withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key)
+.withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key)
 .withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key)
 .build()
 val SQL_KEY_TABLE_TYPE: HoodieOption[String] = buildConf()
 .withSqlKey("type")
-.withHoodieKey(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key)
+.withHoodieKey(DataSourceWriteOptions.TABLE_TYPE.key)
 .withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP.key)
 .defaultValue(SQL_VALUE_TABLE_TYPE_COW)
 .build()
 val SQL_KEY_PRECOMBINE_FIELD: HoodieOption[String] = buildConf()
 .withSqlKey("preCombineField")
-.withHoodieKey(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key)
+.withHoodieKey(DataSourceWriteOptions.PRECOMBINE_FIELD.key)
 .withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key)
 .build()
 val SQL_PAYLOAD_CLASS: HoodieOption[String] = buildConf()
 .withSqlKey("payloadClass")
-.withHoodieKey(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY.key)
+.withHoodieKey(DataSourceWriteOptions.PAYLOAD_CLASS.key)
 .withTableConfigKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP.key)
 .defaultValue(classOf[DefaultHoodieRecordPayload].getName)
 .build()
@@ -151,7 +151,7 @@ object HoodieOptionConfig {
 */
 def getPrimaryColumns(options: Map[String, String]): Array[String] = {
 val params = mappingSqlOptionToHoodieParam(options)
-params.get(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key)
+params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
 .map(_.split(",").filter(_.length > 0))
 .getOrElse(Array.empty)
 }
@@ -163,13 +163,13 @@ object HoodieOptionConfig {
 */
 def getTableType(options: Map[String, String]): String = {
 val params = mappingSqlOptionToHoodieParam(options)
-params.getOrElse(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key,
-DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.defaultValue)
+params.getOrElse(DataSourceWriteOptions.TABLE_TYPE.key,
+DataSourceWriteOptions.TABLE_TYPE.defaultValue)
 }
 def getPreCombineField(options: Map[String, String]): Option[String] = {
 val params = mappingSqlOptionToHoodieParam(options)
-params.get(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key)
+params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)
 }
 def buildConf[T](): HoodieOptions[T] = {
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hudi.command
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
-import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING_OPT_KEY, HIVE_SUPPORT_TIMESTAMP, KEYGENERATOR_CLASS_OPT_KEY, OPERATION_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, HIVE_SUPPORT_TIMESTAMP, KEYGENERATOR_CLASS, OPERATION, PARTITIONPATH_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
 import org.apache.spark.sql._
@@ -69,12 +69,12 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
 withSparkConf(sparkSession, targetTable.storage.properties) {
 Map(
 "path" -> path,
-KEYGENERATOR_CLASS_OPT_KEY.key -> classOf[SqlKeyGenerator].getCanonicalName,
+KEYGENERATOR_CLASS.key -> classOf[SqlKeyGenerator].getCanonicalName,
 TABLE_NAME.key -> tableId.table,
-OPERATION_OPT_KEY.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
-PARTITIONPATH_FIELD_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
+OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
 HIVE_SUPPORT_TIMESTAMP.key -> "true",
-HIVE_STYLE_PARTITIONING_OPT_KEY.key -> "true",
+HIVE_STYLE_PARTITIONING.key -> "true",
 HoodieWriteConfig.DELETE_PARALLELISM.key -> "200",
 SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
 )
@@ -188,7 +188,7 @@ object InsertIntoHoodieTableCommand {
 }
 val parameters = HoodieOptionConfig.mappingSqlOptionToHoodieParam(table.storage.properties)
-val tableType = parameters.getOrElse(TABLE_TYPE_OPT_KEY.key, TABLE_TYPE_OPT_KEY.defaultValue)
+val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
 val partitionFields = table.partitionColumnNames.mkString(",")
 val path = getTableLocation(table, sparkSession)
@@ -205,8 +205,8 @@ object InsertIntoHoodieTableCommand {
 }
 val dropDuplicate = sparkSession.conf
-.getOption(INSERT_DROP_DUPS_OPT_KEY.key)
-.getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
+.getOption(INSERT_DROP_DUPS.key)
+.getOrElse(INSERT_DROP_DUPS.defaultValue)
 .toBoolean
 val operation = if (isOverwrite) {
@@ -235,23 +235,23 @@ object InsertIntoHoodieTableCommand {
 withSparkConf(sparkSession, options) {
 Map(
 "path" -> path,
-TABLE_TYPE_OPT_KEY.key -> tableType,
+TABLE_TYPE.key -> tableType,
 TABLE_NAME.key -> table.identifier.table,
-PRECOMBINE_FIELD_OPT_KEY.key -> tableSchema.fields.last.name,
-OPERATION_OPT_KEY.key -> operation,
-KEYGENERATOR_CLASS_OPT_KEY.key -> keyGenClass,
-RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
-PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
-PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
-META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
-HIVE_USE_JDBC_OPT_KEY.key -> "false",
-HIVE_DATABASE_OPT_KEY.key -> table.identifier.database.getOrElse("default"),
-HIVE_TABLE_OPT_KEY.key -> table.identifier.table,
+PRECOMBINE_FIELD.key -> tableSchema.fields.last.name,
+OPERATION.key -> operation,
+KEYGENERATOR_CLASS.key -> keyGenClass,
+RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
+PARTITIONPATH_FIELD.key -> partitionFields,
+PAYLOAD_CLASS.key -> payloadClassName,
+META_SYNC_ENABLED.key -> enableHive.toString,
+HIVE_USE_JDBC.key -> "false",
+HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"),
+HIVE_TABLE.key -> table.identifier.table,
 HIVE_SUPPORT_TIMESTAMP.key -> "true",
-HIVE_STYLE_PARTITIONING_OPT_KEY.key -> "true",
-HIVE_PARTITION_FIELDS_OPT_KEY.key -> partitionFields,
-HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-URL_ENCODE_PARTITIONING_OPT_KEY.key -> "true",
+HIVE_STYLE_PARTITIONING.key -> "true",
+HIVE_PARTITION_FIELDS.key -> partitionFields,
+HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+URL_ENCODE_PARTITIONING.key -> "true",
 HoodieWriteConfig.INSERT_PARALLELISM.key -> "200",
 HoodieWriteConfig.UPSERT_PARALLELISM.key -> "200",
 SqlKeyGenerator.PARTITION_SCHEMA -> table.partitionSchema.toDDL
@@ -261,7 +261,7 @@ object InsertIntoHoodieTableCommand {
 }
 /**
-* Validate the duplicate key for insert statement without enable the INSERT_DROP_DUPS_OPT_KEY
+* Validate the duplicate key for insert statement without enable the INSERT_DROP_DUPS_OPT
 * config.
 */
 class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable[_])
@@ -228,9 +228,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 // may be different from the target table, because the are transform logical in the update or
 // insert actions.
 var writeParams = parameters +
-(OPERATION_OPT_KEY.key -> UPSERT_OPERATION_OPT_VAL) +
+(OPERATION.key -> UPSERT_OPERATION_OPT_VAL) +
 (HoodieWriteConfig.WRITE_SCHEMA_PROP.key -> getTableSchema.toString) +
-(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> targetTableType)
+(DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
 // Map of Condition -> Assignments
 val updateConditionToAssignments =
@@ -275,7 +275,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 checkInsertAssignments(insertActions)
 var writeParams = parameters +
-(OPERATION_OPT_KEY.key -> INSERT_OPERATION_OPT_VAL) +
+(OPERATION.key -> INSERT_OPERATION_OPT_VAL) +
 (HoodieWriteConfig.WRITE_SCHEMA_PROP.key -> getTableSchema.toString)
 writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
@@ -430,21 +430,21 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 withSparkConf(sparkSession, options) {
 Map(
 "path" -> path,
-RECORDKEY_FIELD_OPT_KEY.key -> targetKey2SourceExpression.keySet.mkString(","),
-KEYGENERATOR_CLASS_OPT_KEY.key -> classOf[SqlKeyGenerator].getCanonicalName,
-PRECOMBINE_FIELD_OPT_KEY.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
+RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
+KEYGENERATOR_CLASS.key -> classOf[SqlKeyGenerator].getCanonicalName,
+PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
 TABLE_NAME.key -> targetTableName,
-PARTITIONPATH_FIELD_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
-PAYLOAD_CLASS_OPT_KEY.key -> classOf[ExpressionPayload].getCanonicalName,
-META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
-HIVE_USE_JDBC_OPT_KEY.key -> "false",
-HIVE_DATABASE_OPT_KEY.key -> targetTableDb,
-HIVE_TABLE_OPT_KEY.key -> targetTableName,
+PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
+PAYLOAD_CLASS.key -> classOf[ExpressionPayload].getCanonicalName,
+META_SYNC_ENABLED.key -> enableHive.toString,
+HIVE_USE_JDBC.key -> "false",
+HIVE_DATABASE.key -> targetTableDb,
+HIVE_TABLE.key -> targetTableName,
 HIVE_SUPPORT_TIMESTAMP.key -> "true",
-HIVE_STYLE_PARTITIONING_OPT_KEY.key -> "true",
-HIVE_PARTITION_FIELDS_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
-HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-URL_ENCODE_PARTITIONING_OPT_KEY.key -> "true", // enable the url decode for sql.
+HIVE_STYLE_PARTITIONING.key -> "true",
+HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
+HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+URL_ENCODE_PARTITIONING.key -> "true", // enable the url decode for sql.
 HoodieWriteConfig.INSERT_PARALLELISM.key -> "200", // set the default parallelism to 200 for sql
 HoodieWriteConfig.UPSERT_PARALLELISM.key -> "200",
 HoodieWriteConfig.DELETE_PARALLELISM.key -> "200",
@@ -44,7 +44,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
 override def getPartitionPath(record: GenericRecord): String = {
 val partitionPath = super.getPartitionPath(record)
 if (partitionSchema.isDefined) {
-// we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT_KEY
+// we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
 // by default for sql.
 val partitionFragments = partitionPath.split(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
 assert(partitionFragments.size == partitionSchema.get.size)
@@ -97,21 +97,21 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
 withSparkConf(sparkSession, targetTable.storage.properties) {
 Map(
 "path" -> path,
-RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
-KEYGENERATOR_CLASS_OPT_KEY.key -> classOf[SqlKeyGenerator].getCanonicalName,
-PRECOMBINE_FIELD_OPT_KEY.key -> primaryColumns.head, //set the default preCombine field.
+RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
+KEYGENERATOR_CLASS.key -> classOf[SqlKeyGenerator].getCanonicalName,
+PRECOMBINE_FIELD.key -> primaryColumns.head, //set the default preCombine field.
 TABLE_NAME.key -> tableId.table,
-OPERATION_OPT_KEY.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
-PARTITIONPATH_FIELD_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
-META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
-HIVE_USE_JDBC_OPT_KEY.key -> "false",
-HIVE_DATABASE_OPT_KEY.key -> tableId.database.getOrElse("default"),
-HIVE_TABLE_OPT_KEY.key -> tableId.table,
-HIVE_PARTITION_FIELDS_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
-HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-URL_ENCODE_PARTITIONING_OPT_KEY.key -> "true",
+OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
+META_SYNC_ENABLED.key -> enableHive.toString,
+HIVE_USE_JDBC.key -> "false",
+HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
+HIVE_TABLE.key -> tableId.table,
+HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
+HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+URL_ENCODE_PARTITIONING.key -> "true",
 HIVE_SUPPORT_TIMESTAMP.key -> "true",
-HIVE_STYLE_PARTITIONING_OPT_KEY.key -> "true",
+HIVE_STYLE_PARTITIONING.key -> "true",
 HoodieWriteConfig.UPSERT_PARALLELISM.key -> "200",
 SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
 )
@@ -201,7 +201,7 @@ class ExpressionPayload(record: GenericRecord,
 }
 private def isMORTable(properties: Properties): Boolean = {
-properties.getProperty(TABLE_TYPE_OPT_KEY.key, null) == MOR_TABLE_TYPE_OPT_VAL
+properties.getProperty(TABLE_TYPE.key, null) == MOR_TABLE_TYPE_OPT_VAL
 }
 private def convertToRecord(values: Array[AnyRef], schema: Schema): IndexedRecord = {
@@ -154,8 +154,8 @@ class HoodieStreamSource(
 } else {
 // Consume the data between (startCommitTime, endCommitTime]
 val incParams = parameters ++ Map(
-DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key -> startCommitTime(startOffset),
-DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key -> endOffset.commitTime
+DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startCommitTime(startOffset),
+DataSourceReadOptions.END_INSTANTTIME.key -> endOffset.commitTime
 )
 val rdd = tableType match {
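The read-side constants are renamed the same way (BEGIN_INSTANTTIME_OPT_KEY becomes BEGIN_INSTANTTIME, QUERY_TYPE_OPT_KEY becomes QUERY_TYPE, and so on), as the IncrementalRelation and HoodieStreamSource hunks above show. A minimal incremental-read sketch using the renamed constants follows; the base path and instant times are hypothetical placeholders, and it assumes the *_OPT_VAL value constants keep their existing names, as the unchanged QUERY_TYPE_SNAPSHOT_OPT_VAL import above suggests.

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

object RenamedReadOptionsExample {
  def readIncremental(spark: SparkSession): Unit = {
    val incremental = spark.read.format("hudi")
      // Previously DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key()
      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      // BEGIN_INSTANTTIME is mandatory for incremental pulls (see IncrementalRelation above).
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "20210801000000")
      .option(DataSourceReadOptions.END_INSTANTTIME.key(), "20210802000000")
      .load("/tmp/hudi_trips")
    incremental.show(false)
  }
}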