1
0

[HUDI-3396] Refactoring MergeOnReadRDD to avoid duplication, fetch only projected columns (#4888)

This commit is contained in:
Alexey Kudinkin
2022-03-25 09:32:03 -07:00
committed by GitHub
parent 12cc8e715b
commit 51034fecf1
12 changed files with 454 additions and 427 deletions

View File

@@ -206,17 +206,6 @@ object AvroConversionUtils {
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}
def buildAvroRecordBySchema(record: IndexedRecord,
requiredSchema: Schema,
requiredPos: Seq[Int],
recordBuilder: GenericRecordBuilder): GenericRecord = {
val requiredFields = requiredSchema.getFields.asScala
assert(requiredFields.length == requiredPos.length)
val positionIterator = requiredPos.iterator
requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next())))
recordBuilder.build()
}
def getAvroRecordNameAndNamespace(tableName: String): (String, String) = {
val name = HoodieAvroUtils.sanitizeName(tableName)
(s"${name}_record", s"hoodie.${name}")

View File

@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
@@ -52,6 +52,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
override type FileSplit = HoodieBaseFileSplit
override lazy val mandatoryColumns: Seq[String] =
Seq(recordKeyField)
protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,

View File

@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{getPartitionPath, isMetadataTable}
import org.apache.hudi.HoodieBaseRelation.getPartitionPath
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.fs.FSUtils
@@ -32,8 +32,9 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -53,8 +54,12 @@ trait HoodieFileSplit {}
case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String)
case class HoodieTableState(recordKeyField: String,
preCombineFieldOpt: Option[String])
case class HoodieTableState(tablePath: String,
latestCommitTimestamp: String,
recordKeyField: String,
preCombineFieldOpt: Option[String],
usesVirtualKeys: Boolean,
recordPayloadClassName: String)
/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -78,13 +83,30 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected lazy val basePath: String = metaClient.getBasePath
// If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one
// NOTE: This is historical behavior which is preserved as is
// NOTE: Record key-field is assumed singular here due to the either of
// - In case Hudi's meta fields are enabled: record key will be pre-materialized (stored) as part
// of the record's payload (as part of the Hudi's metadata)
// - In case Hudi's meta fields are disabled (virtual keys): in that case record has to bear _single field_
// identified as its (unique) primary key w/in its payload (this is a limitation of [[SimpleKeyGenerator]],
// which is the only [[KeyGenerator]] permitted for virtual-keys payloads)
protected lazy val recordKeyField: String =
if (tableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD
else tableConfig.getRecordKeyFieldProp
if (tableConfig.populateMetaFields()) {
HoodieRecord.RECORD_KEY_METADATA_FIELD
} else {
val keyFields = tableConfig.getRecordKeyFields.get()
checkState(keyFields.length == 1)
keyFields.head
}
protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty
protected lazy val preCombineFieldOpt: Option[String] =
Option(tableConfig.getPreCombineField)
.orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
// NOTE: This is required to compensate for cases when empty string is used to stub
// property value to avoid it being set with the default value
// TODO(HUDI-3456) cleanup
case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
case _ => None
}
protected lazy val specifiedQueryTimestamp: Option[String] =
optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
@@ -118,16 +140,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
FileStatusCache.getOrCreate(sparkSession))
/**
* Columns that relation has to read from the storage to properly execute on its semantic: for ex,
* for Merge-on-Read tables key fields as well and pre-combine field comprise mandatory set of columns,
* meaning that regardless of whether this columns are being requested by the query they will be fetched
* regardless so that relation is able to combine records properly (if necessary)
*
* @VisibleInTests
*/
lazy val mandatoryColumns: Seq[String] = {
if (isMetadataTable(metaClient)) {
Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
} else {
// TODO this is MOR table requirement, not necessary for COW
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
}
val mandatoryColumns: Seq[String]
protected def timeline: HoodieTimeline =
// NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -136,9 +156,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def latestInstant: Option[HoodieInstant] =
toScalaOption(timeline.lastInstant())
protected def queryTimestamp: Option[String] = {
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(i => i.getTimestamp))
}
protected def queryTimestamp: Option[String] =
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
override def schema: StructType = tableStructSchema
@@ -257,15 +276,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
requestedColumns ++ missing
}
private def getPrecombineFieldProperty: Option[String] =
Option(tableConfig.getPreCombineField)
.orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
// NOTE: This is required to compensate for cases when empty string is used to stub
// property value to avoid it being set with the default value
// TODO(HUDI-3456) cleanup
case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
case _ => None
}
protected def getTableState: HoodieTableState = {
// Subset of the state of table's configuration as of at the time of the query
HoodieTableState(
tablePath = basePath,
latestCommitTimestamp = queryTimestamp.get,
recordKeyField = recordKeyField,
preCombineFieldOpt = preCombineFieldOpt,
usesVirtualKeys = !tableConfig.populateMetaFields(),
recordPayloadClassName = tableConfig.getPayloadClass
)
}
private def imbueConfigs(sqlContext: SQLContext): Unit = {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
@@ -280,9 +301,6 @@ object HoodieBaseRelation {
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
def isMetadataTable(metaClient: HoodieTableMetaClient): Boolean =
HoodieTableMetadata.isMetadataTable(metaClient.getBasePath)
/**
* Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]]
* over [[InternalRow]]

View File

@@ -65,28 +65,6 @@ object HoodieDataSourceHelper extends PredicateHelper {
}
}
/**
* Convert [[InternalRow]] to [[SpecificInternalRow]].
*/
def createInternalRowWithSchema(
row: InternalRow,
schema: StructType,
positions: Seq[Int]): InternalRow = {
val rowToReturn = new SpecificInternalRow(schema)
var curIndex = 0
schema.zip(positions).foreach { case (field, pos) =>
val curField = if (row.isNullAt(pos)) {
null
} else {
row.get(pos, field.dataType)
}
rowToReturn.update(curIndex, curField)
curIndex += 1
}
rowToReturn
}
def splitFiles(
sparkSession: SparkSession,
file: FileStatus,

View File

@@ -20,64 +20,15 @@ package org.apache.hudi
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit
/**
* TODO eval if we actually need it
*/
class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient fileSplits: Seq[HoodieBaseFileSplit])
extends HoodieUnsafeRDD(sparkSession.sparkContext) {
extends FileScanRDD(sparkSession, readFunction, fileSplits.map(_.filePartition))
with HoodieUnsafeRDD {
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[InternalRow] with AutoCloseable {
private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
private[this] var currentFile: PartitionedFile = _
private[this] var currentIterator: Iterator[InternalRow] = _
override def hasNext: Boolean = {
(currentIterator != null && currentIterator.hasNext) || nextIterator()
}
def next(): InternalRow = currentIterator.next()
/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
if (files.hasNext) {
currentFile = files.next()
logInfo(s"Reading File $currentFile")
currentIterator = readFunction(currentFile)
try {
hasNext
} catch {
case e: SchemaColumnConvertNotSupportedException =>
val message = "Parquet column cannot be converted in " +
s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
throw new QueryExecutionException(message, e)
case e => throw e
}
} else {
currentFile = null
false
}
}
override def close(): Unit = {}
}
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener[Unit](_ => iterator.close())
iterator.asInstanceOf[Iterator[InternalRow]]
}
override protected def getPartitions: Array[Partition] = fileSplits.map(_.filePartition).toArray
override final def collect(): Array[InternalRow] = super[HoodieUnsafeRDD].collect()
}

View File

@@ -22,29 +22,34 @@ import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieDataSourceHelper._
import org.apache.hudi.HoodieMergeOnReadRDD.resolveAvroSchemaNullability
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import java.io.Closeable
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try
case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
@@ -53,14 +58,16 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
fullSchemaFileReader: PartitionedFile => Iterator[InternalRow],
requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow],
tableState: HoodieTableState,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
@transient fileSplits: Seq[HoodieMergeOnReadFileSplit])
extends HoodieUnsafeRDD(sc) {
extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {
protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
private val confBroadcast = sc.broadcast(new SerializableWritable(config))
private val recordKeyField = tableState.recordKeyField
private val payloadProps = tableState.preCombineFieldOpt
.map(preCombineField =>
HoodiePayloadConfig.newBuilder
@@ -70,34 +77,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
)
.getOrElse(new Properties())
private val whitelistedPayloadClasses: Set[String] = Seq(
classOf[OverwriteWithLatestAvroPayload]
).map(_.getName).toSet
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
logFileIterator(logFileOnlySplit, getConfig)
case skipMergeSplit if skipMergeSplit.mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
skipMergeFileIterator(skipMergeSplit, requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig)
case payloadCombineSplit
if payloadCombineSplit.mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
payloadCombineFileIterator(payloadCombineSplit, fullSchemaFileReader(payloadCombineSplit.dataFile.get),
getConfig)
new LogFileIterator(logFileOnlySplit, getConfig)
case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
val baseFileIterator = requiredSchemaFileReader.apply(split.dataFile.get)
new SkipMergeIterator(split, baseFileIterator, getConfig)
case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
val (baseFileIterator, schema) = readBaseFile(split)
new RecordMergingFileIterator(split, baseFileIterator, schema, getConfig)
case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
s"log paths: ${mergeOnReadPartition.split.logFiles.toString}" +
s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
s"hoodie table path: ${tableState.tablePath}" +
s"spark partition Index: ${mergeOnReadPartition.index}" +
s"merge type: ${mergeOnReadPartition.split.mergeType}")
s"merge type: ${mergeType}")
}
if (iter.isInstanceOf[Closeable]) {
// register a callback to close logScanner which will be executed on task completion.
// when tasks finished, this method will be called, and release resources.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
}
iter
}
private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], HoodieTableSchema) = {
// NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum
// of the stored data possible, while still properly executing corresponding relation's semantic
// and meet the query's requirements.
//
// Here we assume that iff queried table
// a) It does use one of the standard (and whitelisted) Record Payload classes
// then we can avoid reading and parsing the records w/ _full_ schema, and instead only
// rely on projected one, nevertheless being able to perform merging correctly
if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
(fullSchemaFileReader(split.dataFile.get), tableSchema)
else
(requiredSchemaFileReader(split.dataFile.get), requiredSchema)
}
override protected def getPartitions: Array[Partition] =
fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
@@ -108,270 +140,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
private val requiredFieldPosition =
requiredSchema.structTypeSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
private val logRecords = logScanner.getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
/**
* Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in
* Delta Log files (represented as [[InternalRow]]s)
*/
private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration)
extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
private var recordToLoad: InternalRow = _
protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema
override def hasNext: Boolean = {
if (logRecordsKeyIterator.hasNext) {
val curAvrokey = logRecordsKeyIterator.next()
val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
if (!curAvroRecord.isPresent) {
// delete record found, skipping
this.hasNext
} else {
val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema,
requiredFieldPosition, recordBuilder)
val rowOpt = deserializer.deserialize(requiredAvroRecord)
recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
true
protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
protected var recordToLoad: InternalRow = _
// TODO validate whether we need to do UnsafeProjection
protected val unsafeProjection: UnsafeProjection = UnsafeProjection.create(requiredStructTypeSchema)
// NOTE: This maps _required_ schema fields onto the _full_ table schema, collecting their "ordinals"
// w/in the record payload. This is required, to project records read from the Delta Log file
// which always reads records in full schema (never projected, due to the fact that DL file might
// be stored in non-columnar formats like Avro, HFile, etc)
private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
private var logScanner =
HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
maxCompactionMemoryInBytes, config)
private val logRecords = logScanner.getRecords.asScala
// NOTE: This iterator iterates over already projected (in required schema) records
// NOTE: This have to stay lazy to make sure it's initialized only at the point where it's
// going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
logRecords.iterator.map {
case (_, record) =>
val avroRecordOpt = toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps))
avroRecordOpt.map {
avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder)
}
}
protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
logRecords.remove(key)
override def hasNext: Boolean =
logRecordsIterator.hasNext && {
val avroRecordOpt = logRecordsIterator.next()
if (avroRecordOpt.isEmpty) {
// Record has been deleted, skipping
this.hasNext
} else {
false
recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
true
}
}
override def next(): InternalRow = {
recordToLoad
}
override final def next(): InternalRow = recordToLoad
override def close(): Unit = {
if (logScanner != null) {
try {
logScanner.close()
} finally {
logScanner = null
}
override def close(): Unit =
if (logScanner != null) {
try {
logScanner.close()
} finally {
logScanner = null
}
}
}
/**
* Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
* Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not
* performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially)
*/
private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration)
extends LogFileIterator(split, config) {
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
val curRow = baseFileIterator.next()
recordToLoad = unsafeProjection(curRow)
true
} else {
super[LogFileIterator].hasNext
}
}
}
private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
private val requiredFieldPosition =
requiredSchema.structTypeSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
private val logRecords = logScanner.getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
/**
* Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
* a) Base file and all of the b) Delta Log files combining records with the same primary key from both of these
* streams
*/
private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
baseFileReaderSchema: HoodieTableSchema,
config: Configuration)
extends LogFileIterator(split, config) {
private var recordToLoad: InternalRow = _
// NOTE: Record-merging iterator supports 2 modes of operation merging records bearing either
// - Full table's schema
// - Projected schema
// As such, no particular schema could be assumed, and therefore we rely on the caller
// to correspondingly set the scheme of the expected output of base-file reader
private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
@scala.annotation.tailrec
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
val curRow = baseFileIterator.next()
recordToLoad = unsafeProjection(curRow)
private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema))
private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
val curRowRecord = baseFileIterator.next()
val curKey = curRowRecord.getString(recordKeyOrdinal)
val updatedRecordOpt = removeLogRecord(curKey)
if (updatedRecordOpt.isEmpty) {
// No merge needed, load current row with required projected schema
recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
true
} else {
if (logRecordsKeyIterator.hasNext) {
val curAvrokey = logRecordsKeyIterator.next()
val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
if (!curAvroRecord.isPresent) {
// delete record found, skipping
this.hasNext
} else {
val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema,
requiredFieldPosition, recordBuilder)
val rowOpt = deserializer.deserialize(requiredAvroRecord)
recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
true
}
val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
if (mergedAvroRecordOpt.isEmpty) {
// Record has been deleted, skipping
this.hasNext
} else {
false
}
}
}
override def next(): InternalRow = {
recordToLoad
}
override def close(): Unit = {
if (logScanner != null) {
try {
logScanner.close()
} finally {
logScanner = null
}
}
}
}
private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
private val requiredFieldPosition =
requiredSchema.structTypeSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val serializer = sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
resolveAvroSchemaNullability(tableAvroSchema))
private val requiredDeserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
private val logRecords = logScanner.getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
private val keyToSkip = mutable.Set.empty[String]
private val recordKeyPosition = tableSchema.structTypeSchema.fieldIndex(recordKeyField)
private var recordToLoad: InternalRow = _
@scala.annotation.tailrec
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
val curRow = baseFileIterator.next()
val curKey = curRow.getString(recordKeyPosition)
if (logRecords.containsKey(curKey)) {
// duplicate key found, merging
keyToSkip.add(curKey)
val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
if (!mergedAvroRecord.isPresent) {
// deleted
this.hasNext
} else {
// load merged record as InternalRow with required schema
val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), requiredAvroSchema,
requiredFieldPosition, recordBuilder)
val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
true
}
} else {
// No merge needed, load current row with required schema
recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema.structTypeSchema, requiredFieldPosition))
// NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c
// record from the Delta Log will bear (full) Table schema, while record from the Base file
// might already be read in projected one (as an optimization).
// As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback
// to [[projectAvro]]
val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder)
recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
true
}
} else {
if (logRecordsKeyIterator.hasNext) {
val curKey = logRecordsKeyIterator.next()
if (keyToSkip.contains(curKey)) {
this.hasNext
} else {
val insertAvroRecord = logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
if (!insertAvroRecord.isPresent) {
// stand alone delete record, skipping
this.hasNext
} else {
val requiredAvroRecord = AvroConversionUtils
.buildAvroRecordBySchema(
insertAvroRecord.get(),
requiredAvroSchema,
requiredFieldPosition,
recordBuilder
)
val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
true
}
}
} else {
false
}
}
}
override def next(): InternalRow = recordToLoad
override def close(): Unit = {
if (logScanner != null) {
try {
logScanner.close()
} finally {
logScanner = null
}
}
}
private def mergeRowWithLog(curRow: InternalRow, curKey: String) : org.apache.hudi.common.util.Option[IndexedRecord] = {
val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
val mergedRec = logRecords.get(curKey).getData
.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, payloadProps)
if (mergedRec.isPresent && mergedRec.get().getSchema != tableAvroSchema) {
org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord], tableAvroSchema).asInstanceOf[IndexedRecord])
} else {
mergedRec
}
} else {
super[LogFileIterator].hasNext
}
}
private def serialize(curRowRecord: InternalRow): GenericRecord =
serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
// NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
// on the record from the Delta Log
toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps))
}
}
}
private object HoodieMergeOnReadRDD {
val CONFIG_INSTANTIATION_LOCK = new Object()
def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
val fs = FSUtils.getFs(split.tablePath, config)
val logFiles = split.logFiles.get
def scanLog(logFiles: List[HoodieLogFile],
partitionPath: Path,
logSchema: Schema,
tableState: HoodieTableState,
maxCompactionMemoryInBytes: Long,
hadoopConf: Configuration): HoodieMergedLogRecordScanner = {
val tablePath = tableState.tablePath
val fs = FSUtils.getFs(tablePath, hadoopConf)
if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
if (HoodieTableMetadata.isMetadataTable(tablePath)) {
val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
val dataTableBasePath = getDataTableBasePathFromMetadataTable(split.tablePath)
val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
val metadataTable = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(config), metadataConfig,
new HoodieLocalEngineContext(hadoopConf), metadataConfig,
dataTableBasePath,
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
// NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
// of indirection among MT partitions)
val relativePartitionPath = getRelativePartitionPath(new Path(split.tablePath), getPartitionPath(split))
val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath)
metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath).getLeft
} else {
val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.tablePath)
.withLogFilePaths(split.logFiles.get.map(logFile => getFilePath(logFile.getPath)).asJava)
.withBasePath(tablePath)
.withLogFilePaths(logFiles.map(logFile => getFilePath(logFile.getPath)).asJava)
.withReaderSchema(logSchema)
.withLatestInstantTime(split.latestCommit)
.withLatestInstantTime(tableState.latestCommitTimestamp)
.withReadBlocksLazily(
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
.getOrElse(false))
.withReverseReader(false)
.withBufferSize(
config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
.withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
.withSpillableMapBasePath(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
if (logFiles.nonEmpty) {
logRecordScannerBuilder.withPartition(getRelativePartitionPath(new Path(split.tablePath), logFiles.head.getPath.getParent))
logRecordScannerBuilder.withPartition(
getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
}
logRecordScannerBuilder.build()
}
}
/**
* Projects provided instance of [[InternalRow]] into provided schema, assuming that the
* the schema of the original row is strictly a superset of the given one
*/
private def projectRowUnsafe(row: InternalRow,
projectedSchema: StructType,
ordinals: Seq[Int]): InternalRow = {
val projectedRow = new SpecificInternalRow(projectedSchema)
var curIndex = 0
projectedSchema.zip(ordinals).foreach { case (field, pos) =>
val curField = if (row.isNullAt(pos)) {
null
} else {
row.get(pos, field.dataType)
}
projectedRow.update(curIndex, curField)
curIndex += 1
}
projectedRow
}
/**
* Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the
* the schema of the original row is strictly a superset of the given one
*/
def projectAvroUnsafe(record: IndexedRecord,
projectedSchema: Schema,
ordinals: List[Int],
recordBuilder: GenericRecordBuilder): GenericRecord = {
val fields = projectedSchema.getFields.asScala
checkState(fields.length == ordinals.length)
fields.zip(ordinals).foreach {
case (field, pos) => recordBuilder.set(field, record.get(pos))
}
recordBuilder.build()
}
/**
* Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the
* the schema of the original row is strictly a superset of the given one
*
* This is a "safe" counterpart of [[projectAvroUnsafe]]: it does build mapping of the record's
* schema into projected one itself (instead of expecting such mapping from the caller)
*/
def projectAvro(record: IndexedRecord,
projectedSchema: Schema,
recordBuilder: GenericRecordBuilder): GenericRecord = {
projectAvroUnsafe(record, projectedSchema, collectFieldOrdinals(projectedSchema, record.getSchema), recordBuilder)
}
/**
* Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which
* will be subsequently used by either [[projectRowUnsafe]] or [[projectAvroUnsafe()]] method
*
* @param projected target projected schema (which is a proper subset of [[source]] [[Schema]])
* @param source source schema of the record being projected
* @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one
*/
private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = {
projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList
}
private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
// Determine partition path as an immediate parent folder of either
// - The base file
// - Some log file
split.dataFile.map(baseFile => new Path(baseFile.filePath))
.getOrElse(split.logFiles.get.head.getPath)
.getOrElse(split.logFiles.head.getPath)
.getParent
}
@@ -380,4 +426,17 @@ private object HoodieMergeOnReadRDD {
case (nullable, _) => nullable
}
}
trait AvroDeserializerSupport extends SparkAdapterSupport {
protected val requiredAvroSchema: Schema
protected val requiredStructTypeSchema: StructType
private lazy val deserializer: HoodieAvroDeserializer =
sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredStructTypeSchema)
protected def deserialize(avroRecord: GenericRecord): InternalRow = {
checkState(avroRecord.getSchema.getFields.size() == requiredStructTypeSchema.fields.length)
deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
}
}
}

