1
0

[HUDI-2107] Support Read Log Only MOR Table For Spark (#3193)

This commit is contained in:
pengzhiwei
2021-07-12 17:31:23 +08:00
committed by GitHub
parent 5804ad8e32
commit ca440ccf88
6 changed files with 214 additions and 71 deletions

View File

@@ -106,6 +106,12 @@ case class HoodieFileIndex(
}
}
private lazy val metadataConfig = {
val properties = new Properties()
properties.putAll(options.asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
@transient @volatile private var fileSystemView: HoodieTableFileSystemView = _
@transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
@transient @volatile private var cachedFileSize: Long = 0L
@@ -195,8 +201,8 @@ case class HoodieFileIndex(
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
private def prunePartition(partitionPaths: Seq[PartitionRowPath],
predicates: Seq[Expression]): Seq[PartitionRowPath] = {
def prunePartition(partitionPaths: Seq[PartitionRowPath],
predicates: Seq[Expression]): Seq[PartitionRowPath] = {
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
@@ -222,26 +228,13 @@ case class HoodieFileIndex(
}
}
/**
* Load all partition paths and its files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
def getAllQueryPartitionPaths: Seq[PartitionRowPath] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val properties = new Properties()
properties.putAll(options.asJava)
val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
// Load all the partition paths from the basePath, and filter by the query partition path.
// TODO load files from the queryPartitionPath directly.
val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala
.filter(_.startsWith(queryPartitionPath))
val writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath).withProperties(properties).build()
val maxListParallelism = writeConfig.getFileListingParallelism
val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val partitionSchema = _partitionSchemaFromProperties
val timeZoneId = CaseInsensitiveMap(options)
.get(DateTimeUtils.TIMEZONE_OPTION)
@@ -250,7 +243,7 @@ case class HoodieFileIndex(
val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark
.sessionState.conf)
// Convert partition path to PartitionRowPath
val partitionRowPaths = partitionPaths.map { partitionPath =>
partitionPaths.map { partitionPath =>
val partitionRow = if (partitionSchema.fields.length == 0) {
// This is a non-partitioned table
InternalRow.empty
@@ -308,7 +301,20 @@ case class HoodieFileIndex(
}
PartitionRowPath(partitionRow, partitionPath)
}
}
/**
* Load all partition paths and its files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
val properties = new Properties()
properties.putAll(options.asJava)
val writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath).withProperties(properties).build()
val maxListParallelism = writeConfig.getFileListingParallelism
val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val partitionRowPaths = getAllQueryPartitionPaths
// List files in all of the partition path.
val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]()
val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]()

View File

@@ -19,7 +19,6 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
@@ -137,12 +136,15 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
}
def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = {
val fileStatuses = if (globPaths.isDefined) {
// Get all partition paths
val partitionPaths = if (globPaths.isDefined) {
// Load files from the global paths if it is defined, to be compatible with the original mode
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
inMemoryFileIndex.allFiles()
} else { // Load files by the HoodieFileIndex.
val fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline.getCommitsTimeline
.filterCompletedInstants, inMemoryFileIndex.allFiles().toArray)
fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent)
} else { // Load partition path by the HoodieFileIndex.
val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
@@ -152,34 +154,35 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val partitionFilterExpression =
HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema)
// If the conversion to a Catalyst expression succeeds, use partition pruning
if (partitionFilterExpression.isDefined) {
hoodieFileIndex.listFiles(Seq(partitionFilterExpression.get), Seq.empty).flatMap(_.files)
} else {
hoodieFileIndex.allFiles
}
val allPartitionPaths = hoodieFileIndex.getAllQueryPartitionPaths
// If the conversion to a Catalyst expression succeeds, use partition pruning
hoodieFileIndex.prunePartition(allPartitionPaths, partitionFilterExpression.map(Seq(_)).getOrElse(Seq.empty))
.map(_.fullPartitionPath(metaClient.getBasePath))
}
if (fileStatuses.isEmpty) { // If this is an empty table, return an empty split list.
if (partitionPaths.isEmpty) { // If this is an empty table, return an empty split list.
List.empty[HoodieMergeOnReadFileSplit]
} else {
val fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline.getCommitsTimeline
.filterCompletedInstants, fileStatuses.toArray)
val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
if (!fsView.getLastInstant.isPresent) { // Return empty list if the table has no commit
val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant()
if (!lastInstant.isPresent) { // Return empty list if the table has no commit
List.empty
} else {
val latestCommit = fsView.getLastInstant.get().getTimestamp
val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
val fileSplits = fileGroup.map(kv => {
val baseFile = kv._1
val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath)
val latestCommit = lastInstant.get().getTimestamp
val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
val fileSplits = baseAndLogsList.map(kv => {
val baseFile = kv.getLeft
val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList)
val partitionedFile = PartitionedFile(InternalRow.empty, filePath, 0, baseFile.getFileLen)
HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
val baseDataPath = if (baseFile.isPresent) {
Some(PartitionedFile(
InternalRow.empty,
MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath),
0, baseFile.get.getFileLen)
)
} else {
None
}
HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
}).toList
fileSplits