1
0

[HUDI-4176] Fixing TableSchemaResolver to avoid repeated HoodieCommitMetadata parsing (#5733)

As has been outlined in HUDI-4176, we've hit a roadblock while testing Hudi on a large dataset (~1Tb) having pretty fat commits where Hudi's commit metadata could reach into 100s of Mbs.
Given the size some of ours commit metadata instances Spark's parsing and resolving phase (when spark.sql(...) is involved, but before returned Dataset is dereferenced) starts to dominate some of our queries' execution time.

- Rebased onto new APIs to avoid excessive Hadoop's Path allocations
- Eliminated hasOperationField completely to avoid repeatitive computations
- Cleaning up duplication in HoodieActiveTimeline
- Added caching for common instances of HoodieCommitMetadata
- Made tableStructSchema lazy;
This commit is contained in:
Alexey Kudinkin
2022-06-06 10:14:26 -07:00
committed by GitHub
parent 132c0aa8c7
commit 4f7ea8c79a
14 changed files with 318 additions and 326 deletions

View File

@@ -213,27 +213,18 @@ class DefaultSource extends RelationProvider
globPaths: Seq[Path],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
optParams: Map[String, String]) = {
optParams: Map[String, String]): BaseRelation = {
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
val enableSchemaOnRead: Boolean = !tryFetchInternalSchema(metaClient).isEmptySchema
// NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of
// [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
// vanilla Spark, since some of the Spark optimizations are predicated on the using of [[HadoopFsRelation]].
//
// You can check out HUDI-3896 for more details
if (enableSchemaOnRead) {
if (baseRelation.hasSchemaOnRead) {
baseRelation
} else {
baseRelation.toHadoopFsRelation
}
}
private def tryFetchInternalSchema(metaClient: HoodieTableMetaClient) =
try {
new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata
.orElse(InternalSchema.getEmptyInternalSchema)
} catch {
case _: Exception => InternalSchema.getEmptyInternalSchema
}
}

View File

@@ -122,9 +122,13 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlCommonUtils.formatQueryInstant)
/**
* NOTE: Initialization of teh following members is coupled on purpose to minimize amount of I/O
* required to fetch table's Avro and Internal schemas
*/
protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
val schemaUtil = new TableSchemaResolver(metaClient)
val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
val schemaResolver = new TableSchemaResolver(metaClient)
val avroSchema = Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logWarning("Failed to fetch schema from the table", e)
@@ -137,14 +141,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
// try to find internalSchema
val internalSchemaFromMeta = try {
schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
} catch {
case _: Exception => InternalSchema.getEmptyInternalSchema
}
(avroSchema, internalSchemaFromMeta)
}
protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
@@ -196,7 +200,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* meaning that regardless of whether this columns are being requested by the query they will be fetched
* regardless so that relation is able to combine records properly (if necessary)
*
* @VisibleInTests
* @VisibleForTesting
*/
val mandatoryFields: Seq[String]
@@ -215,6 +219,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def queryTimestamp: Option[String] =
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
/**
* Returns true in case table supports Schema on Read (Schema Evolution)
*/
def hasSchemaOnRead: Boolean = !internalSchema.isEmptySchema
override def schema: StructType = tableStructSchema
/**

View File

@@ -58,9 +58,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val log = LogManager.getLogger(classOf[IncrementalRelation])
val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
private val basePath = metaClient.getBasePath
private val basePath = metaClient.getBasePathV2
// TODO : Figure out a valid HoodieWriteConfig
private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(),
private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath.toString).build(),
new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
metaClient)
private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
@@ -98,7 +98,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
val tableSchema = if (useEndInstantSchema && iSchema.isEmptySchema) {
if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)
schemaResolver.getTableAvroSchema(commitsToReturn.last, false)
} else {
schemaResolver.getTableAvroSchemaWithoutMetadataFields()
}
@@ -202,7 +202,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
var doFullTableScan = false
if (fallbackToFullTableScan) {
val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
val timer = new HoodieTimer().startTimer();
val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
@@ -223,7 +223,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
val hudiDF = sqlContext.read
.format("hudi_v1")
.schema(usedSchema)
.load(basePath)
.load(basePath.toString)
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,