1
0

[HUDI-3457] Refactored Spark DataSource Relations to avoid code duplication (#4877)

Refactoring Spark DataSource Relations to avoid code duplication. 

Following Relations were in scope:

- BaseFileOnlyViewRelation
- MergeOnReadSnapshotRelation
- MergeOnReadIncrementalRelation
This commit is contained in:
Alexey Kudinkin
2022-03-18 22:32:16 -07:00
committed by GitHub
parent 316e38c71e
commit 099c2c099a
22 changed files with 1051 additions and 572 deletions

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
object HoodieConversionUtils {

  /**
   * Converts a Scala [[Option]] into Hudi's Java-style [[org.apache.hudi.common.util.Option]]
   */
  def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
    opt match {
      case Some(value) => org.apache.hudi.common.util.Option.of(value)
      case None        => org.apache.hudi.common.util.Option.empty()
    }

  /**
   * Converts Hudi's Java-style [[org.apache.hudi.common.util.Option]] into a Scala [[Option]]
   */
  def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
    if (!opt.isPresent) None else Some(opt.get)
}

View File

@@ -118,11 +118,6 @@ object HoodieSparkUtils extends SparkAdapterSupport {
})
}
/**
 * Creates Spark's [[InMemoryFileIndex]] over the given (already globbed) paths, backed by
 * the session-scoped [[FileStatusCache]] so repeated listings of the same paths are avoided.
 */
def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
  new InMemoryFileIndex(sparkSession, globbedPaths, Map.empty, None, FileStatusCache.getOrCreate(sparkSession))
}
/**
* @deprecated please use other overload [[createRdd]]
*/

View File

@@ -198,7 +198,7 @@ public abstract class BaseHoodieTableFileIndex {
// that is under the pending compaction process, new log-file will bear the compaction's instant (on the
// timeline) in its name, as opposed to the base-file's commit instant. To make sure we're not filtering
// such log-file we have to _always_ include pending compaction instants into consideration
// TODO(HUDI-3302) re-evaluate whether we should not filter any commits in here
// TODO(HUDI-3302) re-evaluate whether we should filter any commits in here
HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
if (shouldIncludePendingCommits) {
return timeline;

View File

@@ -509,19 +509,16 @@ public class TableSchemaResolver {
* @return
*/
public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
HoodieDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
if (block instanceof HoodieDataBlock) {
lastBlock = (HoodieDataBlock) block;
try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null)) {
HoodieDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
if (block instanceof HoodieDataBlock) {
lastBlock = (HoodieDataBlock) block;
}
}
return lastBlock != null ? new AvroSchemaConverter().convert(lastBlock.getSchema()) : null;
}
reader.close();
if (lastBlock != null) {
return new AvroSchemaConverter().convert(lastBlock.getSchema());
}
return null;
}
public boolean isHasOperationField() {

View File

@@ -19,21 +19,11 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
@@ -41,14 +31,6 @@ import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.TypeUtils.unsafeCast;
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
@@ -67,41 +49,6 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
return false;
}
/**
 * For every latest file-slice in the given partitions, pairs the base file (if one exists)
 * with the log files belonging to the same file group.
 *
 * @param conf Hadoop configuration used to build the per-partition table meta-clients
 * @param partitionPaths partition paths to scan (deduplicated into a set internally)
 * @return list of (optional base file, log files sorted by {@link HoodieLogFile#getLogFileComparator()}) pairs
 */
public static List<Pair<Option<HoodieBaseFile>, List<HoodieLogFile>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) {
Set<Path> partitionSet = new HashSet<>(partitionPaths);
// TODO(vc): Should we handle also non-hoodie splits here?
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
// Get all the base file and its log files pairs in required partition paths.
List<Pair<Option<HoodieBaseFile>, List<HoodieLogFile>>> baseAndLogsList = new ArrayList<>();
partitionSet.forEach(partitionPath -> {
// for each partition path obtain the data & log file groupings, then map back to inputsplits
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
try {
// Both commit and delta-commits are included - pick the latest completed one
Option<HoodieInstant> latestCompletedInstant =
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
// No completed instant yet means there are no file slices to read for this partition
Stream<FileSlice> latestFileSlices = latestCompletedInstant
.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
.orElse(Stream.empty());
latestFileSlices.forEach(fileSlice -> {
List<HoodieLogFile> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
baseAndLogsList.add(Pair.of(fileSlice.getBaseFile(), logFilePaths));
});
} catch (Exception e) {
throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
}
});
return baseAndLogsList;
}
/**
* Add a field to the existing fields projected.
*/

View File

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
/**
* [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying
* modes:
* <ul>
* <li>For COW tables: Snapshot</li>
* <li>For MOR tables: Read-optimized</li>
* </ul>
*
* NOTE: The reason this Relation is used in lieu of Spark's default [[HadoopFsRelation]] is primarily due to the
* fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists
* as part of the record payload. In some cases, however, partition path might not necessarily be equal to the
* verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect
* partition field values being written
*/
class BaseFileOnlyRelation(sqlContext: SQLContext,
                           metaClient: HoodieTableMetaClient,
                           optParams: Map[String, String],
                           userSchema: Option[StructType],
                           globPaths: Seq[Path])
  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {

  override type FileSplit = HoodieBaseFileSplit

  protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
                                    partitionSchema: StructType,
                                    tableSchema: HoodieTableSchema,
                                    requiredSchema: HoodieTableSchema,
                                    filters: Array[Filter]): HoodieUnsafeRDD = {
    // NOTE: Spark mutates the Hadoop Config while configuring the Parquet reader,
    //       hence we hand it a private fork of it
    val forkedHadoopConf = new Configuration(conf)
    val reader = createBaseFileReader(
      spark = sparkSession,
      partitionSchema = partitionSchema,
      tableSchema = tableSchema,
      requiredSchema = requiredSchema,
      filters = filters,
      options = optParams,
      hadoopConf = forkedHadoopConf
    )
    new HoodieFileScanRDD(sparkSession, reader, fileSplits)
  }

  /**
   * Lists the latest base files matching the given filters, splits them and packs
   * the splits into [[HoodieBaseFileSplit]]s sized by Spark's max-partition-bytes setting.
   */
  protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
    val latestBaseFiles = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
    val splits = for {
      files <- latestBaseFiles.values.toSeq
      file <- files
      // TODO move to adapter
      // TODO fix, currently assuming parquet as underlying format
      split <- HoodieDataSourceHelper.splitFiles(
        sparkSession = sparkSession,
        file = file,
        // TODO clarify why this is required
        partitionValues = InternalRow.empty
      )
    } yield split
    val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
    sparkAdapter.getFilePartitions(sparkSession, splits, maxSplitBytes).map(HoodieBaseFileSplit.apply)
  }
}

View File

