[HUDI-3979] Optimize out mandatory columns when no merging is performed (#5430)
For MOR tables, when no merging is performed there is no point in reading either the primary-key or the pre-combine-key values (unless the query references them). Skipping these columns can save substantial resources that would otherwise be wasted reading them out.
This commit is contained in:
@@ -60,9 +60,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
|
||||
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
|
||||
internalSchemaOpt.isEmpty
|
||||
|
||||
override lazy val mandatoryFields: Seq[String] =
|
||||
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
|
||||
Seq(recordKeyField)
|
||||
override lazy val mandatoryFields: Seq[String] = Seq.empty
|
||||
|
||||
override def imbueConfigs(sqlContext: SQLContext): Unit = {
|
||||
super.imbueConfigs(sqlContext)
|
||||
@@ -73,6 +71,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
|
||||
partitionSchema: StructType,
|
||||
dataSchema: HoodieTableSchema,
|
||||
requiredSchema: HoodieTableSchema,
|
||||
requestedColumns: Array[String],
|
||||
filters: Array[Filter]): HoodieUnsafeRDD = {
|
||||
|
||||
val baseFileReader = createBaseFileReader(
|
||||
|
||||
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileStatus, Path}
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig
|
||||
import org.apache.hadoop.mapred.JobConf
|
||||
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
|
||||
import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
|
||||
import org.apache.hudi.HoodieConversionUtils.toScalaOption
|
||||
import org.apache.hudi.avro.HoodieAvroUtils
|
||||
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
|
||||
@@ -204,6 +204,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
||||
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
|
||||
}
|
||||
|
||||
/**
|
||||
* NOTE: These fields are accessed by the [[NestedSchemaPruning]] component, which is only enabled for
|
||||
* Spark >= 3.1
|
||||
*/
|
||||
lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
|
||||
metaClient.getTableConfig.getBaseFileFormat match {
|
||||
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
|
||||
@@ -258,12 +262,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
||||
*
|
||||
* Check scala-doc for [[shouldExtractPartitionValuesFromPartitionPath]] for more details
|
||||
*/
|
||||
def dataSchema: StructType =
|
||||
if (shouldExtractPartitionValuesFromPartitionPath) {
|
||||
prunePartitionColumns(tableStructSchema)
|
||||
} else {
|
||||
tableStructSchema
|
||||
}
|
||||
def dataSchema: StructType = if (shouldExtractPartitionValuesFromPartitionPath) {
|
||||
prunePartitionColumns(tableStructSchema)
|
||||
} else {
|
||||
tableStructSchema
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether relation's schema could be pruned by Spark's Optimizer
|
||||
@@ -346,7 +349,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
||||
if (fileSplits.isEmpty) {
|
||||
sparkSession.sparkContext.emptyRDD
|
||||
} else {
|
||||
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, filters)
|
||||
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, targetColumns, filters)
|
||||
|
||||
// NOTE: In case when partition columns have been pruned from the required schema, we have to project
|
||||
// the rows from the pruned schema back into the one expected by the caller
|
||||
@@ -369,17 +372,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
||||
/**
|
||||
* Composes an RDD from the provided file splits to read from, the table and partition schemas, and the data filters to be applied
|
||||
*
|
||||
* @param fileSplits file splits to be handled by the RDD
|
||||
* @param partitionSchema target table's partition schema
|
||||
* @param dataSchema target table's data files' schema
|
||||
* @param requiredSchema projected schema required by the reader
|
||||
* @param filters data filters to be applied
|
||||
* @param fileSplits file splits to be handled by the RDD
|
||||
* @param partitionSchema target table's partition schema
|
||||
* @param dataSchema target table's data files' schema
|
||||
* @param requiredSchema projected schema required by the reader
|
||||
* @param requestedColumns columns requested by the query
|
||||
* @param filters data filters to be applied
|
||||
* @return instance of RDD (implementing [[HoodieUnsafeRDD]])
|
||||
*/
|
||||
protected def composeRDD(fileSplits: Seq[FileSplit],
|
||||
partitionSchema: StructType,
|
||||
dataSchema: HoodieTableSchema,
|
||||
requiredSchema: HoodieTableSchema,
|
||||
requestedColumns: Array[String],
|
||||
filters: Array[Filter]): HoodieUnsafeRDD
|
||||
|
||||
/**
|
||||
@@ -551,37 +556,48 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
||||
requiredSchema: HoodieTableSchema,
|
||||
filters: Seq[Filter],
|
||||
options: Map[String, String],
|
||||
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
|
||||
val hfileReader = createHFileReader(
|
||||
spark = spark,
|
||||
dataSchema = dataSchema,
|
||||
requiredSchema = requiredSchema,
|
||||
filters = filters,
|
||||
options = options,
|
||||
hadoopConf = hadoopConf
|
||||
)
|
||||
hadoopConf: Configuration): BaseFileReader = {
|
||||
val tableBaseFileFormat = tableConfig.getBaseFileFormat
|
||||
|
||||
val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
|
||||
sparkSession = spark,
|
||||
dataSchema = dataSchema.structTypeSchema,
|
||||
partitionSchema = partitionSchema,
|
||||
requiredSchema = requiredSchema.structTypeSchema,
|
||||
filters = filters,
|
||||
options = options,
|
||||
hadoopConf = hadoopConf,
|
||||
// We're delegating to Spark to append partition values to every row only in cases
|
||||
// when these corresponding partition-values are not persisted w/in the data file itself
|
||||
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
|
||||
)
|
||||
// NOTE: PLEASE READ CAREFULLY
|
||||
// Lambda returned from this method is going to be invoked on the executor, and therefore
|
||||
// we have to eagerly initialize all of the readers even though only one specific to the type
|
||||
// of the file being read will be used. This is required to avoid serialization of the whole
|
||||
// relation (containing file-index for ex) and passing it to the executor
|
||||
val reader = tableBaseFileFormat match {
|
||||
case HoodieFileFormat.PARQUET =>
|
||||
HoodieDataSourceHelper.buildHoodieParquetReader(
|
||||
sparkSession = spark,
|
||||
dataSchema = dataSchema.structTypeSchema,
|
||||
partitionSchema = partitionSchema,
|
||||
requiredSchema = requiredSchema.structTypeSchema,
|
||||
filters = filters,
|
||||
options = options,
|
||||
hadoopConf = hadoopConf,
|
||||
// We're delegating to Spark to append partition values to every row only in cases
|
||||
// when these corresponding partition-values are not persisted w/in the data file itself
|
||||
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
|
||||
)
|
||||
|
||||
case HoodieFileFormat.HFILE =>
|
||||
createHFileReader(
|
||||
spark = spark,
|
||||
dataSchema = dataSchema,
|
||||
requiredSchema = requiredSchema,
|
||||
filters = filters,
|
||||
options = options,
|
||||
hadoopConf = hadoopConf
|
||||
)
|
||||
|
||||
case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)")
|
||||
}
|
||||
|
||||
partitionedFile => {
|
||||
val extension = FSUtils.getFileExtension(partitionedFile.filePath)
|
||||
if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
|
||||
parquetReader.apply(partitionedFile)
|
||||
} else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) {
|
||||
hfileReader.apply(partitionedFile)
|
||||
if (tableBaseFileFormat.getFileExtension.equals(extension)) {
|
||||
reader.apply(partitionedFile)
|
||||
} else {
|
||||
throw new UnsupportedOperationException(s"Base file format not supported by Spark DataSource ($partitionedFile)")
|
||||
throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -629,6 +645,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
||||
|
||||
object HoodieBaseRelation extends SparkAdapterSupport {
|
||||
|
||||
type BaseFileReader = PartitionedFile => Iterator[InternalRow]
|
||||
|
||||
private def generateUnsafeProjection(from: StructType, to: StructType) =
|
||||
sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to)
|
||||
|
||||
@@ -676,7 +694,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
|
||||
requiredSchema: HoodieTableSchema,
|
||||
filters: Seq[Filter],
|
||||
options: Map[String, String],
|
||||
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
|
||||
hadoopConf: Configuration): BaseFileReader = {
|
||||
val hadoopConfBroadcast =
|
||||
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hadoop.mapred.JobConf
|
||||
import org.apache.hudi.HoodieBaseRelation.BaseFileReader
|
||||
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
|
||||
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
|
||||
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
|
||||
@@ -55,11 +56,14 @@ import scala.util.Try
|
||||
|
||||
case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
|
||||
|
||||
case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: BaseFileReader,
|
||||
requiredSchemaFileReaderForMerging: BaseFileReader,
|
||||
requiredSchemaFileReaderForNoMerging: BaseFileReader)
|
||||
|
||||
class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
||||
@transient config: Configuration,
|
||||
fullSchemaFileReader: PartitionedFile => Iterator[InternalRow],
|
||||
requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow],
|
||||
tableSchema: HoodieTableSchema,
|
||||
fileReaders: HoodieMergeOnReadBaseFileReaders,
|
||||
dataSchema: HoodieTableSchema,
|
||||
requiredSchema: HoodieTableSchema,
|
||||
tableState: HoodieTableState,
|
||||
mergeType: String,
|
||||
@@ -86,13 +90,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
||||
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
|
||||
val iter = mergeOnReadPartition.split match {
|
||||
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
|
||||
requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
|
||||
fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get)
|
||||
|
||||
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
|
||||
new LogFileIterator(logFileOnlySplit, getConfig)
|
||||
|
||||
case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
|
||||
val baseFileIterator = requiredSchemaFileReader.apply(split.dataFile.get)
|
||||
val baseFileIterator = fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get)
|
||||
new SkipMergeIterator(split, baseFileIterator, getConfig)
|
||||
|
||||
case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
|
||||
@@ -126,9 +130,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
||||
// then we can avoid reading and parsing the records w/ _full_ schema, and instead only
|
||||
// rely on projected one, nevertheless being able to perform merging correctly
|
||||
if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
|
||||
(fullSchemaFileReader(split.dataFile.get), tableSchema)
|
||||
(fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema)
|
||||
else
|
||||
(requiredSchemaFileReader(split.dataFile.get), requiredSchema)
|
||||
(fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), requiredSchema)
|
||||
}
|
||||
|
||||
override protected def getPartitions: Array[Partition] =
|
||||
@@ -152,7 +156,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
||||
protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
|
||||
protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema
|
||||
|
||||
protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
|
||||
protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(dataSchema.avroSchemaStr)
|
||||
|
||||
protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
|
||||
protected var recordToLoad: InternalRow = _
|
||||
@@ -167,7 +171,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
||||
private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
|
||||
|
||||
private var logScanner = {
|
||||
val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
|
||||
val internalSchema = dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
|
||||
HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
|
||||
maxCompactionMemoryInBytes, config, internalSchema)
|
||||
}
|
||||
@@ -232,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
||||
override def hasNext: Boolean = {
|
||||
if (baseFileIterator.hasNext) {
|
||||
val curRow = baseFileIterator.next()
|
||||
recordToLoad = unsafeProjection(curRow)
|
||||
recordToLoad = curRow
|
||||
true
|
||||
} else {
|
||||
super[LogFileIterator].hasNext
|
||||
|
||||
@@ -61,6 +61,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
|
||||
partitionSchema: StructType,
|
||||
dataSchema: HoodieTableSchema,
|
||||
requiredSchema: HoodieTableSchema,
|
||||
requestedColumns: Array[String],
|
||||
filters: Array[Filter]): HoodieMergeOnReadRDD = {
|
||||
val fullSchemaParquetReader = createBaseFileReader(
|
||||
spark = sqlContext.sparkSession,
|
||||
@@ -81,23 +82,25 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
|
||||
hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
|
||||
)
|
||||
|
||||
val requiredSchemaParquetReader = createBaseFileReader(
|
||||
spark = sqlContext.sparkSession,
|
||||
partitionSchema = partitionSchema,
|
||||
dataSchema = dataSchema,
|
||||
requiredSchema = requiredSchema,
|
||||
filters = filters ++ incrementalSpanRecordFilters,
|
||||
options = optParams,
|
||||
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
|
||||
// to configure Parquet reader appropriately
|
||||
hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
|
||||
)
|
||||
val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) =
|
||||
createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)
|
||||
|
||||
val hoodieTableState = getTableState
|
||||
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
|
||||
// filtered, since the file-reader might not be capable of performing the filtering
|
||||
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
|
||||
dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
|
||||
new HoodieMergeOnReadRDD(
|
||||
sqlContext.sparkContext,
|
||||
config = jobConf,
|
||||
fileReaders = HoodieMergeOnReadBaseFileReaders(
|
||||
fullSchemaFileReader = fullSchemaParquetReader,
|
||||
requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging,
|
||||
requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging
|
||||
),
|
||||
dataSchema = dataSchema,
|
||||
requiredSchema = requiredSchema,
|
||||
tableState = hoodieTableState,
|
||||
mergeType = mergeType,
|
||||
fileSplits = fileSplits)
|
||||
}
|
||||
|
||||
override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
|
||||
|
||||
@@ -20,14 +20,17 @@ package org.apache.hudi
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema}
|
||||
import org.apache.hudi.HoodieConversionUtils.toScalaOption
|
||||
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
|
||||
import org.apache.hudi.avro.HoodieAvroUtils
|
||||
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
|
||||
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
|
||||
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.expressions.Expression
|
||||
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
||||
import org.apache.spark.sql.sources.Filter
|
||||
@@ -47,9 +50,27 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
|
||||
|
||||
override type FileSplit = HoodieMergeOnReadFileSplit
|
||||
|
||||
override lazy val mandatoryFields: Seq[String] =
|
||||
/**
|
||||
* NOTE: These are the fields that are required to properly fulfil Merge-on-Read (MOR)
|
||||
* semantics:
|
||||
*
|
||||
* <ol>
|
||||
* <li>Primary key is required to make sure we're able to correlate records from the base
|
||||
* file with the updated records from the delta-log file</li>
|
||||
* <li>Pre-combine key is required to properly perform the combining (or merging) of the
|
||||
* existing and updated records</li>
|
||||
* </ol>
|
||||
*
|
||||
* However, in cases when merging is NOT performed (for ex, if file-group only contains base
|
||||
* files but no delta-log files, or if the query-type is equal to [["skip_merge"]]) neither
|
||||
* the primary-key nor the pre-combine-key is required to be fetched from storage (unless requested
|
||||
* by the query), therefore saving on throughput
|
||||
*/
|
||||
protected lazy val mandatoryFieldsForMerging: Seq[String] =
|
||||
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
|
||||
|
||||
override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
|
||||
|
||||
protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
|
||||
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
|
||||
|
||||
@@ -62,8 +83,9 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
|
||||
partitionSchema: StructType,
|
||||
dataSchema: HoodieTableSchema,
|
||||
requiredSchema: HoodieTableSchema,
|
||||
requestedColumns: Array[String],
|
||||
filters: Array[Filter]): HoodieMergeOnReadRDD = {
|
||||
val fullSchemaParquetReader = createBaseFileReader(
|
||||
val fullSchemaBaseFileReader = createBaseFileReader(
|
||||
spark = sqlContext.sparkSession,
|
||||
partitionSchema = partitionSchema,
|
||||
dataSchema = dataSchema,
|
||||
@@ -79,21 +101,23 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
|
||||
hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
|
||||
)
|
||||
|
||||
val requiredSchemaParquetReader = createBaseFileReader(
|
||||
spark = sqlContext.sparkSession,
|
||||
partitionSchema = partitionSchema,
|
||||
dataSchema = dataSchema,
|
||||
requiredSchema = requiredSchema,
|
||||
filters = filters,
|
||||
options = optParams,
|
||||
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
|
||||
// to configure Parquet reader appropriately
|
||||
hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
|
||||
)
|
||||
val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) =
|
||||
createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters)
|
||||
|
||||
val tableState = getTableState
|
||||
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
|
||||
dataSchema, requiredSchema, tableState, mergeType, fileSplits)
|
||||
new HoodieMergeOnReadRDD(
|
||||
sqlContext.sparkContext,
|
||||
config = jobConf,
|
||||
fileReaders = HoodieMergeOnReadBaseFileReaders(
|
||||
fullSchemaFileReader = fullSchemaBaseFileReader,
|
||||
requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging,
|
||||
requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging
|
||||
),
|
||||
dataSchema = dataSchema,
|
||||
requiredSchema = requiredSchema,
|
||||
tableState = tableState,
|
||||
mergeType = mergeType,
|
||||
fileSplits = fileSplits)
|
||||
}
|
||||
|
||||
protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
|
||||
@@ -122,6 +146,61 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
|
||||
HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
|
||||
}.toList
|
||||
}
|
||||
|
||||
protected def createMergeOnReadBaseFileReaders(partitionSchema: StructType,
|
||||
dataSchema: HoodieTableSchema,
|
||||
requiredDataSchema: HoodieTableSchema,
|
||||
requestedColumns: Array[String],
|
||||
filters: Array[Filter]): (BaseFileReader, BaseFileReader) = {
|
||||
val requiredSchemaFileReaderMerging = createBaseFileReader(
|
||||
spark = sqlContext.sparkSession,
|
||||
partitionSchema = partitionSchema,
|
||||
dataSchema = dataSchema,
|
||||
requiredSchema = requiredDataSchema,
|
||||
filters = filters,
|
||||
options = optParams,
|
||||
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
|
||||
// to configure Parquet reader appropriately
|
||||
hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema)
|
||||
)
|
||||
|
||||
// Check whether fields required for merging were also requested to be fetched
|
||||
// by the query:
|
||||
// - In case they were, there's no optimization we could apply here (we will have
|
||||
// to fetch such fields)
|
||||
// - In case they were not, we will provide 2 separate file-readers
|
||||
// a) One which would be applied to file-groups w/ delta-logs (merging)
|
||||
// b) One which would be applied to file-groups w/ no delta-logs or
|
||||
// in case query-mode is skipping merging
|
||||
val requiredColumns = mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
|
||||
if (requiredColumns.forall(requestedColumns.contains)) {
|
||||
(requiredSchemaFileReaderMerging, requiredSchemaFileReaderMerging)
|
||||
} else {
|
||||
val prunedRequiredSchema = {
|
||||
val superfluousColumnNames = requiredColumns.filterNot(requestedColumns.contains)
|
||||
val prunedStructSchema =
|
||||
StructType(requiredDataSchema.structTypeSchema.fields
|
||||
.filterNot(f => superfluousColumnNames.contains(f.name)))
|
||||
|
||||
HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString)
|
||||
}
|
||||
|
||||
val requiredSchemaFileReaderNoMerging = createBaseFileReader(
|
||||
spark = sqlContext.sparkSession,
|
||||
partitionSchema = partitionSchema,
|
||||
dataSchema = dataSchema,
|
||||
requiredSchema = prunedRequiredSchema,
|
||||
filters = filters,
|
||||
options = optParams,
|
||||
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
|
||||
// to configure Parquet reader appropriately
|
||||
hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema)
|
||||
)
|
||||
|
||||
(requiredSchemaFileReaderMerging, requiredSchemaFileReaderNoMerging)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object MergeOnReadSnapshotRelation {
|
||||
|
||||
@@ -51,6 +51,4 @@ class TestHoodieRelations {
|
||||
requiredStructSchema.fields.toSeq
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
|
||||
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
|
||||
)
|
||||
|
||||
@Disabled("HUDI-3896")
|
||||
@Disabled("Currently disabled b/c of the fallback to HadoopFsRelation")
|
||||
@Test
|
||||
def testBaseFileOnlyViewRelation(): Unit = {
|
||||
val tablePath = s"$basePath/cow"
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.exception.HoodieDuplicateKeyException
|
||||
import org.apache.hudi.keygen.ComplexKeyGenerator
|
||||
import org.apache.spark.sql.SaveMode
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
import java.io.File
|
||||
|
||||
|
||||
Reference in New Issue
Block a user