View File

@@ -56,12 +56,8 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
* NOTE: It enforces, for ex, that all of the RDDs implement [[compute]] method returning
* [[InternalRow]] to avoid superfluous ser/de
*/
abstract class HoodieUnsafeRDD(@transient sc: SparkContext)
extends RDD[InternalRow](sc, Nil) {
def compute(split: Partition, context: TaskContext): Iterator[InternalRow]
override final def collect(): Array[InternalRow] =
trait HoodieUnsafeRDD extends RDD[InternalRow] {
override def collect(): Array[InternalRow] =
throw new UnsupportedOperationException(
"This method will not function correctly, please refer to scala-doc for HoodieUnsafeRDD"
)

View File

@@ -90,12 +90,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
hadoopConf = new Configuration(conf)
)
val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
val hoodieTableState = getTableState
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to perform filtering
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader,
requiredSchemaParquetReader, hoodieTableState, tableSchema, requiredSchema, fileSplits)
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
tableSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
}
override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {

View File

@@ -39,11 +39,7 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile],
logFiles: Option[List[HoodieLogFile]],
latestCommit: String,
tablePath: String,
maxCompactionMemoryInBytes: Long,
mergeType: String) extends HoodieFileSplit
logFiles: List[HoodieLogFile]) extends HoodieFileSplit
class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
optParams: Map[String, String],
@@ -54,13 +50,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
override type FileSplit = HoodieMergeOnReadFileSplit
private val mergeType = optParams.getOrElse(
DataSourceReadOptions.REALTIME_MERGE.key,
override lazy val mandatoryColumns: Seq[String] =
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
protected override def composeRDD(fileIndex: Seq[HoodieMergeOnReadFileSplit],
protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
@@ -93,10 +89,9 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
hadoopConf = new Configuration(conf)
)
val tableState = HoodieTableState(recordKeyField, preCombineFieldOpt)
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader,
requiredSchemaParquetReader, tableState, tableSchema, requiredSchema, fileIndex)
val tableState = getTableState
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
tableSchema, requiredSchema, tableState, mergeType, fileSplits)
}
protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -123,15 +118,14 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
protected def buildSplits(fileSlices: Seq[FileSlice]): List[HoodieMergeOnReadFileSplit] = {
fileSlices.map { fileSlice =>
val baseFile = toScalaOption(fileSlice.getBaseFile)
val logFiles = Option(fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList)
val logFiles = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
val partitionedBaseFile = baseFile.map { file =>
val filePath = getFilePath(file.getFileStatus.getPath)
PartitionedFile(InternalRow.empty, filePath, 0, file.getFileLen)
}
HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles, queryTimestamp.get,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
}.toList
}