@@ -1,141 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
/**
* [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying
* modes:
* <ul>
* <li>For COW tables: Snapshot</li>
* <li>For MOR tables: Read-optimized</li>
* </ul>
*
* NOTE: The reason this Relation is used in lieu of Spark's default [[HadoopFsRelation]] is primarily due to the
* fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists
* as part of the record payload. In some cases, however, partition path might not necessarily be equal to the
* verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect
* partition field values being written
*/
class BaseFileOnlyViewRelation(sqlContext: SQLContext,
                               metaClient: HoodieTableMetaClient,
                               optParams: Map[String, String],
                               userSchema: Option[StructType],
                               globPaths: Seq[Path])
  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {

  // NOTE: [[HoodieFileIndex]] eagerly lists the table's files upon construction. Previously this
  //       field was built eagerly AND never used ([[getPartitions]] constructed a second, identical
  //       index), doubling the listing work. It's now lazy and re-used in [[getPartitions]].
  private lazy val fileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams,
    FileStatusCache.getOrCreate(sqlContext.sparkSession))

  /**
   * Builds an [[RDD]] of [[InternalRow]]s reading only the base files of the table.
   *
   * @param requiredColumns columns requested by the query (mandatory columns are appended)
   * @param filters data filters to be pushed down to the base-file reader
   */
  override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
    //       have to add it explicitly so that
    //          - Merging could be performed correctly
    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
    //          Spark still fetches all the rows to execute the query correctly
    //
    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
    //       filtered out upstream
    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
    val (requiredAvroSchema, requiredStructSchema) =
      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
    val filterExpressions = convertToExpressions(filters)
    val (partitionFilters, dataFilters) = HoodieCatalystExpressionUtils.splitPartitionAndDataPredicates(
      sparkSession, filterExpressions, partitionColumns)
    val filePartitions = getPartitions(partitionFilters, dataFilters)
    val partitionSchema = StructType(Nil)
    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
    val baseFileReader = createBaseFileReader(
      spark = sparkSession,
      partitionSchema = partitionSchema,
      tableSchema = tableSchema,
      requiredSchema = requiredSchema,
      filters = filters,
      options = optParams,
      // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
      //       to configure Parquet reader appropriately
      hadoopConf = new Configuration(conf)
    )
    new HoodieFileScanRDD(sparkSession, baseFileReader, filePartitions)
  }

  /**
   * Lists matching files (via the table's file index, or an in-memory index for glob paths),
   * splits them and packs the splits into [[FilePartition]]s.
   */
  private def getPartitions(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FilePartition] = {
    val partitionDirectories = if (globPaths.isEmpty) {
      // Re-use the (lazily built) table-wide file index instead of constructing a new one per scan
      fileIndex.listFiles(partitionFilters, dataFilters)
    } else {
      sqlContext.sparkContext.hadoopConfiguration.setClass(
        "mapreduce.input.pathFilter.class",
        classOf[HoodieROTablePathFilter],
        classOf[org.apache.hadoop.fs.PathFilter])
      val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths)
      inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
    }
    val partitions = partitionDirectories.flatMap { partition =>
      partition.files.flatMap { file =>
        // TODO move to adapter
        // TODO fix, currently assuming parquet as underlying format
        HoodieDataSourceHelper.splitFiles(
          sparkSession = sparkSession,
          file = file,
          // TODO clarify why this is required
          partitionValues = InternalRow.empty
        )
      }
    }
    val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
    sparkAdapter.getFilePartitions(sparkSession, partitions, maxSplitBytes)
  }

  /**
   * Converts pushed-down [[Filter]]s into Catalyst [[Expression]]s; filters that cannot be
   * converted are logged and dropped (they will still be applied upstream by Spark).
   */
  private def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
    val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
    if (failedExprs.nonEmpty) {
      val failedFilters = failedExprs.map(p => filters(p._2))
      logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
    }
    catalystExpressions.filter(_.isDefined).map(_.get).toArray
  }
}

View File

@@ -107,7 +107,7 @@ class DefaultSource extends RelationProvider
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
new BaseFileOnlyViewRelation(sqlContext, metaClient, parameters, userSchema, globPaths)
new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths)
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)

View File

