[HUDI-920] Support Incremental query for MOR table (#1938)
This commit is contained in:
@@ -94,7 +94,12 @@ class DefaultSource extends RelationProvider
|
||||
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
|
||||
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
|
||||
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
|
||||
new IncrementalRelation(sqlContext, tablePath, optParams, schema)
|
||||
val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
|
||||
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||
new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
|
||||
} else {
|
||||
new IncrementalRelation(sqlContext, optParams, schema, metaClient)
|
||||
}
|
||||
} else {
|
||||
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
|
||||
}
|
||||
|
||||
@@ -50,30 +50,32 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
||||
private val confBroadcast = sc.broadcast(new SerializableWritable(config))
|
||||
|
||||
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
|
||||
val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
|
||||
mergeParquetPartition.split match {
|
||||
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
|
||||
mergeOnReadPartition.split match {
|
||||
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
|
||||
read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
|
||||
read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
|
||||
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
|
||||
logFileIterator(logFileOnlySplit, getConfig)
|
||||
case skipMergeSplit if skipMergeSplit.mergeType
|
||||
.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
|
||||
skipMergeFileIterator(
|
||||
skipMergeSplit,
|
||||
read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
|
||||
read(skipMergeSplit.dataFile.get, requiredSchemaFileReader),
|
||||
getConfig
|
||||
)
|
||||
case payloadCombineSplit if payloadCombineSplit.mergeType
|
||||
.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
|
||||
payloadCombineFileIterator(
|
||||
payloadCombineSplit,
|
||||
read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
|
||||
read(payloadCombineSplit.dataFile.get, fullSchemaFileReader),
|
||||
getConfig
|
||||
)
|
||||
case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
|
||||
s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
|
||||
s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
|
||||
s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
|
||||
s"spark partition Index: ${mergeParquetPartition.index}" +
|
||||
s"merge type: ${mergeParquetPartition.split.mergeType}")
|
||||
s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
|
||||
s"log paths: ${mergeOnReadPartition.split.logPaths.toString}" +
|
||||
s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
|
||||
s"spark partition Index: ${mergeOnReadPartition.index}" +
|
||||
s"merge type: ${mergeOnReadPartition.split.mergeType}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +103,44 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
||||
rows
|
||||
}
|
||||
|
||||
private def logFileIterator(split: HoodieMergeOnReadFileSplit,
|
||||
config: Configuration): Iterator[InternalRow] =
|
||||
new Iterator[InternalRow] {
|
||||
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
|
||||
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
|
||||
private val requiredFieldPosition =
|
||||
tableState.requiredStructSchema
|
||||
.map(f => tableAvroSchema.getField(f.name).pos()).toList
|
||||
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
|
||||
private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
|
||||
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
|
||||
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
|
||||
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
|
||||
|
||||
private var recordToLoad: InternalRow = _
|
||||
override def hasNext: Boolean = {
|
||||
if (logRecordsKeyIterator.hasNext) {
|
||||
val curAvrokey = logRecordsKeyIterator.next()
|
||||
val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema)
|
||||
if (!curAvroRecord.isPresent) {
|
||||
// delete record found, skipping
|
||||
this.hasNext
|
||||
} else {
|
||||
val requiredAvroRecord = AvroConversionUtils
|
||||
.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
|
||||
recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
|
||||
true
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
override def next(): InternalRow = {
|
||||
recordToLoad
|
||||
}
|
||||
}
|
||||
|
||||
private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
|
||||
baseFileIterator: Iterator[InternalRow],
|
||||
config: Configuration): Iterator[InternalRow] =
|
||||
|
||||
@@ -42,19 +42,14 @@ import scala.collection.mutable
|
||||
*
|
||||
*/
|
||||
class IncrementalRelation(val sqlContext: SQLContext,
|
||||
val basePath: String,
|
||||
val optParams: Map[String, String],
|
||||
val userSchema: StructType) extends BaseRelation with TableScan {
|
||||
val userSchema: StructType,
|
||||
val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
|
||||
|
||||
private val log = LogManager.getLogger(classOf[IncrementalRelation])
|
||||
|
||||
val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
|
||||
private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
|
||||
|
||||
// MOR tables not supported yet
|
||||
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||
throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
|
||||
}
|
||||
private val basePath = metaClient.getBasePath
|
||||
// TODO : Figure out a valid HoodieWriteConfig
|
||||
private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(),
|
||||
new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
|
||||
|
||||
@@ -0,0 +1,218 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobPattern, Path}
|
||||
import org.apache.hadoop.mapred.JobConf
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.model.HoodieRecord
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
|
||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
|
||||
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
|
||||
import org.apache.log4j.LogManager
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||
import org.apache.spark.sql.sources._
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.sql.{Row, SQLContext}
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.collection.mutable.ListBuffer
|
||||
|
||||
/**
|
||||
* Experimental.
|
||||
* Relation, that implements the Hoodie incremental view for Merge On Read table.
|
||||
*
|
||||
*/
|
||||
class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
|
||||
val optParams: Map[String, String],
|
||||
val userSchema: StructType,
|
||||
val metaClient: HoodieTableMetaClient)
|
||||
extends BaseRelation with PrunedFilteredScan {
|
||||
|
||||
private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation])
|
||||
private val conf = sqlContext.sparkContext.hadoopConfiguration
|
||||
private val jobConf = new JobConf(conf)
|
||||
private val fs = FSUtils.getFs(metaClient.getBasePath, conf)
|
||||
private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
|
||||
if (commitTimeline.empty()) {
|
||||
throw new HoodieException("No instants to incrementally pull")
|
||||
}
|
||||
if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY)) {
|
||||
throw new HoodieException(s"Specify the begin instant time to pull from using " +
|
||||
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
|
||||
}
|
||||
|
||||
private val lastInstant = commitTimeline.lastInstant().get()
|
||||
private val mergeType = optParams.getOrElse(
|
||||
DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
|
||||
DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
|
||||
|
||||
private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
|
||||
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
|
||||
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
|
||||
log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
|
||||
private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
|
||||
private val schemaUtil = new TableSchemaResolver(metaClient)
|
||||
private val tableAvroSchema = schemaUtil.getTableAvroSchema
|
||||
private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
|
||||
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
|
||||
private val fileIndex = buildFileIndex()
|
||||
|
||||
override def schema: StructType = tableStructSchema
|
||||
|
||||
override def needConversion: Boolean = false
|
||||
|
||||
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
|
||||
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
|
||||
val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
|
||||
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
|
||||
filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
|
||||
}
|
||||
|
||||
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
|
||||
log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
|
||||
log.debug(s"buildScan filters = ${filters.mkString(",")}")
|
||||
// config to ensure the push down filter for parquet will be applied.
|
||||
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
|
||||
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
|
||||
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
|
||||
val pushDownFilter = {
|
||||
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
|
||||
val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
|
||||
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
|
||||
filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
|
||||
}
|
||||
var requiredStructSchema = StructType(Seq())
|
||||
requiredColumns.foreach(col => {
|
||||
val field = tableStructSchema.find(_.name == col)
|
||||
if (field.isDefined) {
|
||||
requiredStructSchema = requiredStructSchema.add(field.get)
|
||||
}
|
||||
})
|
||||
val requiredAvroSchema = AvroConversionUtils
|
||||
.convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
|
||||
val hoodieTableState = HoodieMergeOnReadTableState(
|
||||
tableStructSchema,
|
||||
requiredStructSchema,
|
||||
tableAvroSchema.toString,
|
||||
requiredAvroSchema.toString,
|
||||
fileIndex
|
||||
)
|
||||
val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
|
||||
sparkSession = sqlContext.sparkSession,
|
||||
dataSchema = tableStructSchema,
|
||||
partitionSchema = StructType(Nil),
|
||||
requiredSchema = tableStructSchema,
|
||||
filters = pushDownFilter,
|
||||
options = optParams,
|
||||
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
|
||||
)
|
||||
val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
|
||||
sparkSession = sqlContext.sparkSession,
|
||||
dataSchema = tableStructSchema,
|
||||
partitionSchema = StructType(Nil),
|
||||
requiredSchema = requiredStructSchema,
|
||||
filters = pushDownFilter,
|
||||
options = optParams,
|
||||
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
|
||||
)
|
||||
|
||||
// Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration.
|
||||
FileSystem.getLocal(jobConf)
|
||||
SparkHadoopUtil.get.addCredentials(jobConf)
|
||||
val rdd = new HoodieMergeOnReadRDD(
|
||||
sqlContext.sparkContext,
|
||||
jobConf,
|
||||
fullSchemaParquetReader,
|
||||
requiredSchemaParquetReader,
|
||||
hoodieTableState
|
||||
)
|
||||
rdd.asInstanceOf[RDD[Row]]
|
||||
}
|
||||
|
||||
def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
|
||||
val partitionsWithFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath),
|
||||
commitsToReturn, commitsTimelineToReturn)
|
||||
val affectedFileStatus = new ListBuffer[FileStatus]
|
||||
partitionsWithFileStatus.iterator.foreach(p =>
|
||||
p._2.iterator.foreach(status => affectedFileStatus += status._2))
|
||||
val fsView = new HoodieTableFileSystemView(metaClient,
|
||||
commitsTimelineToReturn, affectedFileStatus.toArray)
|
||||
|
||||
// Iterate partitions to create splits
|
||||
val fileGroup = partitionsWithFileStatus.keySet().flatMap(partitionPath =>
|
||||
fsView.getAllFileGroups(partitionPath).iterator()
|
||||
).toList
|
||||
val latestCommit = fsView.getLastInstant.get().getTimestamp
|
||||
if (log.isDebugEnabled) {
|
||||
fileGroup.foreach(f => log.debug(s"current file group id: " +
|
||||
s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get().toString}"))
|
||||
}
|
||||
|
||||
// Filter files based on user defined glob pattern
|
||||
val pathGlobPattern = optParams.getOrElse(
|
||||
DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
|
||||
DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
|
||||
val filteredFileGroup = if(!pathGlobPattern
|
||||
.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
|
||||
val globMatcher = new GlobPattern("*" + pathGlobPattern)
|
||||
fileGroup.filter(f => {
|
||||
if (f.getLatestFileSlice.get().getBaseFile.isPresent) {
|
||||
globMatcher.matches(f.getLatestFileSlice.get().getBaseFile.get.getPath)
|
||||
} else {
|
||||
globMatcher.matches(f.getLatestFileSlice.get().getLatestLogFile.get().getPath.toString)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
fileGroup
|
||||
}
|
||||
|
||||
// Build HoodieMergeOnReadFileSplit.
|
||||
filteredFileGroup.map(f => {
|
||||
// Ensure get the base file when there is a pending compaction, which means the base file
|
||||
// won't be in the latest file slice.
|
||||
val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList
|
||||
val partitionedFile = if (baseFiles.nonEmpty) {
|
||||
val baseFile = baseFiles.head.getBaseFile
|
||||
Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, baseFile.get.getFileLen))
|
||||
}
|
||||
else {
|
||||
Option.empty
|
||||
}
|
||||
|
||||
val logPath = if (f.getLatestFileSlice.isPresent) {
|
||||
//If log path doesn't exist, we still include an empty path to avoid using
|
||||
// the default parquet reader to ensure the push down filter will be applied.
|
||||
Option(f.getLatestFileSlice.get().getLogFiles.iterator().toList
|
||||
.map(logfile => logfile.getPath.toString))
|
||||
}
|
||||
else {
|
||||
Option.empty
|
||||
}
|
||||
|
||||
HoodieMergeOnReadFileSplit(partitionedFile, logPath,
|
||||
latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile,
|
||||
case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile],
|
||||
logPaths: Option[List[String]],
|
||||
latestCommit: String,
|
||||
tablePath: String,
|
||||
@@ -99,7 +99,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
||||
dataSchema = tableStructSchema,
|
||||
partitionSchema = StructType(Nil),
|
||||
requiredSchema = tableStructSchema,
|
||||
filters = Seq(),
|
||||
filters = filters,
|
||||
options = optParams,
|
||||
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
|
||||
)
|
||||
@@ -140,7 +140,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
||||
val baseFile = kv._1
|
||||
val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
|
||||
val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
|
||||
HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
|
||||
HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
|
||||
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
|
||||
}).toList
|
||||
fileSplits
|
||||
|
||||
Reference in New Issue
Block a user