[HUDI-1591] Implement Spark's FileIndex for Hudi to support queries via Hudi DataSource using non-globbed table path and partition pruning (#2651)
This commit is contained in:
@@ -19,14 +19,16 @@ package org.apache.hudi
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hudi.DataSourceReadOptions._
|
||||
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
|
||||
import org.apache.hudi.common.model.HoodieRecord
|
||||
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
|
||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.hadoop.HoodieROTablePathFilter
|
||||
import org.apache.log4j.LogManager
|
||||
import org.apache.spark.sql.execution.datasources.DataSource
|
||||
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
|
||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||
import org.apache.spark.sql.execution.streaming.{Sink, Source}
|
||||
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
|
||||
import org.apache.spark.sql.sources._
|
||||
@@ -79,39 +81,53 @@ class DefaultSource extends RelationProvider
|
||||
val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
|
||||
|
||||
val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
|
||||
val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
|
||||
|
||||
val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
|
||||
// Use the HoodieFileIndex only if the 'path' is not globbed.
|
||||
// Or else we use the original way to read hoodie table.
|
||||
val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX)
|
||||
.map(_.toBoolean).getOrElse(DEFAULT_ENABLE_HOODIE_FILE_INDEX)
|
||||
val useHoodieFileIndex = enableFileIndex && path.isDefined && !path.get.contains("*") &&
|
||||
!parameters.contains(DataSourceReadOptions.READ_PATHS_OPT_KEY)
|
||||
val globPaths = if (useHoodieFileIndex) {
|
||||
None
|
||||
} else {
|
||||
Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs))
|
||||
}
|
||||
// Get the table base path
|
||||
val tablePath = if (globPaths.isDefined) {
|
||||
DataSourceUtils.getTablePath(fs, globPaths.get.toArray)
|
||||
} else {
|
||||
DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
|
||||
}
|
||||
log.info("Obtained hudi table path: " + tablePath)
|
||||
|
||||
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
|
||||
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
|
||||
log.info("Is bootstrapped table => " + isBootstrappedTable)
|
||||
val tableType = metaClient.getTableType
|
||||
val queryType = parameters(QUERY_TYPE_OPT_KEY)
|
||||
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType")
|
||||
|
||||
if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
|
||||
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||
if (isBootstrappedTable) {
|
||||
// Snapshot query is not supported for Bootstrapped MOR tables
|
||||
log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." +
|
||||
" Falling back to Read Optimized query.")
|
||||
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
|
||||
} else {
|
||||
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
|
||||
}
|
||||
} else {
|
||||
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
|
||||
}
|
||||
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
|
||||
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
|
||||
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
|
||||
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
|
||||
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||
new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
|
||||
} else {
|
||||
new IncrementalRelation(sqlContext, optParams, schema, metaClient)
|
||||
}
|
||||
} else {
|
||||
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
|
||||
(tableType, queryType, isBootstrappedTable) match {
|
||||
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
|
||||
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
|
||||
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
|
||||
getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath,
|
||||
readPaths, metaClient)
|
||||
|
||||
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
|
||||
new IncrementalRelation(sqlContext, parameters, schema, metaClient)
|
||||
|
||||
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
|
||||
new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient)
|
||||
|
||||
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
|
||||
new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient)
|
||||
|
||||
case (_, _, true) =>
|
||||
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters)
|
||||
|
||||
case (_, _, _) =>
|
||||
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
|
||||
s"isBootstrappedTable: $isBootstrappedTable ")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,18 +178,28 @@ class DefaultSource extends RelationProvider
|
||||
|
||||
override def shortName(): String = "hudi"
|
||||
|
||||
private def getBaseFileOnlyView(sqlContext: SQLContext,
|
||||
private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
|
||||
sqlContext: SQLContext,
|
||||
optParams: Map[String, String],
|
||||
schema: StructType,
|
||||
tablePath: String,
|
||||
extraReadPaths: Seq[String],
|
||||
isBootstrappedTable: Boolean,
|
||||
globPaths: Seq[Path],
|
||||
metaClient: HoodieTableMetaClient): BaseRelation = {
|
||||
log.warn("Loading Base File Only View.")
|
||||
log.info("Loading Base File Only View with options :" + optParams)
|
||||
|
||||
if (isBootstrappedTable) {
|
||||
// For bootstrapped tables, use our custom Spark relation for querying
|
||||
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
|
||||
if (useHoodieFileIndex) {
|
||||
|
||||
val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
|
||||
if (schema == null) Option.empty[StructType] else Some(schema),
|
||||
optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
|
||||
|
||||
HadoopFsRelation(
|
||||
fileIndex,
|
||||
fileIndex.partitionSchema,
|
||||
fileIndex.dataSchema,
|
||||
bucketSpec = None,
|
||||
fileFormat = new ParquetFileFormat,
|
||||
optParams)(sqlContext.sparkSession)
|
||||
} else {
|
||||
// this is just effectively RO view only, where `path` can contain a mix of
|
||||
// non-hoodie/hoodie path files. set the path filter up
|
||||
@@ -182,7 +208,6 @@ class DefaultSource extends RelationProvider
|
||||
classOf[HoodieROTablePathFilter],
|
||||
classOf[org.apache.hadoop.fs.PathFilter])
|
||||
|
||||
log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
|
||||
// simply return as a regular parquet relation
|
||||
DataSource.apply(
|
||||
sparkSession = sqlContext.sparkSession,
|
||||
|
||||
@@ -26,7 +26,7 @@ import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
|
||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||
import org.apache.spark.sql.{Row, SQLContext}
|
||||
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
|
||||
@@ -46,13 +46,14 @@ import scala.collection.JavaConverters._
|
||||
*
|
||||
* @param _sqlContext Spark SQL Context
|
||||
* @param userSchema User specified schema in the datasource query
|
||||
* @param globPaths Globbed paths obtained from the user provided path for querying
|
||||
* @param globPaths The globbed paths to query. If it is defined, read from the globPaths,
* else read data from tablePath using HoodieFileIndex.
|
||||
* @param metaClient Hoodie table meta client
|
||||
* @param optParams DataSource options passed by the user
|
||||
*/
|
||||
class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
|
||||
val userSchema: StructType,
|
||||
val globPaths: Seq[Path],
|
||||
val globPaths: Option[Seq[Path]],
|
||||
val metaClient: HoodieTableMetaClient,
|
||||
val optParams: Map[String, String]) extends BaseRelation
|
||||
with PrunedFilteredScan with Logging {
|
||||
@@ -156,9 +157,14 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
|
||||
|
||||
def buildFileIndex(): HoodieBootstrapFileIndex = {
|
||||
logInfo("Building file index..")
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
|
||||
val fileStatuses = inMemoryFileIndex.allFiles()
|
||||
|
||||
val fileStatuses = if (globPaths.isDefined) {
|
||||
// Load files from the global paths if it has defined to be compatible with the original mode
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths.get)
|
||||
inMemoryFileIndex.allFiles()
|
||||
} else { // Load files by the HoodieFileIndex.
|
||||
HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams,
|
||||
FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles
|
||||
}
|
||||
if (fileStatuses.isEmpty) {
|
||||
throw new HoodieException("No files found for reading in user provided path.")
|
||||
}
|
||||
|
||||
@@ -0,0 +1,362 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
import java.util.Properties
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import org.apache.hadoop.fs.{FileStatus, Path}
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
||||
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
|
||||
import org.apache.hudi.common.engine.HoodieLocalEngineContext
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.model.HoodieBaseFile
|
||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.spark.api.java.JavaSparkContext
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.avro.SchemaConverters
|
||||
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
|
||||
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
|
||||
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.unsafe.types.UTF8String
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
/**
 * A file index which supports partition pruning for hoodie snapshot and read-optimized queries.
 *
 * Main steps to get the file list for a query:
 * 1. Load all files and partition values from the table path.
 * 2. Prune the partitions by the partition filter condition.
 *
 * There are 3 cases for this:
 * 1. If the number of partition columns is equal to the actual partition path level, we
 * read it as a partitioned table (e.g. the partition column is "dt", the partition path is
 * "2021-03-10").
 *
 * 2. If the number of partition columns is not equal to the partition path level, but there is
 * only one partition column (e.g. the partition column is "dt", but the partition path is
 * "2021/03/10", whose directory level is 3), we can still read it as a partitioned table.
 * We map the whole partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
 *
 * 3. Otherwise, the number of partition columns is not equal to the partition directory level
 * and is greater than 1 (e.g. the partition columns are "dt,hh", the partition path is
 * "2021/03/10/12"), so we read it as a non-partitioned table because we cannot know how to
 * map the partition path to the partition columns in this case.
 *
 */
|
||||
case class HoodieFileIndex(
|
||||
spark: SparkSession,
|
||||
metaClient: HoodieTableMetaClient,
|
||||
schemaSpec: Option[StructType],
|
||||
options: Map[String, String],
|
||||
@transient fileStatusCache: FileStatusCache = NoopCache)
|
||||
extends FileIndex with Logging {
|
||||
|
||||
// Base path of the hoodie table, as reported by the meta client.
private val basePath = metaClient.getBasePath

// Path the user asked to query, read from the mandatory "path" option.
// Fail fast when the option is missing: the previous fallback silently built a Path whose
// name was the error message itself ("'path' option required").
@transient private val queryPath = new Path(options.getOrElse("path",
  throw new IllegalArgumentException("'path' option required")))
|
||||
/**
 * The schema of the table: the schema given by the caller when there is one, otherwise the
 * table's avro schema (resolved through the meta client) converted to a Spark StructType.
 */
lazy val schema: StructType = schemaSpec match {
  case Some(specifiedSchema) =>
    specifiedSchema
  case None =>
    val tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
    val sqlType = SchemaConverters.toSqlType(tableAvroSchema)
    sqlType.dataType.asInstanceOf[StructType]
}
|
||||
|
||||
/**
 * The partition schema built from the partition columns recorded in the table config.
 * Each recorded column is looked up by name in the table schema; an unknown column raises an
 * IllegalArgumentException. When the table config has no partition columns, an empty schema
 * is returned.
 */
private lazy val _partitionSchemaFromProperties: StructType = {
  val configuredColumns = metaClient.getTableConfig.getPartitionColumns
  val fieldsByName = schema.fields.map(field => field.name -> field).toMap

  if (!configuredColumns.isPresent) {
    // The partition columns have not been stored in hoodie.properties (the table was created
    // earlier), so we treat it as a non-partitioned table.
    logWarning("No partition columns available from hoodie.properties. Partition pruning will not work")
    new StructType()
  } else {
    val resolvedFields = configuredColumns.get().map { column =>
      fieldsByName.get(column) match {
        case Some(field) => field
        case None =>
          throw new IllegalArgumentException(
            s"Cannot find column: '$column' in the schema[${schema.fields.mkString(",")}]")
      }
    }
    new StructType(resolvedFields)
  }
}
|
||||
|
||||
// Cached listing state. Every field below is assigned by refresh0().
@transient @volatile private var cachedAllPartitionPaths: Seq[PartitionRowPath] = _
@transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
@transient @volatile private var cachedFileSize: Long = _
@transient @volatile private var fileSystemView: HoodieTableFileSystemView = _

// True when the table has to be read as a non-partitioned table (see refresh0()).
@volatile private var queryAsNonePartitionedTable: Boolean = _

// Populate the cached state eagerly while the index is being constructed.
refresh0()
|
||||
|
||||
// The only root path of this index is the path being queried.
override def rootPaths: Seq[Path] = Seq(queryPath)
|
||||
|
||||
/**
 * List the files to read, grouped by partition.
 *
 * When the table is read as a non-partitioned table, a single directory with empty partition
 * values and all cached files is returned. Otherwise the cached partitions are pruned with
 * the partition filters and the latest base files of each remaining partition are returned.
 *
 * @param partitionFilters filters used to prune the partitions.
 * @param dataFilters      not used by this implementation.
 */
override def listFiles(partitionFilters: Seq[Expression],
                       dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
  if (!queryAsNonePartitionedTable) {
    // Keep only the partitions accepted by the partition filters.
    for (selected <- prunePartition(cachedAllPartitionPaths, partitionFilters)) yield {
      val latestBaseFiles = fileSystemView.getLatestBaseFiles(selected.partitionPath)
      val statuses = latestBaseFiles.iterator().asScala.toSeq.map(_.getFileStatus)
      PartitionDirectory(selected.values, statuses)
    }
  } else {
    // Read as a non-partitioned table.
    Seq(PartitionDirectory(InternalRow.empty, allFiles))
  }
}
|
||||
|
||||
// Paths (as strings) of all cached input files.
override def inputFiles: Array[String] =
  for (baseFile <- cachedAllInputFiles) yield baseFile.getFileStatus.getPath.toString
|
||||
|
||||
/**
 * Refresh this index: first invalidate every entry of the file status cache, then rebuild
 * the cached state through refresh0(). The cache is invalidated first so that refresh0()
 * does not pick up previously cached listings.
 */
override def refresh(): Unit = {
|
||||
fileStatusCache.invalidateAll()
|
||||
refresh0()
|
||||
}
|
||||
|
||||
/**
 * Rebuild the cached state of this index: load the files of every partition, reload the
 * active timeline, build a file system view over the completed commits and cache the latest
 * base files, the partition paths and the total file size.
 */
private def refresh0(): Unit = {
  val refreshStartMs = System.currentTimeMillis()

  val filesByPartition = loadPartitionPathFiles()
  val listedFiles = filesByPartition.values.foldLeft(Array.empty[FileStatus])(_ ++ _)

  metaClient.reloadActiveTimeline()
  val completedCommits = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
  fileSystemView = new HoodieTableFileSystemView(metaClient, completedCommits, listedFiles)

  val latestBaseFiles = fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
  cachedAllInputFiles = latestBaseFiles
  cachedAllPartitionPaths = filesByPartition.keys.toSeq
  cachedFileSize = latestBaseFiles.map(_.getFileLen).sum

  // If any partition carries InternalRow.empty as its values, we query the table as a
  // non-partitioned table.
  queryAsNonePartitionedTable = cachedAllPartitionPaths.exists(_.values == InternalRow.empty)

  val elapsedMs = System.currentTimeMillis() - refreshStartMs
  logInfo(s"Refresh for table ${metaClient.getTableConfig.getTableName}, spend: $elapsedMs ms")
}
|
||||
|
||||
// Total length of the cached input files, as computed by the last refresh.
override def sizeInBytes: Long = cachedFileSize
|
||||
|
||||
/**
 * The partition schema exposed to Spark: the schema derived from the table config, or an
 * empty schema when the table is read as a non-partitioned table.
 */
override def partitionSchema: StructType =
  if (!queryAsNonePartitionedTable) _partitionSchemaFromProperties else new StructType()
|
||||
|
||||
/**
 * Get the data schema of the table.
 *
 * @return the table schema without the fields that belong to the partition schema.
 */
def dataSchema: StructType = {
  val partitionFieldNames = partitionSchema.fields.map(_.name).toSet
  val dataFields = schema.fields.filter(field => !partitionFieldNames.contains(field.name))
  StructType(dataFields)
}
|
||||
|
||||
// File statuses of all cached input files.
def allFiles: Seq[FileStatus] =
  for (baseFile <- cachedAllInputFiles) yield baseFile.getFileStatus
|
||||
|
||||
/**
 * Prune the partitions with the given filters. This implementation is forked from
 * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
 *
 * Only the predicates that reference partition columns exclusively are applied; they are
 * combined with AND, bound to the ordinals of the partition schema and evaluated against the
 * values of each partition.
 *
 * @param partitionPaths All the partition paths.
 * @param predicates     The filter conditions.
 * @return The partition paths kept by the filters; the input itself when no predicate applies.
 */
private def prunePartition(partitionPaths: Seq[PartitionRowPath],
                           predicates: Seq[Expression]): Seq[PartitionRowPath] = {
  val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
  val applicablePredicates = predicates.filter { candidate =>
    candidate.references.map(_.name).toSet.subsetOf(partitionColumnNames)
  }

  if (applicablePredicates.isEmpty) {
    partitionPaths
  } else {
    val conjunction = applicablePredicates.reduce(expressions.And)
    // Replace every attribute with a reference to its ordinal in the partition schema.
    val boundConjunction = InterpretedPredicate(conjunction.transform {
      case attribute: AttributeReference =>
        val ordinal = partitionSchema.indexWhere(_.name == attribute.name)
        BoundReference(ordinal, partitionSchema(ordinal).dataType, nullable = true)
    })

    val keptPartitions = partitionPaths.filter(partition => boundConjunction.eval(partition.values))
    logInfo(s"Total partition size is: ${partitionPaths.size}, " +
      s"after partition prune size is: ${keptPartitions.size}")
    keptPartitions
  }
}
|
||||
|
||||
/**
 * Load all partition paths and their files under the query path of the table.
 *
 * Steps:
 *  1. List every partition path of the table and keep the ones located at or under the
 *     query path.
 *  2. Convert each partition path to a PartitionRowPath by parsing its partition values.
 *  3. Take the files of each partition from the FileStatusCache when present, list the
 *     remaining partitions with a Spark job and store the listed files back into the cache.
 *
 * @return the files of every selected partition, keyed by partition.
 */
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
  val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
  val properties = new Properties()
  properties.putAll(options.asJava)
  val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build()

  val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
  // Load all the partition path from the basePath, and filter by the query partition path.
  // An empty query partition path selects every partition. Otherwise a partition is selected
  // only when it is the query partition itself or lies below it: a plain string prefix test
  // would also select siblings such as "2021/03/10" for the query path "2021/03/1".
  // TODO load files from the queryPartitionPath directly.
  val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala
    .filter { partitionPath =>
      queryPartitionPath.isEmpty ||
        partitionPath == queryPartitionPath ||
        partitionPath.startsWith(queryPartitionPath + "/")
    }

  val writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath).withProperties(properties).build()
  val maxListParallelism = writeConfig.getFileListingParallelism

  val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
  val partitionSchema = _partitionSchemaFromProperties
  val timeZoneId = CaseInsensitiveMap(options)
    .get(DateTimeUtils.TIMEZONE_OPTION)
    .getOrElse(SQLConf.get.sessionLocalTimeZone)

  val sparkParsePartitionUtil = HoodieSparkUtils.createSparkParsePartitionUtil(spark
    .sessionState.conf)
  // Convert partition path to PartitionRowPath
  val partitionRowPaths = partitionPaths.map { partitionPath =>
    val partitionRow = if (partitionSchema.fields.length == 0) {
      // This is a non-partitioned table
      InternalRow.empty
    } else {
      val partitionFragments = partitionPath.split("/")

      if (partitionFragments.length != partitionSchema.fields.length &&
        partitionSchema.fields.length == 1) {
        // If the partition column size is not equal to the partition fragment size
        // and the partition column size is 1, we map the whole partition path
        // to the partition column which can benefit from the partition prune.
        InternalRow.fromSeq(Seq(UTF8String.fromString(partitionPath)))
      } else if (partitionFragments.length != partitionSchema.fields.length &&
        partitionSchema.fields.length > 1) {
        // If the partition column size is not equal to the partition fragments size
        // and the partition column size > 1, we do not know how to map the partition
        // fragments to the partition columns. So we treat it as a Non-Partitioned Table
        // for the query which do not benefit from the partition prune.
        logWarning( s"Cannot do the partition prune for table $basePath." +
          s"The partitionFragments size (${partitionFragments.mkString(",")})" +
          s" is not equal to the partition columns size(${partitionSchema.fields.mkString(",")})")
        InternalRow.empty
      } else { // partitionFragments.length == partitionSchema.fields.length

        // Prepend the partition column name to every fragment that does not already carry
        // one (i.e. the fragment has no "="),
        // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
        val partitionWithName =
          partitionFragments.zip(partitionSchema).map {
            case (partition, field) =>
              if (partition.indexOf("=") == -1) {
                s"${field.name}=$partition"
              } else {
                partition
              }
          }.mkString("/")
        val pathWithPartitionName = new Path(basePath, partitionWithName)
        val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap
        val partitionValues = sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
          typeInference = false, Set(new Path(basePath)), partitionDataTypes,
          DateTimeUtils.getTimeZone(timeZoneId))

        // Convert partitionValues to InternalRow
        partitionValues.map(_.literals.map(_.value))
          .map(InternalRow.fromSeq)
          .getOrElse(InternalRow.empty)
      }
    }
    PartitionRowPath(partitionRow, partitionPath)
  }

  // List files in all of the partition path.
  val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]()
  val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]()
  // Fetch from the FileStatusCache
  partitionRowPaths.foreach { partitionRowPath =>
    fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match {
      case Some(filesInPartition) =>
        cachePartitionToFiles.put(partitionRowPath, filesInPartition)

      case None => pathToFetch.append(partitionRowPath)
    }
  }
  // Fetch the rest from the file system. Skip the Spark job entirely when nothing is left to
  // list (no partition at all, or every partition was served by the cache): parallelizing
  // into 0 slices is rejected by Spark when the job is run.
  val fetchedPartition2Files: Map[PartitionRowPath, Array[FileStatus]] =
    if (pathToFetch.isEmpty) {
      Map.empty
    } else {
      // Always ask for at least one slice, even if the configured parallelism is not positive.
      val listParallelism = Math.max(1, Math.min(pathToFetch.size, maxListParallelism))
      spark.sparkContext.parallelize(pathToFetch, listParallelism)
        .map { partitionRowPath =>
          // Here we use a LocalEngineContext to get the files in the partition.
          // We can do this because the TableMetadata.getAllFilesInPartition only rely on the
          // hadoopConf of the EngineContext.
          val engineContext = new HoodieLocalEngineContext(serializableConf.get())
          val filesInPartition = FSUtils.getFilesInPartition(engineContext, metadataConfig,
            basePath, partitionRowPath.fullPartitionPath(basePath))
          (partitionRowPath, filesInPartition)
        }.collect().toMap
    }

  // Update the fileStatusCache
  fetchedPartition2Files.foreach {
    case (partitionRowPath, filesInPartition) =>
      fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition)
  }
  cachePartitionToFiles.toMap ++ fetchedPartition2Files
}
|
||||
|
||||
/**
 * Represent a partition path.
 * e.g. PartitionPath(InternalRow("2021","02","01"), "2021/02/01"))
 *
 * Equality and hashing are based on the partition path string only; the values are ignored.
 *
 * @param values        The partition values of this partition path.
 * @param partitionPath The partition path string.
 */
