Merge pull request #3120 from pengzhiwei2018/dev_metasync

[HUDI-2045] Support Read Hoodie As DataSource Table For Flink And DeltaStreamer
Authored by pengzhiwei on 2021-07-13 22:37:20 +08:00; committed by GitHub.
14 changed files with 470 additions and 173 deletions
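
For context, the feature this merge completes can be exercised from the Spark write path. A minimal sketch, assuming the 0.9.0-era option keys, a pre-existing SparkSession `spark`, and a DataFrame `df`; the table name, path, and field names are hypothetical, and the sync-as-data-source key is assumed to be the string behind HIVE_SYNC_AS_DATA_SOURCE_TABLE:

import org.apache.spark.sql.SaveMode

// Hypothetical example: write a MOR table and let HiveSyncTool register it
// as a Spark data-source table (a MOR table syncs as trips_ro / trips_rt).
df.write.format("hudi").
  option("hoodie.table.name", "trips").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.partitionpath.field", "dt").
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.hive_sync.sync_as_datasource", "true"). // assumed key of HIVE_SYNC_AS_DATA_SOURCE_TABLE
  mode(SaveMode.Append).
  save("/tmp/trips")

// After sync, reading the _ro table resolves to a read-optimized query
// automatically via the serde property consumed in DefaultSource below:
spark.sql("select * from trips_ro").show()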

DefaultSource.scala

@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
+import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
@@ -105,8 +106,14 @@ class DefaultSource extends RelationProvider
     val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
     val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
     val tableType = metaClient.getTableType
-    val queryType = parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue)
-    log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType")
+    // First check whether ConfigUtils.IS_QUERY_AS_RO_TABLE has been set by HiveSyncTool;
+    // otherwise fall back to the query type from QUERY_TYPE_OPT_KEY.
+    val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
+      .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue()))
+    log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
     (tableType, queryType, isBootstrappedTable) match {
       case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
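
In effect, a table that HiveSyncTool registered with ConfigUtils.IS_QUERY_AS_RO_TABLE resolves to a read-optimized query when the flag is true and a snapshot query when it is false, and only falls back to the user-supplied query type when the flag is absent. A stand-alone sketch of that rule; the helper name is hypothetical, and the literal keys and values are assumed to match the constants used above:

// Hypothetical rendering of the resolution rule added in this hunk.
// "hoodie.query.as.ro.table" is assumed to be ConfigUtils.IS_QUERY_AS_RO_TABLE.
def resolveQueryType(parameters: Map[String, String]): String =
  parameters.get("hoodie.query.as.ro.table")
    .map(is => if (is.toBoolean) "read_optimized" else "snapshot")
    .getOrElse(parameters.getOrElse("hoodie.datasource.query.type", "snapshot"))

resolveQueryType(Map("hoodie.query.as.ro.table" -> "true"))  // "read_optimized": the _ro table
resolveQueryType(Map("hoodie.query.as.ro.table" -> "false")) // "snapshot": the _rt table
resolveQueryType(Map.empty)                                  // user-supplied or default query type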

HoodieSparkSqlWriter.scala

@@ -36,7 +36,6 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory
-import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
 import org.apache.hudi.index.SparkHoodieIndex
 import org.apache.hudi.internal.DataSourceInternalWriterHelper
@@ -48,11 +47,9 @@ import org.apache.spark.SPARK_VERSION
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hudi.HoodieSqlUtils
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ListBuffer
@@ -421,15 +418,15 @@
     }
   }

-  private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig): Boolean = {
-    val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig)
+  private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig, sqlConf: SQLConf): Boolean = {
+    val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig, sqlConf)
     val hiveConf: HiveConf = new HiveConf()
     hiveConf.addResource(fs.getConf)
     new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
     true
   }

-  private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig): HiveSyncConfig = {
+  private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig, sqlConf: SQLConf): HiveSyncConfig = {
     val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
     hiveSyncConfig.basePath = basePath.toString
     hiveSyncConfig.baseFileFormat = hoodieConfig.getString(HIVE_BASE_FILE_FORMAT_OPT_KEY)
@@ -454,77 +451,12 @@
     hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean
     hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt
-    val syncAsDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
-    if (syncAsDataSourceTable) {
-      hiveSyncConfig.tableProperties = hoodieConfig.getStringOrDefault(HIVE_TABLE_PROPERTIES, null)
-      val serdePropText = createSqlTableSerdeProperties(hoodieConfig, basePath.toString)
-      val serdeProp = ConfigUtils.toMap(serdePropText)
-      serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key)
-      serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
-      serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
-      hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
-    }
+    hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
+    hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)
     hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
     hiveSyncConfig
   }
-  /**
-   * Add the Spark SQL related table properties to HIVE_TABLE_PROPERTIES.
-   * @param sqlConf The Spark SQL conf.
-   * @param schema The schema to write to the table.
-   * @param hoodieConfig The HoodieConfig containing the original parameters.
-   * @return The HoodieConfig with the HIVE_TABLE_PROPERTIES property added.
-   */
-  private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
-                                    hoodieConfig: HoodieConfig): HoodieConfig = {
-    // Convert the schema and partition info used by spark sql to hive table properties.
-    // The following code refers to the spark code in
-    // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
-    // Sync schema with meta fields
-    val schemaWithMetaFields = HoodieSqlUtils.addMetaFields(schema)
-    val partitionSet = hoodieConfig.getString(HIVE_PARTITION_FIELDS_OPT_KEY)
-      .split(",").map(_.trim).filter(!_.isEmpty).toSet
-    val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
-    val (partitionCols, dataCols) = schemaWithMetaFields.partition(c => partitionSet.contains(c.name))
-    val reOrderedType = StructType(dataCols ++ partitionCols)
-    val schemaParts = reOrderedType.json.grouped(threshold).toSeq
-    var properties = Map(
-      "spark.sql.sources.provider" -> "hudi",
-      "spark.sql.sources.schema.numParts" -> schemaParts.size.toString
-    )
-    schemaParts.zipWithIndex.foreach { case (part, index) =>
-      properties += s"spark.sql.sources.schema.part.$index" -> part
-    }
-    // add partition columns
-    if (partitionSet.nonEmpty) {
-      properties += "spark.sql.sources.schema.numPartCols" -> partitionSet.size.toString
-      partitionSet.zipWithIndex.foreach { case (partCol, index) =>
-        properties += s"spark.sql.sources.schema.partCol.$index" -> partCol
-      }
-    }
-    var sqlPropertyText = ConfigUtils.configToString(properties)
-    sqlPropertyText = if (hoodieConfig.contains(HIVE_TABLE_PROPERTIES)) {
-      sqlPropertyText + "\n" + hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
-    } else {
-      sqlPropertyText
-    }
-    hoodieConfig.setValue(HIVE_TABLE_PROPERTIES, sqlPropertyText)
-    hoodieConfig
-  }
-
-  private def createSqlTableSerdeProperties(hoodieConfig: HoodieConfig, basePath: String): String = {
-    val pathProp = s"path=$basePath"
-    if (hoodieConfig.contains(HIVE_TABLE_SERDE_PROPERTIES)) {
-      pathProp + "\n" + hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
-    } else {
-      pathProp
-    }
-  }
   private def metaSync(spark: SparkSession, hoodieConfig: HoodieConfig, basePath: Path,
                        schema: StructType): Boolean = {
     val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED_OPT_KEY).toBoolean
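
For reference, a sketch of the table properties the removed helper produced, which HiveSyncTool now generates on its side; the values are hypothetical, assuming a table partitioned by dt whose schema JSON fits within a single SCHEMA_STRING_LENGTH_THRESHOLD-sized part:

// Hypothetical result of the removed addSqlTableProperties for such a table.
val exampleProps = Map(
  "spark.sql.sources.provider"           -> "hudi",
  "spark.sql.sources.schema.numParts"    -> "1",
  "spark.sql.sources.schema.part.0"      -> """{"type":"struct","fields":[...]}""",
  "spark.sql.sources.schema.numPartCols" -> "1",
  "spark.sql.sources.schema.partCol.0"   -> "dt"
)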
@@ -532,7 +464,6 @@
     var syncClientToolClassSet = scala.collection.mutable.Set[String]()
     hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
-    val newHoodieConfig = addSqlTableProperties(spark.sessionState.conf, schema, hoodieConfig)
     // for backward compatibility
     if (hiveSyncEnabled) {
       metaSyncEnabled = true
@@ -545,12 +476,12 @@
       val syncSuccess = impl.trim match {
         case "org.apache.hudi.hive.HiveSyncTool" => {
           log.info("Syncing to Hive Metastore (URL: " + hoodieConfig.getString(HIVE_URL_OPT_KEY) + ")")
-          syncHive(basePath, fs, newHoodieConfig)
+          syncHive(basePath, fs, hoodieConfig, spark.sessionState.conf)
           true
         }
         case _ => {
           val properties = new Properties()
-          properties.putAll(newHoodieConfig.getProps)
+          properties.putAll(hoodieConfig.getProps)
           properties.put("basePath", basePath.toString)
           val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
           syncHoodie.syncHoodieTable()
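
The reflective branch above only requires a (Properties, FileSystem) constructor and a syncHoodieTable() method, so any AbstractSyncTool subclass named in META_SYNC_CLIENT_TOOL_CLASS can be plugged in. A minimal sketch, assuming org.apache.hudi.sync.common.AbstractSyncTool exposes that constructor as the reflective call implies; the class name and body are hypothetical:

import java.util.Properties
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.sync.common.AbstractSyncTool

// Hypothetical custom sync tool; only the constructor shape is dictated
// by the reflective load in metaSync above.
class MyCatalogSyncTool(props: Properties, fs: FileSystem)
  extends AbstractSyncTool(props, fs) {

  // Invoked once per commit from HoodieSparkSqlWriter.metaSync.
  override def syncHoodieTable(): Unit = {
    val basePath = props.getProperty("basePath") // set by metaSync above
    // push table metadata for basePath to an external catalog here
  }
}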