
[HUDI-3179] Extracted common AbstractHoodieTableFileIndex to be shared across engines (#4520)

Author: Alexey Kudinkin
Date: 2022-01-16 22:46:20 -08:00
Committed by: GitHub
Parent: ed92c217ed
Commit: 75caa7d3d8
10 changed files with 732 additions and 520 deletions

View File

@@ -34,12 +34,12 @@ public class FileSlice implements Serializable {
/**
* File Group Id of the Slice.
*/
private HoodieFileGroupId fileGroupId;
private final HoodieFileGroupId fileGroupId;
/**
* Point in the timeline, at which the slice was created.
*/
private String baseInstantTime;
private final String baseInstantTime;
/**
* data file, with the compacted data, for this slice.

View File

@@ -421,8 +421,8 @@ public class HoodieInputFormatUtils {
}
public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient(
Collection<HoodieTableMetaClient> metaClientList,
List<Path> snapshotPaths
Collection<HoodieTableMetaClient> metaClientList,
List<Path> snapshotPaths
) {
Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>()));
@@ -445,9 +445,11 @@ public class HoodieInputFormatUtils {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
List<FileStatus> returns = new ArrayList<>();
Map<HoodieTableMetaClient, List<Path>> groupedPaths = HoodieInputFormatUtils
.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
Map<HoodieTableMetaClient, List<Path>> groupedPaths =
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
LOG.info("Found a total of " + groupedPaths.size() + " groups");
try {

View File

@@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.engine.HoodieEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
* Common (engine-agnostic) File Index implementation enabling individual query engines to
* list Hudi Table contents based on the
*
* <ul>
* <li>Table type (MOR, COW)</li>
* <li>Query type (snapshot, read_optimized, incremental)</li>
* <li>Query instant/range</li>
* </ul>
*
* @param engineContext Hudi engine-specific context
* @param metaClient Hudi table's meta-client
* @param configProperties unifying configuration (in the form of generic properties)
* @param queryType target query type
* @param queryPaths target DFS paths being queried
* @param specifiedQueryInstant instant as of which table is being queried
* @param shouldIncludePendingCommits flags whether file-index should include any pending operations
* @param fileStatusCache transient cache of fetched [[FileStatus]]es
*/
abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext,
metaClient: HoodieTableMetaClient,
configProperties: TypedProperties,
specifiedQueryInstant: Option[String] = None,
@transient fileStatusCache: FileStatusCacheTrait) {
/**
* Get all completed commits.
*/
lazy val completedCommits = metaClient.getCommitsTimeline
.filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp)
private lazy val _partitionColumns: Array[String] =
metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder()
.fromProperties(configProperties)
.build()
private lazy val metadataConfig = HoodieMetadataConfig.newBuilder
.fromProperties(configProperties)
.build()
protected val basePath: String = metaClient.getBasePath
private val queryType = configProperties(QUERY_TYPE.key())
private val tableType = metaClient.getTableType
@transient private val queryPath = new Path(configProperties.getOrElse("path", "'path' option required"))
@transient
@volatile protected var cachedFileSize: Long = 0L
@transient
@volatile protected var cachedAllInputFileSlices: Map[PartitionPath, Seq[FileSlice]] = _
@volatile protected var queryAsNonePartitionedTable: Boolean = _
@transient
@volatile private var fileSystemView: HoodieTableFileSystemView = _
refresh0()
/**
* Fetch list of latest base files and log files per partition.
*
* @return mapping from string partition paths to their base/log files
*/
def listFileSlices(): Map[String, Seq[FileSlice]] = {
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table.
cachedAllInputFileSlices.map(entry => (entry._1.path, entry._2))
} else {
cachedAllInputFileSlices.keys.toSeq.map(partition => {
(partition.path, cachedAllInputFileSlices(partition))
}).toMap
}
}
private def refresh0(): Unit = {
val startTime = System.currentTimeMillis()
val partitionFiles = loadPartitionPathFiles()
val allFiles = partitionFiles.values.reduceOption(_ ++ _)
.getOrElse(Array.empty[FileStatus])
metaClient.reloadActiveTimeline()
val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
val latestInstant = activeInstants.lastInstant()
// TODO we can optimize the flow by:
// - First fetch list of files from instants of interest
// - Load FileStatus's
fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
val queryInstant = if (specifiedQueryInstant.isDefined) {
specifiedQueryInstant
} else if (latestInstant.isPresent) {
Some(latestInstant.get.getTimestamp)
} else {
None
}
(tableType, queryType) match {
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
// Fetch and store latest base and log files, and their sizes
cachedAllInputFileSlices = partitionFiles.map(p => {
val latestSlices = if (queryInstant.isDefined) {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.path, queryInstant.get)
.iterator().asScala.toSeq
} else {
Seq()
}
(p._1, latestSlices)
})
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => {
if (fileSlice.getBaseFile.isPresent) {
fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
} else {
fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
}
}).sum
case (_, _) =>
// Fetch and store latest base files and its sizes
cachedAllInputFileSlices = partitionFiles.map(p => {
val fileSlices = specifiedQueryInstant
.map(instant =>
fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.path, instant, true))
.getOrElse(fileSystemView.getLatestFileSlices(p._1.path))
.iterator().asScala.toSeq
(p._1, fileSlices)
})
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum
}
// If any partition path has no partition values, we query the table as a non-partitioned table.
queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values.isEmpty)
val flushSpend = System.currentTimeMillis() - startTime
logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," +
s" spend: $flushSpend ms")
}
protected def refresh(): Unit = {
fileStatusCache.invalidate()
refresh0()
}
private def fileSliceSize(fileSlice: FileSlice): Long = {
val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum
if (fileSlice.getBaseFile.isPresent) {
fileSlice.getBaseFile.get().getFileLen + logFileSize
} else {
logFileSize
}
}
/**
* Load all partition paths and their files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionPath, Array[FileStatus]] = {
val partitionPaths = getAllQueryPartitionPaths
// List files under all of the partition paths.
val pathToFetch = mutable.ArrayBuffer[PartitionPath]()
val cachePartitionToFiles = mutable.Map[PartitionPath, Array[FileStatus]]()
// Fetch from the FileStatusCache
partitionPaths.foreach { partitionPath =>
fileStatusCache.get(partitionPath.fullPartitionPath(basePath)) match {
case Some(filesInPartition) =>
cachePartitionToFiles.put(partitionPath, filesInPartition)
case None => pathToFetch.append(partitionPath)
}
}
val fetchedPartitionToFiles =
if (pathToFetch.nonEmpty) {
val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap
val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath,
fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir)
fullPartitionPathsToFetch.map(p => {
(p._1, partitionToFilesMap.get(p._2))
})
} else {
Map.empty[PartitionPath, Array[FileStatus]]
}
// Update the fileStatusCache
fetchedPartitionToFiles.foreach {
case (partitionRowPath, filesInPartition) =>
fileStatusCache.put(partitionRowPath.fullPartitionPath(basePath), filesInPartition)
}
cachePartitionToFiles.toMap ++ fetchedPartitionToFiles
}
def getAllQueryPartitionPaths: Seq[PartitionPath] = {
val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
// Load all partition paths from the basePath, and filter by the query partition path.
// TODO load files from the queryPartitionPath directly.
val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala
.filter(_.startsWith(queryPartitionPath))
val partitionSchema = _partitionColumns
// Convert partition's path into partition descriptor
partitionPaths.map { partitionPath =>
val partitionColumnValues = parsePartitionColumnValues(partitionSchema, partitionPath)
PartitionPath(partitionPath, partitionColumnValues)
}
}
/**
* Parses partition columns' values from the provided partition's path, returning list of
* values (that might have engine-specific representation)
*
* @param partitionColumns partitioning columns identifying the partition
* @param partitionPath partition's path to parse partitioning columns' values from
*/
protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any]
// TODO eval whether we should just use logger directly
protected def logWarning(str: => String): Unit
protected def logInfo(str: => String): Unit
/**
* Represents a partition as a tuple of
* <ul>
* <li>Actual partition path (relative to the table's base path)</li>
* <li>Values of the corresponding columns table is being partitioned by (partitioning columns)</li>
* </ul>
*
* E.g. PartitionPath("2021/02/01", Array("2021","02","01"))
*
* NOTE: Partitioning column values might have engine-specific representation (for example,
* {@code UTF8String} for Spark) and are used solely for partition pruning in a very
* engine-specific way
*
* @param values values of the corresponding partitioning columns
* @param path partition's path
*
* TODO expose as a trait and make impls engine-specific (current impl is tailored for Spark)
*/
case class PartitionPath(path: String, values: Array[Any]) {
override def equals(other: Any): Boolean = other match {
case PartitionPath(otherPath, _) => path == otherPath
case _ => false
}
override def hashCode(): Int = {
path.hashCode
}
def fullPartitionPath(basePath: String): Path = {
if (path.isEmpty) {
new Path(basePath) // This is a non-partition path
} else {
new Path(basePath, path)
}
}
}
}
trait FileStatusCacheTrait {
def get(path: Path): Option[Array[FileStatus]]
def put(path: Path, leafFiles: Array[FileStatus]): Unit
def invalidate(): Unit
}
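Note: the FileStatusCacheTrait introduced above is deliberately engine-agnostic. A minimal in-memory implementation, sketched here purely for illustration (the InMemoryFileStatusCache name is not part of this change), shows what the contract expects:

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import scala.collection.mutable

// Illustrative sketch only, not part of this commit.
class InMemoryFileStatusCache extends FileStatusCacheTrait {
  private val cache = mutable.Map[Path, Array[FileStatus]]()

  // Return a previously cached listing for this partition path, if any
  override def get(path: Path): Option[Array[FileStatus]] = cache.get(path)

  // Remember the listing so subsequent loadPartitionPathFiles() calls can skip the file system
  override def put(path: Path, leafFiles: Array[FileStatus]): Unit = cache.update(path, leafFiles)

  // Dropped wholesale by AbstractHoodieTableFileIndex.refresh()
  override def invalidate(): Unit = cache.clear()
}
```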

View File

@@ -193,7 +193,6 @@ class DefaultSource extends RelationProvider
}
if (useHoodieFileIndex) {
val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
if (schema == null) Option.empty[StructType] else Some(schema),
optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))

View File

@@ -18,34 +18,19 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.hudi.HoodieFileIndex.getConfigProperties
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
/**
@@ -69,117 +54,25 @@ import scala.util.{Failure, Success, Try}
* , we read it as a Non-Partitioned table because we cannot know how to map the partition
* path to the partition columns in this case.
*
* TODO rename to HoodieSparkSqlFileIndex
*/
case class HoodieFileIndex(
spark: SparkSession,
metaClient: HoodieTableMetaClient,
schemaSpec: Option[StructType],
options: Map[String, String],
@transient fileStatusCache: FileStatusCache = NoopCache)
extends FileIndex with Logging with SparkAdapterSupport {
private val basePath = metaClient.getBasePath
case class HoodieFileIndex(spark: SparkSession,
metaClient: HoodieTableMetaClient,
schemaSpec: Option[StructType],
options: Map[String, String],
@transient fileStatusCache: FileStatusCache = NoopCache)
extends SparkHoodieTableFileIndex(
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
configProperties = getConfigProperties(spark, options),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
)
with FileIndex {
@transient private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
private val queryType = options(QUERY_TYPE.key())
private val tableType = metaClient.getTableType
private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlCommonUtils.formatQueryInstant)
/**
* Get all completeCommits.
*/
lazy val completedCommits = metaClient.getCommitsTimeline
.filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp)
/**
* Get the schema of the table.
*/
lazy val schema: StructType = schemaSpec.getOrElse({
val schemaUtil = new TableSchemaResolver(metaClient)
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})
/**
* Get the partition schema from the hoodie.properties.
*/
private lazy val _partitionSchemaFromProperties: StructType = {
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
val nameFieldMap = generateNameFieldMap(Right(schema))
if (partitionColumns.isPresent) {
if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName)
|| tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedAvroKeyGenerator].getName)) {
val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
val partitionFields = partitionColumns.get().map(column =>
nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
s"$column' in the schema[${schema.fields.mkString(",")}]")))
StructType(partitionFields)
}
} else { // If the partition columns have not stored in hoodie.properties(the table that was
// created earlier), we trait it as a non-partitioned table.
logWarning("No partition columns available from hoodie.properties." +
" Partition pruning will not work")
new StructType()
}
}
/**
* This method traverses StructType recursively to build map of columnName -> StructField
* Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"] -> final map will have keys corresponding
* only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"]
* @param structField
* @return map of ( columns names -> StructField )
*/
private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = {
structField match {
case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap
case Left(field) => field.dataType match {
case struct: StructType => generateNameFieldMap(Right(struct)).map {
case (key: String, sf: StructField) => (field.name + "." + key, sf)
}
case _ => Map(field.name -> field)
}
}
}
private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
private lazy val configProperties = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = new Properties()
// To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
// would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString))
properties.putAll(options.asJava)
properties
}
private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder()
.fromProperties(configProperties)
.build()
private lazy val metadataConfig = HoodieMetadataConfig.newBuilder
.fromProperties(configProperties)
.build()
@transient @volatile private var fileSystemView: HoodieTableFileSystemView = _
@transient @volatile private var cachedAllInputFileSlices: Map[PartitionRowPath, Seq[FileSlice]] = _
@transient @volatile private var cachedFileSize: Long = 0L
@volatile private var queryAsNonePartitionedTable: Boolean = _
refresh0()
override def rootPaths: Seq[Path] = queryPath :: Nil
def enableDataSkipping(): Boolean = {
@@ -187,14 +80,107 @@ case class HoodieFileIndex(
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
}
/**
* Returns the FileStatus for all the base files (excluding log files). This should be used only for
* cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic
* implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter
* to filter out log files within Spark.
*
* @return List of FileStatus for base files
*/
def allFiles: Seq[FileStatus] = {
cachedAllInputFileSlices.values.flatten
.filter(_.getBaseFile.isPresent)
.map(_.getBaseFile.get().getFileStatus)
.toSeq
}
/**
* Invoked by Spark to fetch list of latest base files per partition.
*
* @param partitionFilters partition column filters
* @param dataFilters data columns filters
* @return list of PartitionDirectory containing partition to base files mapping
*/
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// Look up candidate files names in the col-stats index, if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInColStatsIndex(dataFilters) match {
case Success(opt) => opt
case Failure(e) =>
if (e.isInstanceOf[AnalysisException]) {
logDebug("Failed to relay provided data filters to Z-index lookup", e)
} else {
logError("Failed to lookup candidate files in Z-index", e)
}
Option.empty
}
logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table
// Filter in candidate files based on the col-stats index lookup
val candidateFiles =
allFiles.filter(fileStatus =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
)
logInfo(s"Total files : ${allFiles.size}; " +
s"candidate files after data skipping: ${candidateFiles.size}; " +
s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
var totalFileSize = 0
var candidateFileSize = 0
val result = prunedPartitions.map { partition =>
val baseFileStatuses: Seq[FileStatus] =
cachedAllInputFileSlices(partition)
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
// Filter in candidate files based on the col-stats index lookup
val candidateFiles =
baseFileStatuses.filter(fs =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles)
}
logInfo(s"Total base files: ${totalFileSize}; " +
s"candidate files after data skipping : ${candidateFileSize}; " +
s"skipping percent ${if (allFiles.nonEmpty) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
result
}
}
private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
allBaseFileNames -- allIndexedFileNames
}
/**
* Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters}
* conditions, by leveraging custom Column Statistics index (col-stats-index) bearing "min", "max",
* "num_nulls" statistics for all clustered columns.
*
* NOTE: This method has to return complete set of candidate files, since only provided candidates will
* ultimately be scanned as part of query execution. Hence, this method has to maintain the
* invariant of conservatively including every base-file's name, that is NOT referenced in its index.
* ultimately be scanned as part of query execution. Hence, this method has to maintain the
* invariant of conservatively including every base-file's name, that is NOT referenced in its index.
*
* @param queryFilters list of original data filters passed down from querying engine
* @return list of pruned (data-skipped) candidate base-files' names
@@ -264,390 +250,36 @@ case class HoodieFileIndex(
// files and all outstanding base-files, and make sure that all base files not
// represented w/in the index are included in the output of this method
val notIndexedFileNames =
lookupFileNamesMissingFromIndex(allIndexedFileNames)
lookupFileNamesMissingFromIndex(allIndexedFileNames)
prunedCandidateFileNames ++ notIndexedFileNames
})
}
private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
allBaseFileNames -- allIndexedFileNames
}
/**
* Invoked by Spark to fetch list of latest base files per partition.
*
* @param partitionFilters partition column filters
* @param dataFilters data columns filters
* @return list of PartitionDirectory containing partition to base files mapping
*/
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// Look up candidate files names in the col-stats index, if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInColStatsIndex(dataFilters) match {
case Success(opt) => opt
case Failure(e) =>
if (e.isInstanceOf[AnalysisException]) {
logDebug("Failed to relay provided data filters to Z-index lookup", e)
} else {
logError("Failed to lookup candidate files in Z-index", e)
}
Option.empty
}
logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table
// Filter in candidate files based on the col-stats index lookup
val candidateFiles =
allFiles.filter(fileStatus =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
)
logInfo(s"Total files : ${allFiles.size}; " +
s"candidate files after data skipping: ${candidateFiles.size}; " +
s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
var totalFileSize = 0
var candidateFileSize = 0
val result = prunedPartitions.map { partition =>
val baseFileStatuses: Seq[FileStatus] =
cachedAllInputFileSlices(partition)
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
// Filter in candidate files based on the col-stats index lookup
val candidateFiles =
baseFileStatuses.filter(fs =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
PartitionDirectory(partition.values, candidateFiles)
}
logInfo(s"Total base files: ${totalFileSize}; " +
s"candidate files after data skipping : ${candidateFileSize}; " +
s"skipping percent ${if (allFiles.nonEmpty) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
result
}
}
/**
* Fetch list of latest base files and log files per partition.
*
* @param partitionFilters partition column filters
* @param dataFilters data column filters
* @return mapping from string partition paths to its base/log files
*/
def listFileSlices(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table.
cachedAllInputFileSlices.map(entry => (entry._1.partitionPath, entry._2))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.partitionPath, cachedAllInputFileSlices(partition))
}).toMap
}
}
override def refresh(): Unit = super.refresh()
override def inputFiles: Array[String] = {
val fileStatusList = allFiles
fileStatusList.map(_.getPath.toString).toArray
}
override def refresh(): Unit = {
fileStatusCache.invalidateAll()
refresh0()
}
private def refresh0(): Unit = {
val startTime = System.currentTimeMillis()
val partitionFiles = loadPartitionPathFiles()
val allFiles = partitionFiles.values.reduceOption(_ ++ _)
.getOrElse(Array.empty[FileStatus])
metaClient.reloadActiveTimeline()
val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
val latestInstant = activeInstants.lastInstant()
fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
val queryInstant = if (specifiedQueryInstant.isDefined) {
specifiedQueryInstant
} else if (latestInstant.isPresent) {
Some(latestInstant.get.getTimestamp)
} else {
None
}
(tableType, queryType) match {
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
// Fetch and store latest base and log files, and their sizes
cachedAllInputFileSlices = partitionFiles.map(p => {
val latestSlices = if (latestInstant.isPresent) {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get)
.iterator().asScala.toSeq
} else {
Seq()
}
(p._1, latestSlices)
})
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => {
if (fileSlice.getBaseFile.isPresent) {
fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
} else {
fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
}
}).sum
case (_, _) =>
// Fetch and store latest base files and its sizes
cachedAllInputFileSlices = partitionFiles.map(p => {
val fileSlices = specifiedQueryInstant
.map(instant =>
fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true))
.getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath))
.iterator().asScala.toSeq
(p._1, fileSlices)
})
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum
}
// If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values == InternalRow.empty)
val flushSpend = System.currentTimeMillis() - startTime
logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," +
s" spend: $flushSpend ms")
}
private def fileSliceSize(fileSlice: FileSlice): Long = {
val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum
if (fileSlice.getBaseFile.isPresent) {
fileSlice.getBaseFile.get().getFileLen + logFileSize
} else {
logFileSize
}
}
override def sizeInBytes: Long = {
cachedFileSize
}
}
override def partitionSchema: StructType = {
if (queryAsNonePartitionedTable) {
// If we read it as Non-Partitioned table, we should not
// return the partition schema.
new StructType()
} else {
_partitionSchemaFromProperties
}
}
object HoodieFileIndex {
/**
* Get the data schema of the table.
* @return
*/
def dataSchema: StructType = {
val partitionColumns = partitionSchema.fields.map(_.name).toSet
StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
}
def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = new TypedProperties()
/**
* Returns the FileStatus for all the base files (excluding log files). This should be used only for
* cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic
* implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter
* to filter out log files within Spark.
*
* @return List of FileStatus for base files
*/
def allFiles: Seq[FileStatus] = {
cachedAllInputFileSlices.values.flatten
.filter(_.getBaseFile.isPresent)
.map(_.getBaseFile.get().getFileStatus)
.toSeq
}
/**
* Prune the partition by the filter.This implementation is fork from
* org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
* @param partitionPaths All the partition paths.
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
def prunePartition(partitionPaths: Seq[PartitionRowPath],
predicates: Seq[Expression]): Seq[PartitionRowPath] = {
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
if (partitionPruningPredicates.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
val boundPredicate = InterpretedPredicate(predicate.transform {
case a: AttributeReference =>
val index = partitionSchema.indexWhere(a.name == _.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
val prunedPartitionPaths = partitionPaths.filter {
case PartitionRowPath(values, _) => boundPredicate.eval(values)
}
logInfo(s"Total partition size is: ${partitionPaths.size}," +
s" after partition prune size is: ${prunedPartitionPaths.size}")
prunedPartitionPaths
} else {
partitionPaths
}
}
def getAllQueryPartitionPaths: Seq[PartitionRowPath] = {
val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
// Load all the partition path from the basePath, and filter by the query partition path.
// TODO load files from the queryPartitionPath directly.
val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala
.filter(_.startsWith(queryPartitionPath))
val partitionSchema = _partitionSchemaFromProperties
val timeZoneId = CaseInsensitiveMap(options)
.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(SQLConf.get.sessionLocalTimeZone)
val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark
.sessionState.conf)
// Convert partition path to PartitionRowPath
partitionPaths.map { partitionPath =>
val partitionRow = if (partitionSchema.fields.length == 0) {
// This is a non-partitioned table
InternalRow.empty
} else {
val partitionFragments = partitionPath.split("/")
if (partitionFragments.length != partitionSchema.fields.length &&
partitionSchema.fields.length == 1) {
// If the partition column size is not equal to the partition fragment size
// and the partition column size is 1, we map the whole partition path
// to the partition column which can benefit from the partition prune.
val prefix = s"${partitionSchema.fieldNames.head}="
val partitionValue = if (partitionPath.startsWith(prefix)) {
// support hive style partition path
partitionPath.substring(prefix.length)
} else {
partitionPath
}
InternalRow.fromSeq(Seq(UTF8String.fromString(partitionValue)))
} else if (partitionFragments.length != partitionSchema.fields.length &&
partitionSchema.fields.length > 1) {
// If the partition column size is not equal to the partition fragments size
// and the partition column size > 1, we do not know how to map the partition
// fragments to the partition columns. So we trait it as a Non-Partitioned Table
// for the query which do not benefit from the partition prune.
logWarning( s"Cannot do the partition prune for table $basePath." +
s"The partitionFragments size (${partitionFragments.mkString(",")})" +
s" is not equal to the partition columns size(${partitionSchema.fields.mkString(",")})")
InternalRow.empty
} else { // If partitionSeqs.length == partitionSchema.fields.length
// Append partition name to the partition value if the
// HIVE_STYLE_PARTITIONING is disable.
// e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
val partitionWithName =
partitionFragments.zip(partitionSchema).map {
case (partition, field) =>
if (partition.indexOf("=") == -1) {
s"${field.name}=$partition"
} else {
partition
}
}.mkString("/")
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap
sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
typeInference = false, Set(new Path(basePath)), partitionDataTypes,
DateTimeUtils.getTimeZone(timeZoneId))
}
}
PartitionRowPath(partitionRow, partitionPath)
}
}
/**
* Load all partition paths and it's files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
val partitionRowPaths = getAllQueryPartitionPaths
// List files in all of the partition path.
val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]()
val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]()
// Fetch from the FileStatusCache
partitionRowPaths.foreach { partitionRowPath =>
fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match {
case Some(filesInPartition) =>
cachePartitionToFiles.put(partitionRowPath, filesInPartition)
case None => pathToFetch.append(partitionRowPath)
}
}
val fetchedPartitionToFiles =
if (pathToFetch.nonEmpty) {
val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap
val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath,
fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir)
fullPartitionPathsToFetch.map(p => {
(p._1, partitionToFilesMap.get(p._2))
})
} else {
Map.empty[PartitionRowPath, Array[FileStatus]]
}
// Update the fileStatusCache
fetchedPartitionToFiles.foreach {
case (partitionRowPath, filesInPartition) =>
fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition)
}
cachePartitionToFiles.toMap ++ fetchedPartitionToFiles
}
/**
* Represent a partition path.
* e.g. PartitionPath(InternalRow("2021","02","01"), "2021/02/01"))
* @param values The partition values of this partition path.
* @param partitionPath The partition path string.
*/
case class PartitionRowPath(values: InternalRow, partitionPath: String) {
override def equals(other: Any): Boolean = other match {
case PartitionRowPath(_, otherPath) => partitionPath == otherPath
case _ => false
}
override def hashCode(): Int = {
partitionPath.hashCode
}
def fullPartitionPath(basePath: String): Path = {
if (partitionPath.isEmpty) {
new Path(basePath) // This is a non-partition path
} else {
new Path(basePath, partitionPath)
}
}
// To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
// would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
sqlConf.getConfString(HoodieMetadataConfig.ENABLE.key(),
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString))
properties.putAll(options.asJava)
properties
}
}
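To illustrate how the refactored HoodieFileIndex is wired together after this change, here is a rough usage sketch. The local SparkSession setup, table path, and explicit query-type option are assumptions for illustration, not taken from this commit:

```scala
import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileStatusCache

val spark = SparkSession.builder().master("local[1]").appName("hudi-file-index").getOrCreate()

val tablePath = "/tmp/hudi/trips" // illustrative base path
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sessionState.newHadoopConf())
  .setBasePath(tablePath)
  .build()

// Options flow through getConfigProperties() into the shared AbstractHoodieTableFileIndex
val fileIndex = HoodieFileIndex(
  spark,
  metaClient,
  schemaSpec = None, // schema is resolved from the table via TableSchemaResolver
  options = Map(
    "path" -> tablePath,
    DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL),
  fileStatusCache = FileStatusCache.getOrCreate(spark))

// Partition-pruned listing now lives in SparkHoodieTableFileIndex.listFileSlices
fileIndex.listFileSlices(Seq.empty).foreach { case (partition, slices) =>
  println(s"$partition -> ${slices.size} file slices")
}
```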

View File

@@ -168,31 +168,31 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus)
// Iterate partitions to create splits
val fileGroup = getWritePartitionPaths(metadataList).flatMap(partitionPath =>
val fileGroups = getWritePartitionPaths(metadataList).flatMap(partitionPath =>
fsView.getAllFileGroups(partitionPath).iterator()
).toList
val latestCommit = fsView.getLastInstant.get().getTimestamp
val latestCommit = fsView.getLastInstant.get.getTimestamp
if (log.isDebugEnabled) {
fileGroup.foreach(f => log.debug(s"current file group id: " +
s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get().toString}"))
fileGroups.foreach(f => log.debug(s"current file group id: " +
s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get.toString}"))
}
// Filter files based on user defined glob pattern
val pathGlobPattern = optParams.getOrElse(
DataSourceReadOptions.INCR_PATH_GLOB.key,
DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
val filteredFileGroup = if(!pathGlobPattern
.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
val filteredFileGroup = if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
fileGroup.filter(f => {
if (f.getLatestFileSlice.get().getBaseFile.isPresent) {
globMatcher.matches(f.getLatestFileSlice.get().getBaseFile.get.getPath)
fileGroups.filter(fg => {
val latestFileSlice = fg.getLatestFileSlice.get
if (latestFileSlice.getBaseFile.isPresent) {
globMatcher.matches(latestFileSlice.getBaseFile.get.getPath)
} else {
globMatcher.matches(f.getLatestFileSlice.get().getLatestLogFile.get().getPath.toString)
globMatcher.matches(latestFileSlice.getLatestLogFile.get.getPath.toString)
}
})
} else {
fileGroup
fileGroups
}
// Build HoodieMergeOnReadFileSplit.
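The glob filtering above relies on Hadoop's GlobPattern with a leading "*" prepended, so the user-supplied INCR_PATH_GLOB only has to match a suffix of the file path. A small standalone sketch with made-up paths and pattern:

```scala
import org.apache.hadoop.fs.GlobPattern

// Illustrative values; only the pattern-prefixing behaviour mirrors the relation above.
val pathGlobPattern = "/americas/*/*.parquet"
val globMatcher = new GlobPattern("*" + pathGlobPattern)

val basePaths = Seq(
  "s3://bucket/trips/americas/brazil/sao_paulo/abc123.parquet",
  "s3://bucket/trips/asia/india/chennai/def456.parquet")

// Only the first path survives: GlobPattern's '*' also crosses directory boundaries.
basePaths.filter(p => globMatcher.matches(p)).foreach(println)
```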

View File

@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -198,9 +199,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
// If convert success to catalyst expression, use the partition prune
val fileSlices = if (partitionFilterExpression.isDefined) {
hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get), Seq.empty)
hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get))
} else {
hoodieFileIndex.listFileSlices(Seq.empty, Seq.empty)
hoodieFileIndex.listFileSlices(Seq.empty[Expression])
}
if (fileSlices.isEmpty) {
@@ -223,6 +224,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala
.map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList
val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths)
HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath,
maxCompactionMemoryInBytes, mergeType)
}).toList

View File

@@ -0,0 +1,288 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.SparkHoodieTableFileIndex.generateFieldMap
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
/**
* Implementation of the [[AbstractHoodieTableFileIndex]] for Spark
*
* @param spark spark session
* @param metaClient Hudi table's meta-client
* @param schemaSpec optional table's schema
* @param configProperties unifying configuration (in the form of generic properties)
* @param specifiedQueryInstant instant as of which table is being queried
* @param fileStatusCache transient cache of fetched [[FileStatus]]es
*/
class SparkHoodieTableFileIndex(spark: SparkSession,
metaClient: HoodieTableMetaClient,
schemaSpec: Option[StructType],
configProperties: TypedProperties,
specifiedQueryInstant: Option[String] = None,
@transient fileStatusCache: FileStatusCache = NoopCache)
extends AbstractHoodieTableFileIndex(
engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
metaClient,
configProperties,
specifiedQueryInstant,
SparkHoodieTableFileIndex.adapt(fileStatusCache)
)
with SparkAdapterSupport
with Logging {
/**
* Get the schema of the table.
*/
lazy val schema: StructType = schemaSpec.getOrElse({
val schemaUtil = new TableSchemaResolver(metaClient)
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})
private lazy val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark.sessionState.conf)
/**
* Get the partition schema from the hoodie.properties.
*/
private lazy val _partitionSchemaFromProperties: StructType = {
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
val nameFieldMap = generateFieldMap(schema)
if (partitionColumns.isPresent) {
if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName)
|| tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedAvroKeyGenerator].getName)) {
val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
val partitionFields = partitionColumns.get().map(column =>
nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
s"$column' in the schema[${schema.fields.mkString(",")}]")))
StructType(partitionFields)
}
} else {
// If the partition columns are not stored in hoodie.properties (the table was created
// with an earlier version), we treat it as a non-partitioned table.
logWarning("No partition columns available from hoodie.properties." +
" Partition pruning will not work")
new StructType()
}
}
/**
* Get the data schema of the table.
*
* @return the table's data schema, excluding partition columns
*/
def dataSchema: StructType = {
val partitionColumns = partitionSchema.fields.map(_.name).toSet
StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
}
def partitionSchema: StructType = {
if (queryAsNonePartitionedTable) {
// If we read it as a non-partitioned table, we should not
// return the partition schema.
new StructType()
} else {
_partitionSchemaFromProperties
}
}
/**
* Fetch list of latest base files w/ corresponding log files, after performing
* partition pruning
*
* @param partitionFilters partition column filters
* @return mapping from string partition paths to their base/log files
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.path, cachedAllInputFileSlices(partition))
}).toMap
}
/**
* Prune the partitions by the filters. This implementation is forked from
* org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
*
* @param partitionPaths All the partition paths.
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
def prunePartition(partitionPaths: Seq[PartitionPath],
predicates: Seq[Expression]): Seq[PartitionPath] = {
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
if (partitionPruningPredicates.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)
val boundPredicate = InterpretedPredicate(predicate.transform {
case a: AttributeReference =>
val index = partitionSchema.indexWhere(a.name == _.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
val prunedPartitionPaths = partitionPaths.filter {
case PartitionPath(_, values) => boundPredicate.eval(InternalRow.fromSeq(values))
}
logInfo(s"Total partition size is: ${partitionPaths.size}," +
s" after partition prune size is: ${prunedPartitionPaths.size}")
prunedPartitionPaths
} else {
partitionPaths
}
}
protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any] = {
if (partitionColumns.length == 0) {
// This is a non-partitioned table
Array.empty
} else {
val partitionFragments = partitionPath.split("/")
if (partitionFragments.length != partitionColumns.length &&
partitionColumns.length == 1) {
// If the number of partition columns does not equal the number of partition fragments
// and there is exactly one partition column, we map the whole partition path
// to that partition column, which still benefits from partition pruning.
val prefix = s"${partitionColumns.head}="
val partitionValue = if (partitionPath.startsWith(prefix)) {
// support hive style partition path
partitionPath.substring(prefix.length)
} else {
partitionPath
}
Array(UTF8String.fromString(partitionValue))
} else if (partitionFragments.length != partitionColumns.length &&
partitionColumns.length > 1) {
// If the number of partition columns does not equal the number of partition fragments
// and there is more than one partition column, we do not know how to map the partition
// fragments to the partition columns. So we treat it as a non-partitioned table,
// and the query does not benefit from partition pruning.
logWarning(s"Cannot do the partition prune for table $basePath." +
s"The partitionFragments size (${partitionFragments.mkString(",")})" +
s" is not equal to the partition columns size(${partitionColumns.mkString(",")})")
Array.empty
} else {
// If partitionFragments.length == partitionColumns.length,
// prepend the partition column name to each partition value when
// HIVE_STYLE_PARTITIONING is disabled,
// e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
val partitionWithName =
partitionFragments.zip(partitionColumns).map {
case (partition, columnName) =>
if (partition.indexOf("=") == -1) {
s"${columnName}=$partition"
} else {
partition
}
}.mkString("/")
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
partitionValues.toArray
}
}
}
private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType): Seq[Any] = {
val timeZoneId = configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone)
val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap
sparkParsePartitionUtil.parsePartition(
partitionPath,
typeInference = false,
Set(new Path(basePath)),
partitionDataTypes,
DateTimeUtils.getTimeZone(timeZoneId)
)
.toSeq(partitionSchema)
}
}
object SparkHoodieTableFileIndex {
/**
* This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
* [[StructField]] object for every field of the provided [[StructType]], recursively.
*
* For example, following struct
* <pre>
* StructType(
* StructField("a",
* StructType(
* StructField("b", StringType),
* StructField("c", IntType)
* )
* )
* )
* </pre>
*
* will be converted into following mapping:
*
* <pre>
* "a.b" -> StructField("b", StringType),
* "a.c" -> StructField("c", IntType),
* </pre>
*/
private def generateFieldMap(structType: StructType) : Map[String, StructField] = {
def traverse(structField: Either[StructField, StructType]) : Map[String, StructField] = {
structField match {
case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap
case Left(field) => field.dataType match {
case struct: StructType => traverse(Right(struct)).map {
case (key, structField) => (s"${field.name}.$key", structField)
}
case _ => Map(field.name -> field)
}
}
}
traverse(Right(structType))
}
private def adapt(cache: FileStatusCache): FileStatusCacheTrait = {
new FileStatusCacheTrait {
override def get(path: Path): Option[Array[FileStatus]] = cache.getLeafFiles(path)
override def put(path: Path, leafFiles: Array[FileStatus]): Unit = cache.putLeafFiles(path, leafFiles)
override def invalidate(): Unit = cache.invalidateAll()
}
}
}
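The dot-path flattening that generateFieldMap performs on nested schemas can be restated standalone. A minimal sketch reproducing the mapping described in the scaladoc above (illustrative only; the commit's helper is private):

```scala
import org.apache.spark.sql.types._

// Illustrative re-statement of the nested-field flattening described above.
def flatten(struct: StructType, prefix: String = ""): Map[String, StructField] =
  struct.fields.flatMap { field =>
    field.dataType match {
      case nested: StructType => flatten(nested, s"$prefix${field.name}.")
      case _                  => Map(s"$prefix${field.name}" -> field)
    }
  }.toMap

val schema = StructType(Seq(
  StructField("a", StructType(Seq(
    StructField("b", StringType),
    StructField("c", IntegerType))))))

// Prints: a.b, a.c
flatten(schema).keys.toSeq.sorted.foreach(println)
```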

View File

@@ -273,7 +273,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
val fileIndex = HoodieFileIndex(spark, metaClient, None,
queryOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetaFileList.toString))
// test if table is partitioned on nested columns, getAllQueryPartitionPaths does not break
assert(fileIndex.getAllQueryPartitionPaths.get(0).partitionPath.equals("c"))
assert(fileIndex.getAllQueryPartitionPaths.get(0).path.equals("c"))
}
private def attribute(partition: String): AttributeReference = {

View File

@@ -143,7 +143,7 @@ class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
(None, Some(path))
} else {
val (columnNames, values) = columns.reverse.unzip
(Some(PartitionValues(columnNames.toSeq, values.toSeq)), Some(currentPath))
(Some(PartitionValues(columnNames, values)), Some(currentPath))
}
}