case class PartitionRowPath(values: InternalRow, partitionPath: String) {

  override def equals(other: Any): Boolean = other match {
    case that: PartitionRowPath => that.partitionPath == partitionPath
    case _ => false
  }

  override def hashCode(): Int = partitionPath.hashCode

  /**
   * Resolve this partition against the given base path. An empty partition path (a
   * non-partition path) resolves to the base path itself.
   */
  def fullPartitionPath(basePath: String): Path =
    if (partitionPath.nonEmpty) new Path(basePath, partitionPath) else new Path(basePath)
}
|
||||
}
|
||||
@@ -100,6 +100,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
||||
var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt)
|
||||
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
|
||||
|
||||
if (mode == SaveMode.Ignore && tableExists) {
|
||||
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
|
||||
@@ -112,12 +113,15 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
val archiveLogFolder = parameters.getOrElse(
|
||||
HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
|
||||
|
||||
val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
|
||||
|
||||
val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
|
||||
.setTableType(tableType)
|
||||
.setTableName(tblName)
|
||||
.setArchiveLogFolder(archiveLogFolder)
|
||||
.setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY))
|
||||
.setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
|
||||
.setPartitionColumns(partitionColumns)
|
||||
.initTable(sparkContext.hadoopConfiguration, path.get)
|
||||
tableConfig = tableMetaClient.getTableConfig
|
||||
}
|
||||
@@ -146,7 +150,6 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
log.info(s"Registered avro schema : ${schema.toString(true)}")
|
||||
|
||||
// Convert to RDD[HoodieRecord]
|
||||
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
|
||||
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
|
||||
val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT);
|
||||
val hoodieAllIncomingRecords = genericRecords.map(gr => {
|
||||
@@ -193,7 +196,6 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
classOf[org.apache.avro.Schema]))
|
||||
|
||||
// Convert to RDD[HoodieKey]
|
||||
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
|
||||
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace)
|
||||
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
|
||||
|
||||
@@ -283,6 +285,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
if (!tableExists) {
|
||||
val archiveLogFolder = parameters.getOrElse(
|
||||
HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
|
||||
val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
|
||||
HoodieTableMetaClient.withPropertyBuilder()
|
||||
.setTableType(HoodieTableType.valueOf(tableType))
|
||||
.setTableName(tableName)
|
||||
@@ -291,6 +294,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
.setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
|
||||
.setBootstrapIndexClass(bootstrapIndexClass)
|
||||
.setBootstrapBasePath(bootstrapBasePath)
|
||||
.setPartitionColumns(partitionColumns)
|
||||
.initTable(sparkContext.hadoopConfiguration, path)
|
||||
}
|
||||
|
||||
|
||||
@@ -28,7 +28,8 @@ import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
|
||||
import org.apache.spark.sql.avro.SchemaConverters
|
||||
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
|
||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
|
||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex, Spark2ParsePartitionUtil, Spark3ParsePartitionUtil, SparkParsePartitionUtil}
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
@@ -118,4 +119,13 @@ object HoodieSparkUtils {
|
||||
new Spark3RowSerDe(encoder)
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Create the partition-parsing utility matching the running Spark version: the Spark 2
 * implementation when SPARK_VERSION starts with "2.", the Spark 3 implementation (built from
 * the given conf) otherwise.
 */
def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = {
  // TODO remove Spark2ParsePartitionUtil if Spark 2.x support is dropped
  val runningOnSpark2 = SPARK_VERSION.startsWith("2.")
  if (!runningOnSpark2) {
    new Spark3ParsePartitionUtil(conf)
  } else {
    new Spark2ParsePartitionUtil
  }
}
|
||||
}
|
||||
|
||||
@@ -17,16 +17,17 @@
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import org.apache.hudi.DataSourceWriteOptions._
|
||||
import org.apache.hudi.common.config.TypedProperties
|
||||
|
||||
import scala.collection.JavaConversions.mapAsJavaMap
|
||||
import scala.collection.JavaConverters.mapAsScalaMapConverter
|
||||
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP
|
||||
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
|
||||
|
||||
/**
|
||||
* WriterUtils to assist in write path in Datasource and tests.
|
||||
@@ -81,4 +82,32 @@ object HoodieWriterUtils {
|
||||
params.foreach(kv => props.setProperty(kv._1, kv._2))
|
||||
props
|
||||
}
|
||||
|
||||
/**
 * Get the partition columns to be stored in hoodie.properties.
 *
 * A key generator is created from the given options and the result is delegated to
 * the `getPartitionColumns(keyGen: KeyGenerator)` overload.
 *
 * @param parameters the options used to create the key generator
 * @return whatever the key-generator overload returns for the created key generator
 */