View File

@@ -18,6 +18,7 @@
package org.apache.spark
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.MutablePair

View File

@@ -29,7 +29,7 @@ import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieMergeOnReadRDD}
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.types._

View File

@@ -18,22 +18,22 @@
package org.apache.hudi.functional
import org.apache.avro.Schema
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
import org.apache.spark.HoodieUnsafeRDDUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Row, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.{Dataset, Row, SaveMode}
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Tag, Test}
import scala.:+
import scala.collection.JavaConverters._
@Tag("functional")
@@ -67,14 +67,14 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
val projectedColumnsReadStats: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 2452),
("rider,driver", 2552),
("rider,driver,tip_history", 3517))
("rider", 2363),
("rider,driver", 2463),
("rider,driver,tip_history", 3428))
else if (HoodieSparkUtils.isSpark2)
Array(
("rider", 2595),
("rider,driver", 2735),
("rider,driver,tip_history", 3750))
("rider", 2474),
("rider,driver", 2614),
("rider,driver,tip_history", 3629))
else
fail("Only Spark 3 and Spark 2 are currently supported")
@@ -107,31 +107,30 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Stats for the reads fetching _all_ columns (note, how amount of bytes read
// is invariant of the # of columns)
val fullColumnsReadStats: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 14166),
("rider,driver", 14166),
("rider,driver,tip_history", 14166))
else if (HoodieSparkUtils.isSpark2)
// TODO re-enable tests (these tests are very unstable currently)
Array(
("rider", -1),
("rider,driver", -1),
("rider,driver,tip_history", -1))
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Test MOR / Snapshot / Skip-merge
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
// Test MOR / Snapshot / Payload-combine
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats)
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats)
// Stats for the reads fetching only _projected_ columns (note how amount of bytes read
// increases along w/ the # of columns) in Read Optimized mode (which is essentially equivalent to COW)
val projectedColumnsReadStatsReadOptimized: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 2363),
("rider,driver", 2463),
("rider,driver,tip_history", 3428))
else if (HoodieSparkUtils.isSpark2)
Array(
("rider", 2474),
("rider,driver", 2614),
("rider,driver,tip_history", 3629))
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Test MOR / Read Optimized
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats)
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
}
@Test
@@ -163,17 +162,76 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Stats for the reads fetching _all_ columns (currently for MOR to be able to merge
// records properly full row has to be fetched; note, how amount of bytes read
// Test MOR / Snapshot / Skip-merge
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
// Test MOR / Snapshot / Payload-combine
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats)
// Stats for the reads fetching only _projected_ columns (note how amount of bytes read
// increases along w/ the # of columns) in Read Optimized mode (which is essentially equivalent to COW)
val projectedColumnsReadStatsReadOptimized: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 2363),
("rider,driver", 2463),
("rider,driver,tip_history", 3428))
else if (HoodieSparkUtils.isSpark2)
Array(
("rider", 2474),
("rider,driver", 2614),
("rider,driver,tip_history", 3629))
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Test MOR / Read Optimized
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
}
@Test
def testMergeOnReadSnapshotRelationWithDeltaLogsFallback(): Unit = {
val tablePath = s"$basePath/mor-with-logs-fallback"
val targetRecordsCount = 100
val targetUpdatedRecordsRatio = 0.5
// NOTE: This test validates MOR Snapshot Relation falling back to read "whole" row from MOR table (as
// opposed to only required columns) in following cases
// - Non-standard Record Payload is used: such Payload might rely on the fields that are not
// being queried by the Spark, and we currently have no way figuring out what these fields are, therefore
// we fallback to read whole row
val overriddenOpts = defaultWriteOpts ++ Map(
HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
)
val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, overriddenOpts, populateMetaFields = true)
val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio)
// Stats for the reads fetching only _projected_ columns (note how amount of bytes read
// increases along w/ the # of columns)
val projectedColumnsReadStats: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 2452),
("rider,driver", 2552),
("rider,driver,tip_history", 3517))
else if (HoodieSparkUtils.isSpark2)
Array(
("rider", 2595),
("rider,driver", 2735),
("rider,driver,tip_history", 3750))
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Stats for the reads fetching _all_ columns (note, how amount of bytes read
// is invariant of the # of columns)
val fullColumnsReadStats: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 14166),
("rider,driver", 14166),
("rider,driver,tip_history", 14166))
("rider", 14167),
("rider,driver", 14167),
("rider,driver,tip_history", 14167))
else if (HoodieSparkUtils.isSpark2)
// TODO re-enable tests (these tests are very unstable currently)
// TODO re-enable tests (these tests are very unstable currently)
Array(
("rider", -1),
("rider,driver", -1),
@@ -184,11 +242,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
// Test MOR / Snapshot / Skip-merge
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
// Test MOR / Snapshot / Payload-combine
// Test MOR / Snapshot / Payload-combine (using non-standard Record Payload)
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats)
// Test MOR / Read Optimized
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats)
}
// TODO add test for incremental query of the table with logs
@@ -222,23 +277,6 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Stats for the reads fetching _all_ columns (note, how amount of bytes read
// is invariant of the # of columns)
val fullColumnsReadStats: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 19684),
("rider,driver", 19684),
("rider,driver,tip_history", 19684))
else if (HoodieSparkUtils.isSpark2)
// TODO re-enable tests (these tests are very unstable currently)
Array(
("rider", -1),
("rider,driver", -1),
("rider,driver,tip_history", -1))
else
fail("Only Spark 3 and Spark 2 are currently supported")
val incrementalOpts: Map[String, String] = Map(
DataSourceReadOptions.BEGIN_INSTANTTIME.key -> "001"
)
@@ -249,10 +287,9 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
// Test MOR / Incremental / Payload-combine
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL,
fullColumnsReadStats, incrementalOpts)
projectedColumnsReadStats, incrementalOpts)
}
// Test routine
private def runTest(tableState: TableState,
queryType: String,
@@ -322,6 +359,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
inputDF.write.format("org.apache.hudi")
.options(opts)
.option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
@@ -354,6 +392,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
inputDF.write.format("org.apache.hudi")
.options(opts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString)
.mode(SaveMode.Append)
.save(path)