@@ -20,22 +20,28 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.isMetadataTable
import org.apache.hudi.HoodieBaseRelation.{getPartitionPath, isMetadataTable}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
@@ -44,6 +50,8 @@ import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import scala.collection.JavaConverters._
import scala.util.Try
// Marker trait for the file-splits handled by Hudi relations; concrete relations refine
// their abstract `FileSplit` type member to a subtype of this trait
trait HoodieFileSplit {}
// Pairs a table schema's Catalyst ([[StructType]]) representation with its Avro form (as a string)
case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String)
case class HoodieTableState(recordKeyField: String,
@@ -53,36 +61,33 @@ case class HoodieTableState(recordKeyField: String,
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
*/
abstract class HoodieBaseRelation(val sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String],
userSchema: Option[StructType])
extends BaseRelation with PrunedFilteredScan with Logging {
type FileSplit <: HoodieFileSplit
imbueConfigs(sqlContext)
protected val sparkSession: SparkSession = sqlContext.sparkSession
protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
protected lazy val jobConf = new JobConf(conf)
protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
protected lazy val basePath: String = metaClient.getBasePath
// If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one
// NOTE: This is historical behavior which is preserved as is
protected lazy val recordKeyField: String =
if (metaClient.getTableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD
else metaClient.getTableConfig.getRecordKeyFieldProp
if (tableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD
else tableConfig.getRecordKeyFieldProp
protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty
/**
* @VisibleInTests
*/
lazy val mandatoryColumns: Seq[String] = {
if (isMetadataTable(metaClient)) {
Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
} else {
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
}
protected lazy val specifiedQueryInstant: Option[String] =
protected lazy val specifiedQueryTimestamp: Option[String] =
optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlCommonUtils.formatQueryInstant)
@@ -100,25 +105,49 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
protected val partitionColumns: Array[String] = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty)
protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
protected def getPrecombineFieldProperty: Option[String] =
Option(metaClient.getTableConfig.getPreCombineField)
.orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
// NOTE: This is required to compensate for cases when empty string is used to stub
// property value to avoid it being set with the default value
// TODO(HUDI-3456) cleanup
case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
case _ => None
/**
* NOTE: PLEASE READ THIS CAREFULLY
*
* Even though [[HoodieFileIndex]] initializes eagerly listing all of the files w/in the given Hudi table,
* this variable itself is _lazy_ (and have to stay that way) which guarantees that it's not initialized, until
* it's actually accessed
*/
protected lazy val fileIndex: HoodieFileIndex =
HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams,
FileStatusCache.getOrCreate(sparkSession))
/**
* @VisibleInTests
*/
lazy val mandatoryColumns: Seq[String] = {
if (isMetadataTable(metaClient)) {
Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
} else {
// TODO this is MOR table requirement, not necessary for COW
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
}
protected def timeline: HoodieTimeline =
// NOTE: We're including compaction here since it's not considering a "commit" operation
metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
protected def latestInstant: Option[HoodieInstant] =
toScalaOption(timeline.lastInstant())
protected def queryTimestamp: Option[String] = {
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(i => i.getTimestamp))
}
override def schema: StructType = tableStructSchema
/**
* This method controls whether relation will be producing
* <ul>
* <li>[[Row]], when it's being equal to true</li>
* <li>[[InternalRow]], when it's being equal to false</li>
* <li>[[Row]], when it's being equal to true</li>
* <li>[[InternalRow]], when it's being equal to false</li>
* </ul>
*
* Returning [[InternalRow]] directly enables us to save on needless ser/de loop from [[InternalRow]] (being
@@ -130,22 +159,129 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* NOTE: DO NOT OVERRIDE THIS METHOD
*/
override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// NOTE: In case list of requested columns doesn't contain the Primary Key one, we
// have to add it explicitly so that
// - Merging could be performed correctly
// - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
// Spark still fetches all the rows to execute the query correctly
//
// It's okay to return columns that have not been requested by the caller, as those nevertheless will be
// filtered out upstream
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val (requiredAvroSchema, requiredStructSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
val fileSplits = collectFileSplits(partitionFilters, dataFilters)
val partitionSchema = StructType(Nil)
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
// Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
if (fileSplits.nonEmpty)
composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
else
sparkSession.sparkContext.emptyRDD
}
protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
/**
* Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
*
* @param fileSplits file splits to be handled by the RDD
* @param partitionSchema target table's partition schema
* @param tableSchema target table's schema
* @param requiredSchema projected schema required by the reader
* @param filters data filters to be applied
* @return instance of RDD (implementing [[HoodieUnsafeRDD]])
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieUnsafeRDD
/**
* Provided with partition and date filters collects target file splits to read records from, while
* performing pruning if necessary
*
* @param partitionFilters partition filters to be applied
* @param dataFilters data filters to be applied
* @return list of [[FileSplit]] to fetch records from
*/
protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
val partitionDirs = if (globbedPaths.isEmpty) {
fileIndex.listFiles(partitionFilters, dataFilters)
} else {
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}
val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
latestBaseFiles.groupBy(getPartitionPath)
}
protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
if (failedExprs.nonEmpty) {
val failedFilters = failedExprs.map(p => filters(p._2))
logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
}
catalystExpressions.filter(_.isDefined).map(_.get).toArray
}
/**
* Checks whether given expression only references partition columns
* (and involves no sub-query)
*/
protected def isPartitionPredicate(condition: Expression): Boolean = {
// Validates that the provided names both resolve to the same entity
val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
!SubqueryExpression.hasSubquery(condition)
}
protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
requestedColumns ++ missing
}
private def getPrecombineFieldProperty: Option[String] =
Option(tableConfig.getPreCombineField)
.orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
// NOTE: This is required to compensate for cases when empty string is used to stub
// property value to avoid it being set with the default value
// TODO(HUDI-3456) cleanup
case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
case _ => None
}
private def imbueConfigs(sqlContext: SQLContext): Unit = {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
// TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
}
}
object HoodieBaseRelation {
def isMetadataTable(metaClient: HoodieTableMetaClient) =
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
def isMetadataTable(metaClient: HoodieTableMetaClient): Boolean =
HoodieTableMetadata.isMetadataTable(metaClient.getBasePath)
/**

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -157,7 +158,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
logInfo("Building file index..")
val fileStatuses = if (globPaths.nonEmpty) {
// Load files from the global paths if it has defined to be compatible with the original mode
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(_sqlContext.sparkSession, globPaths)
inMemoryFileIndex.allFiles()
} else { // Load files by the HoodieFileIndex.
HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams,

View File

@@ -65,20 +65,6 @@ object HoodieDataSourceHelper extends PredicateHelper {
}
}
/**
 * Projects every [[InternalRow]] of the iterator onto the required schema, returning
 * unsafe rows containing only the fields at the given positions
 */
def extractRequiredSchema(
    iter: Iterator[InternalRow],
    requiredSchema: StructType,
    requiredFieldPos: Seq[Int]): Iterator[InternalRow] = {
  // Projection is created once and re-used across all rows of the iterator
  val projection = UnsafeProjection.create(requiredSchema)
  iter.map(row => projection(createInternalRowWithSchema(row, requiredSchema, requiredFieldPos)))
}
/**
* Convert [[InternalRow]] to [[SpecificInternalRow]].
*/

View File

@@ -348,7 +348,7 @@ object HoodieFileIndex extends Logging {
}
} catch {
case NonFatal(e) =>
logWarning("Fail to convert filters for TimestampBaseAvroKeyGenerator.")
logWarning("Fail to convert filters for TimestampBaseAvroKeyGenerator", e)
partitionFilters
}
}

View File

@@ -24,12 +24,14 @@ import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
import org.apache.spark.{Partition, TaskContext}
case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit
/**
* TODO: evaluate whether this class is actually needed
*/
class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient fileSplits: Seq[FilePartition])
@transient fileSplits: Seq[HoodieBaseFileSplit])
extends HoodieUnsafeRDD(sparkSession.sparkContext) {
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
@@ -77,5 +79,5 @@ class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
iterator.asInstanceOf[Iterator[InternalRow]]
}
override protected def getPartitions: Array[Partition] = fileSplits.toArray
override protected def getPartitions: Array[Partition] = fileSplits.map(_.filePartition).toArray
}

View File