def getPartitionColumns(parameters: Map[String, String]): String = {
  val typedProps = new TypedProperties()
  typedProps.putAll(parameters.asJava)
  getPartitionColumns(DataSourceUtils.createKeyGenerator(typedProps))
}
|
||||
|
||||
/**
 * Extracts the partition columns of a key generator as a comma-separated string.
 *
 * @param keyGen the key generator to inspect
 * @return the comma-joined partition path fields for a `BaseKeyGenerator` (with the
 *         type suffix stripped for `CustomKeyGenerator` / `CustomAvroKeyGenerator`),
 *         or null for any other `KeyGenerator`
 * @throws IllegalArgumentException if a partition path field of a `CustomKeyGenerator` /
 *         `CustomAvroKeyGenerator` yields no field name after splitting
 */
def getPartitionColumns(keyGen: KeyGenerator): String = {
  keyGen match {
    // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path field format
    // is: "field_name: field_type", we extract the field_name from the partition path field.
    case c: BaseKeyGenerator
      if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
      c.getPartitionPathFields.asScala.map { pathField =>
        // String#split can return an empty array (e.g. when the field consists only of
        // separators). Fail fast in that case instead of recording the error text as a
        // partition column name.
        pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX).headOption.getOrElse(
          throw new IllegalArgumentException(
            s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
      }.mkString(",")

    case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",")
    // Any other KeyGenerator is not asked for partition path fields here.
    case _ => null
  }
}
|
||||
}
|
||||
|
||||
@@ -201,7 +201,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
|
||||
val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList
|
||||
val partitionedFile = if (baseFiles.nonEmpty) {
|
||||
val baseFile = baseFiles.head.getBaseFile
|
||||
Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, baseFile.get.getFileLen))
|
||||
val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath)
|
||||
Option(PartitionedFile(InternalRow.empty, filePath, 0, baseFile.get.getFileLen))
|
||||
}
|
||||
else {
|
||||
Option.empty
|
||||
|
||||
@@ -21,7 +21,6 @@ package org.apache.hudi
|
||||
import org.apache.hudi.common.model.HoodieBaseFile
|
||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
|
||||
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
|
||||
import org.apache.hadoop.fs.Path
|
||||
@@ -29,7 +28,7 @@ import org.apache.hadoop.mapred.JobConf
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
|
||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||
import org.apache.spark.sql.{Row, SQLContext}
|
||||
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
|
||||
@@ -54,7 +53,7 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
|
||||
class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
||||
val optParams: Map[String, String],
|
||||
val userSchema: StructType,
|
||||
val globPaths: Seq[Path],
|
||||
val globPaths: Option[Seq[Path]],
|
||||
val metaClient: HoodieTableMetaClient)
|
||||
extends BaseRelation with PrunedFilteredScan with Logging {
|
||||
|
||||
@@ -133,25 +132,54 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
||||
}
|
||||
|
||||
def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
|
||||
val fileStatuses = inMemoryFileIndex.allFiles()
|
||||
if (fileStatuses.isEmpty) {
|
||||
throw new HoodieException("No files found for reading in user provided path.")
|
||||
val fileStatuses = if (globPaths.isDefined) {
|
||||
// Load files from the glob paths if they are defined, to stay compatible with the original mode
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
|
||||
inMemoryFileIndex.allFiles()
|
||||
} else { // Load files by the HoodieFileIndex.
|
||||
val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
|
||||
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
|
||||
hoodieFileIndex.allFiles
|
||||
}
|
||||
|
||||
val fsView = new HoodieTableFileSystemView(metaClient,
|
||||
metaClient.getActiveTimeline.getCommitsTimeline
|
||||
.filterCompletedInstants, fileStatuses.toArray)
|
||||
val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
|
||||
val latestCommit = fsView.getLastInstant.get().getTimestamp
|
||||
val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
|
||||
val fileSplits = fileGroup.map(kv => {
|
||||
val baseFile = kv._1
|
||||
val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
|
||||
val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
|
||||
HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
|
||||
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
|
||||
}).toList
|
||||
fileSplits
|
||||
if (fileStatuses.isEmpty) { // If this is an empty table, return an empty split list.
|
||||
List.empty[HoodieMergeOnReadFileSplit]
|
||||
} else {
|
||||
val fsView = new HoodieTableFileSystemView(metaClient,
|
||||
metaClient.getActiveTimeline.getCommitsTimeline
|
||||
.filterCompletedInstants, fileStatuses.toArray)
|
||||
val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
|
||||
val latestCommit = fsView.getLastInstant.get().getTimestamp
|
||||
val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
|
||||
val fileSplits = fileGroup.map(kv => {
|
||||
val baseFile = kv._1
|
||||
val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
|
||||
|
||||
val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath)
|
||||
val partitionedFile = PartitionedFile(InternalRow.empty, filePath, 0, baseFile.getFileLen)
|
||||
HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
|
||||
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
|
||||
}).toList
|
||||
fileSplits
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object MergeOnReadSnapshotRelation {

  /**
   * Returns the URI-encoded string form of a file path.
   *
   * `Path#toUri` is used to encode the path string because Spark's
   * `ParquetFileFormat#buildReaderWithPartitionValues` decodes the file path when the table
   * is read. Without encoding it here, a FileNotFoundException would be thrown.
   *
   * For example, if "pt" is the partition path field with "pt" = "2021/02/02" and the data
   * was written with URL_ENCODE_PARTITIONING_OPT_KEY enabled, the data path in the table
   * looks like "/basePath/2021%2F02%2F02/xxxx.parquet". If that path were passed on
   * unencoded, `ParquetFileFormat#buildReaderWithPartitionValues` would decode it to
   * "/basePath/2021/02/02/xxxx.parquet", which does not exist.
   *
   * See `FileSourceScanExec#createBucketedReadRDD` in the Spark project, which does the
   * same thing when creating a PartitionedFile.
   *
   * @param path the file path to encode
   * @return the string form of `path.toUri`
   */
  def getFilePath(path: Path): String = {
    val encodedUri = path.toUri
    encodedUri.toString
  }
}
|
||||
|
||||
Reference in New Issue
Block a user