
Using immutable map instead of mutables to generate parameters

Kent Yao
2019-01-29 10:01:25 +08:00
committed by vinoth chandar
parent 7985eb72b5
commit 2b55f0751f
4 changed files with 95 additions and 55 deletions

HoodieSparkSqlWriter.scala

@@ -18,12 +18,8 @@
 package com.uber.hoodie

 import java.util
-import java.util.concurrent.ConcurrentHashMap
 import java.util.Optional

-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
 import com.uber.hoodie.DataSourceWriteOptions._
 import com.uber.hoodie.common.table.HoodieTableMetaClient
 import com.uber.hoodie.common.util.{FSUtils, TypedProperties}
@@ -36,9 +32,10 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}

-import scala.collection.mutable
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer

 private[hoodie] object HoodieSparkSqlWriter {
@@ -98,7 +95,7 @@ private[hoodie] object HoodieSparkSqlWriter {
           gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
         DataSourceUtils.createHoodieRecord(gr,
           orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
-      }).toJavaRDD();
+      }).toJavaRDD()

       val jsc = new JavaSparkContext(sparkContext)
@@ -117,7 +114,7 @@ private[hoodie] object HoodieSparkSqlWriter {
         return (true, None)
       }
-      val basePath = new Path(parameters.get("path").get)
+      val basePath = new Path(parameters("path"))
       val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
       var exists = fs.exists(basePath)
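
Note on the change above: on an immutable Scala Map, apply() and get(...).get are
equivalent, including their failure mode. A minimal REPL sketch (the path value is
illustrative, not from the patch):

  val parameters = Map("path" -> "/tmp/hoodie/test_table")
  parameters.get("path").get   // old style: Option#get, returns "/tmp/hoodie/test_table"
  parameters("path")           // new style: Map#apply, same result
  // Both throw NoSuchElementException when the key is absent:
  // parameters.get("missing").get   -> "None.get"
  // parameters("missing")           -> "key not found: missing"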
@@ -172,7 +169,7 @@ private[hoodie] object HoodieSparkSqlWriter {
         log.info("Commit " + commitTime + " failed!")
       }
-      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false)
+      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
       val syncHiveSucess = if (hiveSyncEnabled) {
         log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
         val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
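
Note on the change above: Option#exists(p) yields p(x) for Some(x) and false for
None, so it replaces .map(p).getOrElse(false) one-for-one. A REPL sketch with
illustrative values:

  val set: Option[String] = Some("true")
  val unset: Option[String] = None
  set.map(_.toBoolean).getOrElse(false) == set.exists(_.toBoolean)     // true == true
  unset.map(_.toBoolean).getOrElse(false) == unset.exists(_.toBoolean) // false == false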
@@ -207,30 +204,29 @@ private[hoodie] object HoodieSparkSqlWriter {
    * @param parameters
    * @return
    */
-  def parametersWithWriteDefaults(parameters: Map[String, String]): mutable.Map[String, String] = {
-    val defaultsMap = new ConcurrentHashMap[String, String](mapAsJavaMap(parameters))
-    defaultsMap.putIfAbsent(OPERATION_OPT_KEY, DEFAULT_OPERATION_OPT_VAL)
-    defaultsMap.putIfAbsent(STORAGE_TYPE_OPT_KEY, DEFAULT_STORAGE_TYPE_OPT_VAL)
-    defaultsMap.putIfAbsent(PRECOMBINE_FIELD_OPT_KEY, DEFAULT_PRECOMBINE_FIELD_OPT_VAL)
-    defaultsMap.putIfAbsent(PAYLOAD_CLASS_OPT_KEY, DEFAULT_PAYLOAD_OPT_VAL)
-    defaultsMap.putIfAbsent(RECORDKEY_FIELD_OPT_KEY, DEFAULT_RECORDKEY_FIELD_OPT_VAL)
-    defaultsMap.putIfAbsent(PARTITIONPATH_FIELD_OPT_KEY, DEFAULT_PARTITIONPATH_FIELD_OPT_VAL)
-    defaultsMap.putIfAbsent(KEYGENERATOR_CLASS_OPT_KEY, DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
-    defaultsMap.putIfAbsent(COMMIT_METADATA_KEYPREFIX_OPT_KEY, DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL)
-    defaultsMap.putIfAbsent(INSERT_DROP_DUPS_OPT_KEY, DEFAULT_INSERT_DROP_DUPS_OPT_VAL)
-    defaultsMap.putIfAbsent(STREAMING_RETRY_CNT_OPT_KEY, DEFAULT_STREAMING_RETRY_CNT_OPT_VAL)
-    defaultsMap.putIfAbsent(STREAMING_RETRY_INTERVAL_MS_OPT_KEY, DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL)
-    defaultsMap.putIfAbsent(STREAMING_IGNORE_FAILED_BATCH_OPT_KEY, DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_SYNC_ENABLED_OPT_KEY, DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_DATABASE_OPT_KEY, DEFAULT_HIVE_DATABASE_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_TABLE_OPT_KEY, DEFAULT_HIVE_TABLE_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_USER_OPT_KEY, DEFAULT_HIVE_USER_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_PASS_OPT_KEY, DEFAULT_HIVE_PASS_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_URL_OPT_KEY, DEFAULT_HIVE_URL_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_PARTITION_FIELDS_OPT_KEY, DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_ASSUME_DATE_PARTITION_OPT_KEY, DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL)
-    mapAsScalaMap(defaultsMap)
+  def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
+    Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
+      STORAGE_TYPE_OPT_KEY -> DEFAULT_STORAGE_TYPE_OPT_VAL,
+      PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
+      PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
+      RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
+      PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
+      KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+      COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
+      INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
+      STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
+      STREAMING_RETRY_INTERVAL_MS_OPT_KEY -> DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL,
+      STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL,
+      HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
+      HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
+      HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
+      HIVE_USER_OPT_KEY -> DEFAULT_HIVE_USER_OPT_VAL,
+      HIVE_PASS_OPT_KEY -> DEFAULT_HIVE_PASS_OPT_VAL,
+      HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
+      HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
+      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
+      HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL
+    ) ++: parameters
   }

   def toProperties(params: Map[String, String]): TypedProperties = {
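
Note on the rewritten parametersWithWriteDefaults: the defaults now live in an
immutable Map that is concatenated with the caller's parameters, and on key
collisions the right-hand operand wins, so user-supplied values still override
the defaults (the behaviour putIfAbsent provided before). A runnable sketch of
that merge semantics, assuming Scala 2.11/2.12 where `a ++: b` builds a ++ b;
the keys mirror Hudi's option names but are hard-coded here for illustration:

  object DefaultsMergeSketch extends App {
    // Stand-ins for OPERATION_OPT_KEY / STORAGE_TYPE_OPT_KEY and their defaults.
    val defaults = Map(
      "hoodie.datasource.write.operation" -> "upsert",
      "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE")
    // The caller overrides one key and leaves the other to its default.
    val parameters = Map("hoodie.datasource.write.operation" -> "insert")

    // Same shape as the patch: defaults on the left, caller parameters on the right.
    val merged = defaults ++: parameters

    assert(merged("hoodie.datasource.write.operation") == "insert")           // caller wins
    assert(merged("hoodie.datasource.write.storage.type") == "COPY_ON_WRITE") // default kept
  }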