@@ -56,7 +56,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
tableState: HoodieTableState,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
@transient fileSplits: List[HoodieMergeOnReadFileSplit])
@transient fileSplits: Seq[HoodieMergeOnReadFileSplit])
extends HoodieUnsafeRDD(sc) {
private val confBroadcast = sc.broadcast(new SerializableWritable(config))

View File

@@ -20,65 +20,134 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits}
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.immutable
/**
* Experimental.
* Relation, that implements the Hoodie incremental view for Merge On Read table.
*
* @Experimental
*/
class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: Option[StructType],
val metaClient: HoodieTableMetaClient)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
optParams: Map[String, String],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient)
extends MergeOnReadSnapshotRelation(sqlContext, optParams, userSchema, Seq(), metaClient) with HoodieIncrementalRelationTrait {
private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
}
if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME.key)) {
throw new HoodieException(s"Specify the begin instant time to pull from using " +
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
}
if (!metaClient.getTableConfig.populateMetaFields()) {
throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
override type FileSplit = HoodieMergeOnReadFileSplit
/**
 * Span of the timeline being incrementally queried: instants falling into the
 * [BEGIN_INSTANTTIME, END_INSTANTTIME] range, w/ the end bound defaulting to the
 * last instant of the full timeline
 */
override protected def timeline: HoodieTimeline = {
// NOTE: Presence of BEGIN_INSTANTTIME as well as non-emptiness of the timeline are both
//       enforced by [[validate]], hence the unguarded `optParams(...)` and `lastInstant().get`
val startTimestamp = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val endTimestamp = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
}
private val lastInstant = commitTimeline.lastInstant().get()
private val mergeType = optParams.getOrElse(
DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = tableSchema,
// This file-reader is used to read base file records, subsequently merging them with the records
// stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
// applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that
// we combine them correctly)
//
// The only filtering applicable here is the filtering to make sure we're only fetching records that
// fall into incremental span of the timeline being queried
filters = incrementalSpanRecordFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = new Configuration(conf)
)
private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp))
logDebug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
val requiredSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
filters = filters ++ incrementalSpanRecordFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = new Configuration(conf)
)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to perform filtering
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader,
requiredSchemaParquetReader, hoodieTableState, tableSchema, requiredSchema, fileSplits)
}
/**
 * Collects file-splits touched by the commits w/in the incremental span of the timeline:
 * lists the files affected by the included commits, builds a file-system view on top of just
 * those files, and fetches the latest merged file-slices of every modified partition
 * (subsequently filtered by the user-provided path glob pattern, if any)
 *
 * NOTE: [[partitionFilters]]/[[dataFilters]] are not applied here — the scope of the listing
 *       is determined solely by the incremental span of the timeline
 */
override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
if (includedCommits.isEmpty) {
List()
} else {
val latestCommit = includedCommits.last.getTimestamp
val commitsMetadata = includedCommits.map(getCommitMetadata(_, timeline)).asJava
// Restrict the file-system view to only the files modified by the queried commits
val modifiedFiles = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, modifiedFiles)
val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
val fileSlices = modifiedPartitions.asScala.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}.toSeq
buildSplits(filterFileSlices(fileSlices, globPattern))
}
}
/**
 * Filters the provided file-slices against the user-specified path glob pattern; when the
 * pattern is null/empty, the file-slices are returned unchanged
 */
private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: String): Seq[FileSlice] = {
  if (StringUtils.isNullOrEmpty(pathGlobPattern)) {
    fileSlices
  } else {
    val matcher = new GlobPattern("*" + pathGlobPattern)
    fileSlices.filter { slice =>
      // NOTE(review): assumes every file-slice bears either a base file or at least one
      //               log file (`.get` throws otherwise) — same contract as before
      val slicePath = toScalaOption(slice.getBaseFile).map(_.getPath)
        .orElse(toScalaOption(slice.getLatestLogFile).map(_.getPath.toString))
        .get
      matcher.matches(slicePath)
    }
  }
}
}
trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
// Validate this Incremental implementation is properly configured
validate()
protected lazy val includedCommits: immutable.Seq[HoodieInstant] = timeline.getInstants.iterator().asScala.toList
// Record filters making sure that only records w/in the requested bounds are being fetched as part of the
// scan collected by this relation
private lazy val incrementalSpanRecordsFilters: Seq[Filter] = {
protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.last.getTimestamp)
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}
@@ -89,132 +158,23 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
if (fileIndex.isEmpty) {
sqlContext.sparkContext.emptyRDD[InternalRow]
} else {
logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
logDebug(s"buildScan filters = ${filters.mkString(",")}")
protected def validate(): Unit = {
if (super.timeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
}
// config to ensure the push down filter for parquet will be applied.
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
if (!this.optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME.key)) {
throw new HoodieException(s"Specify the begin instant time to pull from using " +
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
}
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val (requiredAvroSchema, requiredStructSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
val partitionSchema = StructType(Nil)
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = tableSchema,
// This file-reader is used to read base file records, subsequently merging them with the records
// stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
// applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that
// we combine them correctly)
//
// The only filtering applicable here is the filtering to make sure we're only fetching records that
// fall into incremental span of the timeline being queried
filters = incrementalSpanRecordsFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = new Configuration(conf)
)
val requiredSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
filters = filters ++ incrementalSpanRecordsFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = new Configuration(conf)
)
val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
// TODO implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to perform filtering
new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
jobConf,
fullSchemaParquetReader,
requiredSchemaParquetReader,
hoodieTableState,
tableSchema,
requiredSchema,
fileIndex
)
if (!this.tableConfig.populateMetaFields()) {
throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
}
}
def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
val metadataList = commitsToReturn.map(instant => getCommitMetadata(instant, commitsTimelineToReturn))
val affectedFileStatus = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), metadataList)
val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus)
/**
 * User-specified glob pattern used to constrain the set of files fetched by the incremental
 * query; sourced from [[DataSourceReadOptions.INCR_PATH_GLOB]], falling back to that option's
 * default value when unset
 */
protected def globPattern: String =
optParams.getOrElse(DataSourceReadOptions.INCR_PATH_GLOB.key, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
// Iterate partitions to create splits
val fileGroups = getWritePartitionPaths(metadataList).flatMap(partitionPath =>
fsView.getAllFileGroups(partitionPath).iterator()
).toList
val latestCommit = fsView.getLastInstant.get.getTimestamp
if (log.isDebugEnabled) {
fileGroups.foreach(f => logDebug(s"current file group id: " +
s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get.toString}"))
}
// Filter files based on user defined glob pattern
val pathGlobPattern = optParams.getOrElse(
DataSourceReadOptions.INCR_PATH_GLOB.key,
DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
val filteredFileGroup = if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
fileGroups.filter(fg => {
val latestFileSlice = fg.getLatestFileSlice.get
if (latestFileSlice.getBaseFile.isPresent) {
globMatcher.matches(latestFileSlice.getBaseFile.get.getPath)
} else {
globMatcher.matches(latestFileSlice.getLatestLogFile.get.getPath.toString)
}
})
} else {
fileGroups
}
// Build HoodieMergeOnReadFileSplit.
filteredFileGroup.map(f => {
// Ensure get the base file when there is a pending compaction, which means the base file
// won't be in the latest file slice.
val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList
val partitionedFile = if (baseFiles.nonEmpty) {
val baseFile = baseFiles.head.getBaseFile
val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath)
Option(PartitionedFile(InternalRow.empty, filePath, 0, baseFile.get.getFileLen))
}
else {
Option.empty
}
val logPath = if (f.getLatestFileSlice.isPresent) {
// If log path doesn't exist, we still include an empty path to avoid using
// the default parquet reader to ensure the push down filter will be applied.
Option(f.getLatestFileSlice.get().getLogFiles.iterator().toList)
}
else {
Option.empty
}
HoodieMergeOnReadFileSplit(partitionedFile, logPath,
latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
})
}
}

