[HUDI-69] Support Spark Datasource for MOR table - RDD approach (#1848)
This PR implements the Spark Datasource for MOR tables using the RDD approach:
- Implemented MergeOnReadSnapshotRelation
- Implemented HoodieMergeOnReadRDD
- Implemented separate iterators to handle the merged and unmerged record reads
- Added TestMORDataSource to verify the feature
- Cleaned up test file names and added tests that mix query types
- This lets us revert the earlier change made in DefaultSource

Co-authored-by: Vinoth Chandar <vchandar@confluent.io>
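A hedged usage sketch of what this change enables (the base path below is a placeholder, `spark` is an existing SparkSession, and the option names are the DataSourceReadOptions constants shown further down):

import org.apache.hudi.DataSourceReadOptions._

// Snapshot query on a merge-on-read table through the datasource path added here.
val snapshotDF = spark.read
  .format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("/tmp/hoodie_mor_table/*/*")
snapshotDF.show()
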
@@ -17,7 +17,7 @@
package org.apache.hudi

import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.common.model.HoodieKey
import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import scala.collection.JavaConverters._

object AvroConversionUtils {

@@ -78,4 +79,15 @@ object AvroConversionUtils {
  def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
  }

  def buildAvroRecordBySchema(record: IndexedRecord,
                              requiredSchema: Schema,
                              requiredPos: List[Int],
                              recordBuilder: GenericRecordBuilder): GenericRecord = {
    val requiredFields = requiredSchema.getFields.asScala
    assert(requiredFields.length == requiredPos.length)
    val positionIterator = requiredPos.iterator
    requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next())))
    recordBuilder.build()
  }
}

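The new buildAvroRecordBySchema helper projects a full Avro record onto the required (pruned) schema by field position. A minimal sketch of the intended call pattern, using hypothetical schemas and field names invented for the example:

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecordBuilder}
import scala.collection.JavaConverters._

// Hypothetical full and pruned schemas, for illustration only.
val fullSchema: Schema = SchemaBuilder.record("example").fields()
  .requiredString("_hoodie_record_key").requiredString("name").requiredInt("age")
  .endRecord()
val requiredSchema: Schema = SchemaBuilder.record("example").fields()
  .requiredString("_hoodie_record_key").requiredInt("age")
  .endRecord()

// Positions of the required fields inside the full schema, in required-schema order.
val requiredPos = requiredSchema.getFields.asScala
  .map(f => fullSchema.getField(f.name).pos()).toList

val fullRecord = new GenericData.Record(fullSchema)
fullRecord.put("_hoodie_record_key", "key1")
fullRecord.put("name", "alice")
fullRecord.put("age", 42)

// Returns a GenericRecord carrying only _hoodie_record_key and age.
val pruned = AvroConversionUtils.buildAvroRecordBySchema(
  fullRecord, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))
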
@@ -51,6 +51,14 @@ object DataSourceReadOptions {
  val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
  val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL

  /**
   * For snapshot queries on merge-on-read tables. Use this key to define how log file records
   * are merged with the base file: either returned as-is (skip_merge) or combined through the
   * record payload (payload_combine).
   */
  val REALTIME_MERGE_OPT_KEY = "hoodie.datasource.merge.type"
  val REALTIME_SKIP_MERGE_OPT_VAL = "skip_merge"
  val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
  val DEFAULT_REALTIME_MERGE_OPT_VAL = REALTIME_PAYLOAD_COMBINE_OPT_VAL

  @Deprecated
  val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
  @Deprecated

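These options choose how a merge-on-read snapshot read treats the log files. A hedged sketch, assuming an existing `spark` session and a placeholder `basePath`:

import org.apache.hudi.DataSourceReadOptions._

// skip_merge: base file rows and log rows are both returned, without merging.
val unmergedDF = spark.read.format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .option(REALTIME_MERGE_OPT_KEY, REALTIME_SKIP_MERGE_OPT_VAL)
  .load(basePath + "/*/*")

// payload_combine (the default): log records are merged into base rows by record key.
val mergedDF = spark.read.format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .option(REALTIME_MERGE_OPT_KEY, REALTIME_PAYLOAD_COMBINE_OPT_VAL)
  .load(basePath + "/*/*")
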
@@ -18,8 +18,9 @@
package org.apache.hudi

import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
@@ -60,26 +61,20 @@ class DefaultSource extends RelationProvider
      throw new HoodieException("'path' must be specified.")
    }

    val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration)
    val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs)
    val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
    if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
      // this is just effectively RO view only, where `path` can contain a mix of
      // non-hoodie/hoodie path files. set the path filter up
      sqlContext.sparkContext.hadoopConfiguration.setClass(
        "mapreduce.input.pathFilter.class",
        classOf[HoodieROTablePathFilter],
        classOf[org.apache.hadoop.fs.PathFilter])

      log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
      log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
        "Please query the Hive table registered using Spark SQL.")
      // simply return as a regular parquet relation
      DataSource.apply(
        sparkSession = sqlContext.sparkSession,
        userSpecifiedSchema = Option(schema),
        className = "parquet",
        options = parameters)
        .resolveRelation()
      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
        new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
      } else {
        getBaseFileOnlyView(sqlContext, parameters, schema)
      }
    } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
      getBaseFileOnlyView(sqlContext, parameters, schema)
    } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
      new IncrementalRelation(sqlContext, path.get, optParams, schema)
      new IncrementalRelation(sqlContext, tablePath, optParams, schema)
    } else {
      throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
    }

@@ -107,7 +102,7 @@ class DefaultSource extends RelationProvider
                              df: DataFrame): BaseRelation = {
    val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
    HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
    new HudiEmptyRelation(sqlContext, df.schema)
    new HoodieEmptyRelation(sqlContext, df.schema)
  }

  override def createSink(sqlContext: SQLContext,
@@ -123,4 +118,25 @@ class DefaultSource extends RelationProvider
  }

  override def shortName(): String = "hudi"

  private def getBaseFileOnlyView(sqlContext: SQLContext,
                                  optParams: Map[String, String],
                                  schema: StructType): BaseRelation = {
    log.warn("Loading Base File Only View.")
    // this is just effectively RO view only, where `path` can contain a mix of
    // non-hoodie/hoodie path files. set the path filter up
    sqlContext.sparkContext.hadoopConfiguration.setClass(
      "mapreduce.input.pathFilter.class",
      classOf[HoodieROTablePathFilter],
      classOf[org.apache.hadoop.fs.PathFilter])

    log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
    // simply return as a regular parquet relation
    DataSource.apply(
      sparkSession = sqlContext.sparkSession,
      userSpecifiedSchema = Option(schema),
      className = "parquet",
      options = optParams)
      .resolveRelation()
  }
}

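For comparison, a read-optimized query keeps going through the plain parquet relation built by getBaseFileOnlyView above, so only base files are read. A hedged sketch with a placeholder path:

import org.apache.hudi.DataSourceReadOptions._

val readOptimizedDF = spark.read.format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load("/tmp/hoodie_mor_table/*/*")
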
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType
 * @param sqlContext Spark SQL Context
 * @param userSchema Users data schema
 */
class HudiEmptyRelation(val sqlContext: SQLContext,
                        val userSchema: StructType) extends BaseRelation {
class HoodieEmptyRelation(val sqlContext: SQLContext,
                          val userSchema: StructType) extends BaseRelation {

  override def schema: StructType = userSchema
}

@@ -0,0 +1,274 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi

import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.vectorized.ColumnarBatch

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try

case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition

class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                           @transient config: Configuration,
                           fullSchemaFileReader: PartitionedFile => Iterator[Any],
                           requiredSchemaFileReader: PartitionedFile => Iterator[Any],
                           tableState: HoodieMergeOnReadTableState)
  extends RDD[InternalRow](sc, Nil) {

  private val confBroadcast = sc.broadcast(new SerializableWritable(config))

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
    mergeParquetPartition.split match {
      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
        read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
      case skipMergeSplit if skipMergeSplit.mergeType
        .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
        skipMergeFileIterator(
          skipMergeSplit,
          read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
          getConfig
        )
      case payloadCombineSplit if payloadCombineSplit.mergeType
        .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
        payloadCombineFileIterator(
          payloadCombineSplit,
          read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
          getConfig
        )
      case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
        s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
        s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
        s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
        s"spark partition Index: ${mergeParquetPartition.index}" +
        s"merge type: ${mergeParquetPartition.split.mergeType}")
    }
  }

  override protected def getPartitions: Array[Partition] = {
    tableState
      .hoodieRealtimeFileSplits
      .zipWithIndex
      .map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
  }

  private def getConfig: Configuration = {
    val conf = confBroadcast.value.value
    HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized {
      new Configuration(conf)
    }
  }

  private def read(partitionedFile: PartitionedFile,
                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
    val fileIterator = readFileFunction(partitionedFile)
    val rows = fileIterator.flatMap(_ match {
      case r: InternalRow => Seq(r)
      case b: ColumnarBatch => b.rowIterator().asScala
    })
    rows
  }

  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
                                    baseFileIterator: Iterator[InternalRow],
                                    config: Configuration): Iterator[InternalRow] =
    new Iterator[InternalRow] {
      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
      private val requiredFieldPosition =
        tableState.requiredStructSchema
          .map(f => tableAvroSchema.getField(f.name).pos()).toList
      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
      private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
      private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala

      private var recordToLoad: InternalRow = _

      @scala.annotation.tailrec
      override def hasNext: Boolean = {
        if (baseFileIterator.hasNext) {
          recordToLoad = baseFileIterator.next()
          true
        } else {
          if (logRecordsKeyIterator.hasNext) {
            val curAvrokey = logRecordsKeyIterator.next()
            val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema)
            if (!curAvroRecord.isPresent) {
              // delete record found, skipping
              this.hasNext
            } else {
              val requiredAvroRecord = AvroConversionUtils
                .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
              recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
              true
            }
          } else {
            false
          }
        }
      }

      override def next(): InternalRow = {
        recordToLoad
      }
    }

  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
                                         baseFileIterator: Iterator[InternalRow],
                                         config: Configuration): Iterator[InternalRow] =
    new Iterator[InternalRow] {
      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
      private val requiredFieldPosition =
        tableState.requiredStructSchema
          .map(f => tableAvroSchema.getField(f.name).pos()).toList
      private val serializer = new AvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
      private val requiredDeserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
      private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
      private val keyToSkip = mutable.Set.empty[String]

      private var recordToLoad: InternalRow = _

      @scala.annotation.tailrec
      override def hasNext: Boolean = {
        if (baseFileIterator.hasNext) {
          val curRow = baseFileIterator.next()
          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
          if (logRecords.containsKey(curKey)) {
            // duplicate key found, merging
            keyToSkip.add(curKey)
            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
            if (!mergedAvroRecord.isPresent) {
              // deleted
              this.hasNext
            } else {
              // load merged record as InternalRow with required schema
              val requiredAvroRecord = AvroConversionUtils
                .buildAvroRecordBySchema(
                  mergedAvroRecord.get(),
                  requiredAvroSchema,
                  requiredFieldPosition,
                  recordBuilder
                )
              recordToLoad = unsafeProjection(requiredDeserializer
                .deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
              true
            }
          } else {
            // No merge needed, load current row with required schema
            recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow))
            true
          }
        } else {
          if (logRecordsKeyIterator.hasNext) {
            val curKey = logRecordsKeyIterator.next()
            if (keyToSkip.contains(curKey)) {
              this.hasNext
            } else {
              val insertAvroRecord =
                logRecords.get(curKey).getData.getInsertValue(tableAvroSchema)
              if (!insertAvroRecord.isPresent) {
                // stand alone delete record, skipping
                this.hasNext
              } else {
                val requiredAvroRecord = AvroConversionUtils
                  .buildAvroRecordBySchema(
                    insertAvroRecord.get(),
                    requiredAvroSchema,
                    requiredFieldPosition,
                    recordBuilder
                  )
                recordToLoad = unsafeProjection(requiredDeserializer
                  .deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
                true
              }
            }
          } else {
            false
          }
        }
      }

      override def next(): InternalRow = recordToLoad

      private def createRowWithRequiredSchema(row: InternalRow): InternalRow = {
        val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
        val posIterator = requiredFieldPosition.iterator
        var curIndex = 0
        tableState.requiredStructSchema.foreach(
          f => {
            val curPos = posIterator.next()
            val curField = row.get(curPos, f.dataType)
            rowToReturn.update(curIndex, curField)
            curIndex = curIndex + 1
          }
        )
        rowToReturn
      }

      private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
        val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
        logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
      }
    }
}

private object HoodieMergeOnReadRDD {
  val CONFIG_INSTANTIATION_LOCK = new Object()

  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
    val fs = FSUtils.getFs(split.tablePath, config)
    new HoodieMergedLogRecordScanner(
      fs,
      split.tablePath,
      split.logPaths.get.asJava,
      logSchema,
      split.latestCommit,
      split.maxCompactionMemoryInBytes,
      Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
        HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean).getOrElse(false),
      false,
      config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
        HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
      config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
        HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
  }
}

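One detail worth noting about both iterators above: hasNext is stateful. It loads the next record into recordToLoad (recursing past delete records and already-merged keys), and next() only returns what was loaded. Callers should invoke hasNext exactly once before each next(), as in this sketch:

// iter: an Iterator[InternalRow] returned by skipMergeFileIterator or payloadCombineFileIterator.
while (iter.hasNext) {
  val row = iter.next()
  // consume row
}
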
@@ -27,9 +27,9 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConverters._


object HudiSparkUtils {
object HoodieSparkUtils {

  def getHudiMetadataSchema: StructType = {
  def getMetaSchema: StructType = {
    StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
      StructField(col, StringType, nullable = true)
    }))

@@ -17,14 +17,14 @@

package org.apache.hudi

import org.apache.hadoop.fs.GlobPattern
import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.table.HoodieTable

import org.apache.hadoop.fs.GlobPattern
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
@@ -47,7 +47,8 @@ class IncrementalRelation(val sqlContext: SQLContext,

  private val log = LogManager.getLogger(classOf[IncrementalRelation])

  private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
  private val metaClient =
    new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
  // MOR tables not supported yet
  if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
    throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")

@@ -0,0 +1,151 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi

import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile,
                                      logPaths: Option[List[String]],
                                      latestCommit: String,
                                      tablePath: String,
                                      maxCompactionMemoryInBytes: Long,
                                      mergeType: String)

case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
                                       requiredStructSchema: StructType,
                                       tableAvroSchema: String,
                                       requiredAvroSchema: String,
                                       hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit])

class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
                                  val optParams: Map[String, String],
                                  val userSchema: StructType,
                                  val globPaths: Seq[Path],
                                  val metaClient: HoodieTableMetaClient)
  extends BaseRelation with PrunedFilteredScan with Logging {

  private val conf = sqlContext.sparkContext.hadoopConfiguration
  private val jobConf = new JobConf(conf)
  // use schema from latest metadata, if not present, read schema from the data file
  private val schemaUtil = new TableSchemaResolver(metaClient)
  private val tableAvroSchema = schemaUtil.getTableAvroSchema
  private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
  private val mergeType = optParams.getOrElse(
    DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
    DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
  private val fileIndex = buildFileIndex()

  override def schema: StructType = tableStructSchema

  override def needConversion: Boolean = false

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
    log.debug(s" buildScan filters = ${filters.mkString(",")}")
    var requiredStructSchema = StructType(Seq())
    requiredColumns.foreach(col => {
      val field = tableStructSchema.find(_.name == col)
      if (field.isDefined) {
        requiredStructSchema = requiredStructSchema.add(field.get)
      }
    })
    val requiredAvroSchema = AvroConversionUtils
      .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
    val hoodieTableState = HoodieMergeOnReadTableState(
      tableStructSchema,
      requiredStructSchema,
      tableAvroSchema.toString,
      requiredAvroSchema.toString,
      fileIndex
    )
    val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = sqlContext.sparkSession,
      dataSchema = tableStructSchema,
      partitionSchema = StructType(Nil),
      requiredSchema = tableStructSchema,
      filters = Seq(),
      options = optParams,
      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
    )
    val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = sqlContext.sparkSession,
      dataSchema = tableStructSchema,
      partitionSchema = StructType(Nil),
      requiredSchema = requiredStructSchema,
      filters = filters,
      options = optParams,
      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
    )

    // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration.
    FileSystem.getLocal(jobConf)
    SparkHadoopUtil.get.addCredentials(jobConf)
    val rdd = new HoodieMergeOnReadRDD(
      sqlContext.sparkContext,
      jobConf,
      fullSchemaParquetReader,
      requiredSchemaParquetReader,
      hoodieTableState
    )
    rdd.asInstanceOf[RDD[Row]]
  }

  def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
    val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
    val fileStatuses = inMemoryFileIndex.allFiles()
    if (fileStatuses.isEmpty) {
      throw new HoodieException("No files found for reading in user provided path.")
    }

    val fsView = new HoodieTableFileSystemView(metaClient,
      metaClient.getActiveTimeline.getCommitsTimeline
        .filterCompletedInstants, fileStatuses.toArray)
    val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
    val latestCommit = fsView.getLastInstant.get().getTimestamp
    val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
    val fileSplits = fileGroup.map(kv => {
      val baseFile = kv._1
      val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
      val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
      HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
        metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
    }).toList
    fileSplits
  }
}

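Since MergeOnReadSnapshotRelation implements PrunedFilteredScan, Spark hands the selected columns and pushable filters to buildScan, which drives both the pruned parquet reader and the required Avro schema passed to HoodieMergeOnReadRDD. A hedged example of a query that would arrive roughly as requiredColumns = Array("_hoodie_record_key", "age") (path and column names are placeholders):

spark.read.format("hudi")
  .load("/tmp/hoodie_mor_table/*/*")
  .select("_hoodie_record_key", "age")
  .filter("age > 30")
  .show()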