View File

@@ -21,16 +21,18 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.rdd.RDD
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -41,43 +43,28 @@ case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile],
latestCommit: String,
tablePath: String,
maxCompactionMemoryInBytes: Long,
mergeType: String)
mergeType: String) extends HoodieFileSplit
class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
optParams: Map[String, String],
val userSchema: Option[StructType],
val globPaths: Seq[Path],
val metaClient: HoodieTableMetaClient)
userSchema: Option[StructType],
globPaths: Seq[Path],
metaClient: HoodieTableMetaClient)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
override type FileSplit = HoodieMergeOnReadFileSplit
private val mergeType = optParams.getOrElse(
DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s" buildScan filters = ${filters.mkString(",")}")
// NOTE: In case list of requested columns doesn't contain the Primary Key one, we
// have to add it explicitly so that
// - Merging could be performed correctly
// - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
// Spark still fetches all the rows to execute the query correctly
//
// It's okay to return columns that have not been requested by the caller, as those nevertheless will be
// filtered out upstream
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val (requiredAvroSchema, requiredStructSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
val fileIndex = buildFileIndex(filters)
val partitionSchema = StructType(Nil)
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
protected override def composeRDD(fileIndex: Seq[HoodieMergeOnReadFileSplit],
partitionSchema: StructType,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
@@ -93,6 +80,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
// to configure Parquet reader appropriately
hadoopConf = new Configuration(conf)
)
val requiredSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
@@ -111,92 +99,55 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
requiredSchemaParquetReader, tableState, tableSchema, requiredSchema, fileIndex)
}
def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = {
if (globPaths.nonEmpty) {
// Load files from the global paths if it has defined to be compatible with the original mode
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
val fsView = new HoodieTableFileSystemView(metaClient,
// file-slice after pending compaction-requested instant-time is also considered valid
metaClient.getCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants,
inMemoryFileIndex.allFiles().toArray)
val partitionPaths = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent)
protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
val convertedPartitionFilters =
HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)
if (partitionPaths.isEmpty) { // If this an empty table, return an empty split list.
List.empty[HoodieMergeOnReadFileSplit]
} else {
val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant()
if (!lastInstant.isPresent) { // Return empty list if the table has no commit
List.empty
} else {
val queryInstant = specifiedQueryInstant.getOrElse(lastInstant.get().getTimestamp)
val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
val fileSplits = baseAndLogsList.map(kv => {
val baseFile = kv.getLeft
val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList)
val baseDataPath = if (baseFile.isPresent) {
Some(PartitionedFile(
InternalRow.empty,
MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath),
0, baseFile.get.getFileLen)
)
} else {
None
}
HoodieMergeOnReadFileSplit(baseDataPath, logPaths, queryInstant,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
}).toList
fileSplits
}
}
if (globPaths.isEmpty) {
val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters)
buildSplits(fileSlices.values.flatten.toSeq)
} else {
// Load files by the HoodieFileIndex.
val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
// Get partition filter and convert to catalyst expression
val partitionColumns = hoodieFileIndex.partitionSchema.fieldNames.toSet
val partitionFilters = filters.filter(f => f.references.forall(p => partitionColumns.contains(p)))
val partitionFilterExpression =
HoodieSparkUtils.convertToCatalystExpression(partitionFilters, tableStructSchema)
val convertedPartitionFilterExpression =
HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilterExpression.toSeq)
// If convert success to catalyst expression, use the partition prune
val fileSlices = if (convertedPartitionFilterExpression.nonEmpty) {
hoodieFileIndex.listFileSlices(convertedPartitionFilterExpression)
} else {
hoodieFileIndex.listFileSlices(Seq.empty[Expression])
}
if (fileSlices.isEmpty) {
// If this an empty table, return an empty split list.
// TODO refactor to avoid iterating over listed files multiple times
val partitions = listLatestBaseFiles(globPaths, convertedPartitionFilters, dataFilters)
val partitionPaths = partitions.keys.toSeq
if (partitionPaths.isEmpty || latestInstant.isEmpty) {
// If this an empty table OR it has no completed commits yet, return
List.empty[HoodieMergeOnReadFileSplit]
} else {
val fileSplits = fileSlices.values.flatten.map(fileSlice => {
val latestInstant = metaClient.getActiveTimeline.getCommitsTimeline
.filterCompletedInstants.lastInstant().get().getTimestamp
val queryInstant = specifiedQueryInstant.getOrElse(latestInstant)
val partitionedFile = if (fileSlice.getBaseFile.isPresent) {
val baseFile = fileSlice.getBaseFile.get()
val baseFilePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath)
Option(PartitionedFile(InternalRow.empty, baseFilePath, 0, baseFile.getFileLen))
} else {
Option.empty
}
val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths)
HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath,
maxCompactionMemoryInBytes, mergeType)
}).toList
fileSplits
val fileSlices = listFileSlices(partitionPaths)
buildSplits(fileSlices)
}
}
}
/**
 * Converts the provided file-slices into [[HoodieMergeOnReadFileSplit]]s: the base file (when
 * present) is wrapped into a [[PartitionedFile]], while log files are sorted w/ the canonical
 * log-file comparator
 */
protected def buildSplits(fileSlices: Seq[FileSlice]): List[HoodieMergeOnReadFileSplit] = {
fileSlices.map { fileSlice =>
val baseFile = toScalaOption(fileSlice.getBaseFile)
// NOTE: unlike the base file, the log-file list is always wrapped into a Some (even when empty)
val logFiles = Option(fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList)
val partitionedBaseFile = baseFile.map { file =>
val filePath = getFilePath(file.getFileStatus.getPath)
PartitionedFile(InternalRow.empty, filePath, 0, file.getFileLen)
}
// NOTE(review): `queryTimestamp.get` assumes the query timestamp is resolvable (ie the table
//               has at least one completed commit); callers are expected to guard against that
HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles, queryTimestamp.get,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
}.toList
}
/**
 * Lists the latest merged file-slices (base file merged w/ overlapping log files) for the
 * provided partition paths, as of the resolved query timestamp
 */
private def listFileSlices(partitionPaths: Seq[Path]): Seq[FileSlice] = {
// NOTE: It's critical for us to re-use [[InMemoryFileIndex]] to make sure we're leveraging
// [[FileStatusCache]] and avoid listing the whole table again
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, partitionPaths)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, inMemoryFileIndex.allFiles.toArray)
// NOTE(review): `.get` assumes the query timestamp is resolvable; the caller checks for the
//               presence of a completed commit before invoking this method
val queryTimestamp = this.queryTimestamp.get
partitionPaths.flatMap { partitionPath =>
// Partition paths are converted into the table-relative form expected by the file-system view
val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
}
}
}
object MergeOnReadSnapshotRelation {

View File

@@ -38,7 +38,6 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
import scala.language.implicitConversions
/**
* Implementation of the [[BaseHoodieTableFileIndex]] for Spark
@@ -135,12 +134,14 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
* Fetch list of latest base files w/ corresponding log files, after performing
* partition pruning
*
* TODO unify w/ HoodieFileIndex#listFiles
*
* @param partitionFilters partition column filters
* @return mapping from string partition paths to its base/log files
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet().asScala.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.path, cachedAllInputFileSlices.get(partition).asScala)
}).toMap

View File

@@ -0,0 +1,370 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.util.SerializableConfiguration
import java.io.FileNotFoundException
import scala.collection.mutable
/**
* NOTE: This class is a replica of HadoopFSUtils from Spark 3.2.1, with the following adjustments
*
* - Filtering out of the listed files is adjusted to include files starting w/ "." (to include Hoodie Delta Log
* files)
*/
object HoodieHadoopFSUtils extends Logging {
/**
 * Recursively lists the provided collection of paths, adaptively choosing between a purely
 * sequential, driver-side scan and a parallel, Spark-job based one, depending on how many
 * paths are supplied.
 *
 * This may only be called on the driver.
 *
 * @param sc                   Spark context used to schedule the parallel listing job
 * @param paths                input paths to be listed
 * @param hadoopConf           Hadoop configuration
 * @param filter               path filter used to exclude leaf files from the result
 * @param ignoreMissingFiles   whether files vanishing mid-listing (e.g., due to race
 *                             conditions) are tolerated
 * @param ignoreLocality       when true, data-locality info is not fetched, ie `FileStatus`
 *                             w/o `BlockLocation` info is returned
 * @param parallelismThreshold path-count at or below which listing falls back to a
 *                             sequential scan
 * @param parallelismMax       upper bound on the parallelism of the listing job (to avoid
 *                             generating too many tasks)
 * @return for each input path, the set of discovered files for the path
 */
def parallelListLeafFiles(sc: SparkContext,
                          paths: Seq[Path],
                          hadoopConf: Configuration,
                          filter: PathFilter,
                          ignoreMissingFiles: Boolean,
                          ignoreLocality: Boolean,
                          parallelismThreshold: Int,
                          parallelismMax: Int): Seq[(Path, Seq[FileStatus])] =
  parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
    ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality,
    parallelismThreshold = parallelismThreshold, parallelismMax = parallelismMax)
// scalastyle:off parameter.number
/**
 * Does the actual work of [[parallelListLeafFiles]], additionally threading the `isRootLevel`
 * flag through to [[listLeafFiles]]; short-circuits to a sequential, driver-side listing
 * whenever the number of paths does not exceed `parallelismThreshold`
 */
private def parallelListLeafFilesInternal(sc: SparkContext,
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
isRootLevel: Boolean,
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= parallelismThreshold) {
// scalastyle:off return
return paths.map { path =>
val leafFiles = listLeafFiles(
path,
hadoopConf,
filter,
Some(sc),
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax)
(path, leafFiles)
}
// scalastyle:on return
}
// Parallel branch: distribute the listing across the cluster as a dedicated Spark job
logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." +
s" The first several paths are: ${paths.take(10).mkString(", ")}.")
HiveCatalogMetrics.incrementParallelListingJobCount(1)
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
// NOTE: paths are shipped to the executors as plain strings and re-parsed into [[Path]]s
//       inside `mapPartitions` below
val serializedPaths = paths.map(_.toString)
// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelismMax)
// Temporarily override the job description for the duration of the listing job, restoring
// the previous one in the `finally` block below
val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val statusMap = try {
val description = paths.size match {
case 0 =>
"Listing leaf files and directories 0 paths"
case 1 =>
s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
case s =>
s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
sc.setJobDescription(description)
sc
.parallelize(serializedPaths, numParallelism)
.mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
pathStrings.map(new Path(_)).toSeq.map { path =>
val leafFiles = listLeafFiles(
path = path,
hadoopConf = hadoopConf,
filter = filter,
contextOpt = None, // Can't execute parallel scans on workers
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
parallelismThreshold = Int.MaxValue,
parallelismMax = 0)
(path, leafFiles)
}.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
val blockLocations = status match {
case f: LocatedFileStatus =>
f.getBlockLocations.map { loc =>
SerializableBlockLocation(
loc.getNames,
loc.getHosts,
loc.getOffset,
loc.getLength)
}
case _ =>
Array.empty[SerializableBlockLocation]
}
SerializableFileStatus(
status.getPath.toString,
status.getLen,
status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
status.getAccessTime,
blockLocations)
}
(path.toString, serializableStatuses)
}.collect()
} finally {
sc.setJobDescription(previousJobDescription)
}
// turn SerializableFileStatus back to Status
statusMap.map { case (path, serializableStatuses) =>
val statuses = serializableStatuses.map { f =>
val blockLocations = f.blockLocations.map { loc =>
new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
}
// NOTE: only length/dir-flag/replication/block-size/modification-time survive the
//       round-trip — access time is captured above but not restored here
new LocatedFileStatus(
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
new Path(f.path)),
blockLocations)
}
(new Path(path), statuses)
}
}
// scalastyle:on parameter.number
// scalastyle:off parameter.number
/**
 * Lists a single filesystem path recursively. If a `SparkContext` object is specified, this
 * function may launch Spark jobs to parallelize listing based on `parallelismThreshold`.
 *
 * If sessionOpt is None, this may be called on executors.
 *
 * @param path root path whose leaf files are to be listed
 * @param hadoopConf Hadoop configuration used to resolve the `FileSystem` for `path`
 * @param filter path filter applied to the collected leaf files (may be null, meaning "accept all")
 * @param contextOpt `SparkContext` used to parallelize listing of child directories; `None` when
 *                   this method runs on executors (parallel scans can't be launched from workers)
 * @param ignoreMissingFiles if true, files/dirs that disappear mid-listing are skipped instead of
 *                           failing the whole listing
 * @param ignoreLocality if true, block-location (locality) information is not fetched
 * @param isRootPath whether `path` is a root (top-level) path; root paths tolerate
 *                   `FileNotFoundException` (see the inline note below)
 * @param parallelismThreshold minimum number of child directories required before recursion is
 *                             parallelized via a Spark job
 * @param parallelismMax upper bound on the parallelism of a parallel listing
 * @return all children of path that match the specified filter.
 */
private def listLeafFiles(path: Path,
                          hadoopConf: Configuration,
                          filter: PathFilter,
                          contextOpt: Option[SparkContext],
                          ignoreMissingFiles: Boolean,
                          ignoreLocality: Boolean,
                          isRootPath: Boolean,
                          parallelismThreshold: Int,
                          parallelismMax: Int): Seq[FileStatus] = {
  logTrace(s"Listing $path")
  val fs = path.getFileSystem(hadoopConf)

  // Note that statuses only include FileStatus for the files and dirs directly under path,
  // and does not include anything else recursively.
  val statuses: Array[FileStatus] = try {
    fs match {
      // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
      // to retrieve the file status with the file block location. The reason to still fallback
      // to listStatus is because the default implementation would potentially throw a
      // FileNotFoundException which is better handled by doing the lookups manually below.
      case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
        val remoteIter = fs.listLocatedStatus(path)
        // Adapt Hadoop's RemoteIterator to a Scala Iterator so it can be drained into an Array
        new Iterator[LocatedFileStatus]() {
          def next(): LocatedFileStatus = remoteIter.next
          def hasNext(): Boolean = remoteIter.hasNext
        }.toArray
      case _ => fs.listStatus(path)
    }
  } catch {
    // If we are listing a root path for SQL (e.g. a top level directory of a table), we need to
    // ignore FileNotFoundExceptions during this root level of the listing because
    //
    //  (a) certain code paths might construct an InMemoryFileIndex with root paths that
    //      might not exist (i.e. not all callers are guaranteed to have checked
    //      path existence prior to constructing InMemoryFileIndex) and,
    //  (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break
    //      existing behavior and break the ability to drop SessionCatalog tables when tables'
    //      root directories have been deleted (which breaks a number of Spark's own tests).
    //
    // If we are NOT listing a root path then a FileNotFoundException here means that the
    // directory was present in a previous level of file listing but is absent in this
    // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3
    // list inconsistency).
    //
    // The trade-off in supporting existing behaviors / use-cases is that we won't be
    // able to detect race conditions involving root paths being deleted during
    // InMemoryFileIndex construction. However, it's still a net improvement to detect and
    // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion.
    case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
      logWarning(s"The directory $path was not found. Was it deleted very recently?")
      Array.empty[FileStatus]
  }

  // Drop entries that should never be surfaced (e.g. "_temporary", "*._COPYING_")
  val filteredStatuses =
    statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName))

  val allLeafStatuses = {
    val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
    // Recurse into sub-directories: in parallel (via a Spark job) when a driver-side context
    // is available and the dir count exceeds the threshold, sequentially otherwise
    val nestedFiles: Seq[FileStatus] = contextOpt match {
      case Some(context) if dirs.size > parallelismThreshold =>
        parallelListLeafFilesInternal(
          context,
          dirs.map(_.getPath),
          hadoopConf = hadoopConf,
          filter = filter,
          isRootLevel = false,
          ignoreMissingFiles = ignoreMissingFiles,
          ignoreLocality = ignoreLocality,
          parallelismThreshold = parallelismThreshold,
          parallelismMax = parallelismMax
        ).flatMap(_._2)
      case _ =>
        dirs.flatMap { dir =>
          listLeafFiles(
            path = dir.getPath,
            hadoopConf = hadoopConf,
            filter = filter,
            contextOpt = contextOpt,
            ignoreMissingFiles = ignoreMissingFiles,
            ignoreLocality = ignoreLocality,
            isRootPath = false,
            parallelismThreshold = parallelismThreshold,
            parallelismMax = parallelismMax)
        }
    }
    val allFiles = topLevelFiles ++ nestedFiles
    if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
  }

  // Files that vanished between listing and block-location lookup (only tracked when
  // ignoreMissingFiles is set)
  val missingFiles = mutable.ArrayBuffer.empty[String]
  val resolvedLeafStatuses = allLeafStatuses.flatMap {
    case f: LocatedFileStatus =>
      Some(f)

    // NOTE:
    //
    // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
    //   operations, calling `getFileBlockLocations` does no harm here since these file system
    //   implementations don't actually issue RPC for this method.
    //
    // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
    //   be a big deal since we always use `parallelListLeafFiles` when the number of
    //   paths exceeds threshold.
    case f if !ignoreLocality =>
      // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
      // which is very slow on some file systems (e.g. RawLocalFileSystem, which launches a
      // subprocess and parses the stdout).
      try {
        val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
          // Store BlockLocation objects to consume less memory
          if (loc.getClass == classOf[BlockLocation]) {
            loc
          } else {
            new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
          }
        }
        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
          f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
        if (f.isSymlink) {
          lfs.setSymlink(f.getSymlink)
        }
        Some(lfs)
      } catch {
        case _: FileNotFoundException if ignoreMissingFiles =>
          missingFiles += f.getPath.toString
          None
      }

    case f => Some(f)
  }

  if (missingFiles.nonEmpty) {
    logWarning(
      s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}")
  }

  resolvedLeafStatuses
}
// scalastyle:on parameter.number
/**
 * A serializable variant of HDFS's `BlockLocation`, used to ship block-location info
 * from executors back to the driver during parallel listing. This is required by Hadoop 2.7.
 */
private case class SerializableBlockLocation(names: Array[String],
                                             hosts: Array[String],
                                             offset: Long,
                                             length: Long)
/**
 * A serializable variant of HDFS's `FileStatus`, used to ship file metadata (path is carried
 * as a String) from executors back to the driver during parallel listing. This is required
 * by Hadoop 2.7.
 */
private case class SerializableFileStatus(path: String,
                                          length: Long,
                                          isDir: Boolean,
                                          blockReplication: Short,
                                          blockSize: Long,
                                          modificationTime: Long,
                                          accessTime: Long,
                                          blockLocations: Array[SerializableBlockLocation])
/**
 * Checks whether the given file/directory name should be excluded from listing results.
 *
 * Excluded are:
 *   1. Names starting with "_" (unless they contain "=", i.e. look like a partition column),
 *      except "_common_metadata" and "_metadata", which Parquet needs to discover from leaf files.
 *   2. Names ending with "._COPYING_", an intermediate state of a file copy, skipped to avoid
 *      double reading.
 *
 * NOTE: Unlike Spark's original implementation, names starting with "." are NOT filtered out here.
 */
def shouldFilterOutPathName(pathName: String): Boolean = {
  val isHiddenUnderscore = pathName.startsWith("_") && !pathName.contains("=")
  val isCopyInProgress = pathName.endsWith("._COPYING_")
  val isParquetMetadata =
    pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
  (isHiddenUnderscore || isCopyInProgress) && !isParquetMetadata
}
}

View File

@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.execution.datasources
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.HoodieHadoopFSUtils
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class HoodieInMemoryFileIndex(sparkSession: SparkSession,
                              rootPathsSpecified: Seq[Path],
                              parameters: Map[String, String],
                              userSpecifiedSchema: Option[StructType],
                              fileStatusCache: FileStatusCache = NoopCache)
  extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) {

  /**
   * Lists leaf files of the given paths, consulting the [[FileStatusCache]] first and listing
   * only the cache misses. A Spark job is submitted to do parallel listing whenever there is a
   * path having more files than the parallel partition discovery threshold.
   *
   * This is publicly visible for testing.
   *
   * NOTE: This method replicates the one it overrides, however it uses a custom method to run
   *       parallel listing that accepts files starting with "."
   */
  override def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
    val startNs = System.nanoTime()

    val leafFiles = mutable.LinkedHashSet[FileStatus]()
    val cacheMisses = mutable.ArrayBuffer[Path]()

    // Serve as many paths as possible from the file-status cache; remember the rest
    paths.foreach { path =>
      fileStatusCache.getLeafFiles(path) match {
        case Some(cached) =>
          HiveCatalogMetrics.incrementFileCacheHits(cached.length)
          leafFiles ++= cached
        case None =>
          cacheMisses += path
      }
    }

    val pathFilter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))

    // List the cache misses (possibly in parallel) and back-fill the cache
    bulkListLeafFiles(sparkSession, cacheMisses, pathFilter, hadoopConf).foreach {
      case (path, listed) =>
        HiveCatalogMetrics.incrementFilesDiscovered(listed.size)
        fileStatusCache.putLeafFiles(path, listed.toArray)
        leafFiles ++= listed
    }

    logInfo(s"It took ${(System.nanoTime() - startNs) / (1000 * 1000)} ms to list leaf files" +
      s" for ${paths.length} paths.")

    leafFiles
  }

  /**
   * Lists leaf files under every path in `paths` by delegating to
   * [[HoodieHadoopFSUtils.parallelListLeafFiles]] (which, unlike Spark's own utility, accepts
   * files starting with ".").
   */
  protected def bulkListLeafFiles(sparkSession: SparkSession,
                                  paths: ArrayBuffer[Path],
                                  filter: PathFilter,
                                  hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
    val sqlConf = sparkSession.sessionState.conf
    HoodieHadoopFSUtils.parallelListLeafFiles(
      sc = sparkSession.sparkContext,
      paths = paths,
      hadoopConf = hadoopConf,
      filter = new PathFilterWrapper(filter),
      ignoreMissingFiles = sqlConf.ignoreMissingFiles,
      // NOTE: We're disabling fetching Block Info to speed up file listing
      ignoreLocality = true,
      parallelismThreshold = sqlConf.parallelPartitionDiscoveryThreshold,
      parallelismMax = sqlConf.parallelPartitionDiscoveryParallelism)
  }
}
object HoodieInMemoryFileIndex {

  /**
   * Creates a [[HoodieInMemoryFileIndex]] over the given (already-globbed) paths, backed by
   * the session-wide file-status cache.
   */
  def create(sparkSession: SparkSession, globbedPaths: Seq[Path]): HoodieInMemoryFileIndex = {
    new HoodieInMemoryFileIndex(
      sparkSession = sparkSession,
      rootPathsSpecified = globbedPaths,
      parameters = Map.empty,
      userSpecifiedSchema = Option.empty,
      fileStatusCache = FileStatusCache.getOrCreate(sparkSession))
  }
}
/**
 * Serializable [[PathFilter]] combining an (optional, possibly null) wrapped filter with
 * Hudi's name-based exclusion rules, so it can be shipped to executors for parallel listing.
 */
private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
  override def accept(path: Path): Boolean = {
    val acceptedByWrapped = filter == null || filter.accept(path)
    acceptedByWrapped && !HoodieHadoopFSUtils.shouldFilterOutPathName(path.getName)
  }
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.execution.datasources
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
import java.io.File
import java.nio.file.Paths
class TestHoodieInMemoryFileIndex {

  /**
   * Verifies that a [[HoodieInMemoryFileIndex]] built over a set of folders lists exactly
   * the files they contain.
   */
  @Test
  def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = {
    val spark = SparkSession.builder
      .appName("Hoodie Datasource test")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate
    try {
      // Lay out 2 folders with 2 files each under the temp dir
      val folders: Seq[Path] = Seq(
        new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
        new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri)
      )

      val files: Seq[Path] = Seq(
        new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
        new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
        new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
        new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri)
      )

      folders.foreach(folder => new File(folder.toUri).mkdir())
      files.foreach(file => new File(file.toUri).createNewFile())

      val index = HoodieInMemoryFileIndex.create(spark, Seq(folders(0), folders(1)))
      val indexedFilePaths = index.allFiles().map(fs => fs.getPath)

      // Compare listings order-insensitively
      assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString))
    } finally {
      // FIX: previously spark.stop() was unreachable when the assertion failed, leaking the
      // SparkContext into subsequent tests running in the same JVM
      spark.stop()
    }
  }
}

View File

@@ -88,35 +88,6 @@ class TestHoodieSparkUtils {
.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
}
@Test
def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = {
val spark = SparkSession.builder
.appName("Hoodie Datasource test")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate
val folders: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri)
)
val files: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri)
)
folders.foreach(folder => new File(folder.toUri).mkdir())
files.foreach(file => new File(file.toUri).createNewFile())
val index = HoodieSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1)))
val indexedFilePaths = index.allFiles().map(fs => fs.getPath)
assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString))
spark.stop()
}
@Test
def testCreateRddSchemaEvol(): Unit = {
val spark = SparkSession.builder

View File

@@ -112,9 +112,9 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
val fullColumnsReadStats: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 14665),
("rider,driver", 14665),
("rider,driver,tip_history", 14665))
("rider", 14166),
("rider,driver", 14166),
("rider,driver,tip_history", 14166))
else if (HoodieSparkUtils.isSpark2)
// TODO re-enable tests (these tests are very unstable currently)
Array(
@@ -163,11 +163,29 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Stats for the reads fetching _all_ columns (currently for MOR to be able to merge
// records properly full row has to be fetched; note, how amount of bytes read
// is invariant of the # of columns)
val fullColumnsReadStats: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 14166),
("rider,driver", 14166),
("rider,driver,tip_history", 14166))
else if (HoodieSparkUtils.isSpark2)
// TODO re-enable tests (these tests are very unstable currently)
Array(
("rider", -1),
("rider,driver", -1),
("rider,driver,tip_history", -1))
else
fail("Only Spark 3 and Spark 2 are currently supported")
// Test MOR / Snapshot / Skip-merge
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
// Test MOR / Snapshot / Payload-combine
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats)
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats)
// Test MOR / Read Optimized
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats)
@@ -209,9 +227,9 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
val fullColumnsReadStats: Array[(String, Long)] =
if (HoodieSparkUtils.isSpark3)
Array(
("rider", 19683),
("rider,driver", 19683),
("rider,driver,tip_history", 19683))
("rider", 19684),
("rider,driver", 19684),
("rider,driver,tip_history", 19684))
else if (HoodieSparkUtils.isSpark2)
// TODO re-enable tests (these tests are very unstable currently)
Array(