From 3b401d839c1a3a5a8060f671b13df71b76bf88ac Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Tue, 15 Feb 2022 09:38:01 +0800 Subject: [PATCH] [HUDI-3200] deprecate hoodie.file.index.enable and unify to use BaseFileOnlyViewRelation to handle (#4798) --- .../common/table/TableSchemaResolver.java | 16 +++-- .../table/timeline/HoodieActiveTimeline.java | 21 ++++++ .../hudi/BaseFileOnlyViewRelation.scala | 69 +++++++++++-------- .../org/apache/hudi/DataSourceOptions.scala | 1 + .../scala/org/apache/hudi/DefaultSource.scala | 69 ++----------------- .../apache/hudi/HoodieBootstrapRelation.scala | 6 +- .../hudi/MergeOnReadSnapshotRelation.scala | 6 +- .../hudi/functional/TestCOWDataSource.scala | 2 +- 8 files changed, 87 insertions(+), 103 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index e1c1b947c..3ca2122aa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -222,14 +222,21 @@ public class TableSchemaResolver { } /** - * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit. + * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit with valid schema. * * @return Avro schema for this table */ private Option getTableSchemaFromCommitMetadata(boolean includeMetadataFields) { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - if (timeline.lastInstant().isPresent()) { - return getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), includeMetadataFields); + Option> instantAndCommitMetadata = + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(); + if (instantAndCommitMetadata.isPresent()) { + HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); + String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); + Schema schema = new Schema.Parser().parse(schemaStr); + if (includeMetadataFields) { + schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField); + } + return Option.of(schema); } else { return Option.empty(); } @@ -519,7 +526,6 @@ public class TableSchemaResolver { Schema tableAvroSchema = getTableAvroSchemaFromDataFile(); return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null; } catch (Exception e) { - LOG.warn("Failed to read operation field from avro schema", e); return false; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 1fa3845bf..36dd5368d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; @@ -259,6 +260,26 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return readDataFromPath(detailPath); } + /** + * Get the last instant with valid schema, and convert this to HoodieCommitMetadata + */ + public Option> getLastCommitMetadataWithValidSchema() { + List completed = getCommitsTimeline().filterCompletedInstants().getInstants() + .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList()); + for (HoodieInstant instant : completed) { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + getInstantDetails(instant).get(), HoodieCommitMetadata.class); + if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) { + return Option.of(Pair.of(instant, commitMetadata)); + } + } catch (IOException e) { + LOG.warn("Failed to convert instant to HoodieCommitMetadata: " + instant.toString()); + } + } + return Option.empty(); + } + /** * Get the last instant with valid data, and convert this to HoodieCommitMetadata */ diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala index 7c3f99c9e..8e9480532 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala @@ -18,21 +18,19 @@ package org.apache.hudi +import org.apache.hadoop.fs.Path + import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.TableSchemaResolver +import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.rdd.RDD -import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.execution.datasources.{PartitionedFile, _} -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} +import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{BooleanType, StructType} -import scala.util.Try - /** * The implement of [[BaseRelation]], which is used to respond to query that only touches the base files(Parquet), * like query COW tables in Snapshot-Query and Read_Optimized mode and MOR tables in Read_Optimized mode. @@ -41,16 +39,10 @@ class BaseFileOnlyViewRelation( sqlContext: SQLContext, metaClient: HoodieTableMetaClient, optParams: Map[String, String], - userSchema: Option[StructType] + userSchema: Option[StructType], + globPaths: Seq[Path] ) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { - private val fileIndex = HoodieFileIndex(sparkSession, - metaClient, - userSchema, - optParams, - FileStatusCache.getOrCreate(sqlContext.sparkSession) - ) - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") @@ -63,21 +55,10 @@ class BaseFileOnlyViewRelation( } (splited.flatMap(_._1), splited.flatMap(_._2)) } + val partitionFiles = getPartitionFiles(partitionFilters, dataFilters) - val partitionFiles = fileIndex.listFiles(partitionFilters, dataFilters).flatMap { partition => - partition.files.flatMap { file => - HoodieDataSourceHelper.splitFiles( - sparkSession = sparkSession, - file = file, - partitionValues = partition.values - ) - } - } - val emptyPartitionFiles = partitionFiles.map{ f => - PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length) - } val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes - val filePartitions = sparkAdapter.getFilePartitions(sparkSession, emptyPartitionFiles, maxSplitBytes) + val filePartitions = sparkAdapter.getFilePartitions(sparkSession, partitionFiles, maxSplitBytes) val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = sparkSession, @@ -92,4 +73,34 @@ class BaseFileOnlyViewRelation( new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema, requiredSchemaParquetReader, filePartitions) } + + private def getPartitionFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionedFile] = { + val partitionDirectories = if (globPaths.isEmpty) { + val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams, + FileStatusCache.getOrCreate(sqlContext.sparkSession)) + hoodieFileIndex.listFiles(partitionFilters, dataFilters) + } else { + sqlContext.sparkContext.hadoopConfiguration.setClass( + "mapreduce.input.pathFilter.class", + classOf[HoodieROTablePathFilter], + classOf[org.apache.hadoop.fs.PathFilter]) + + val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths) + inMemoryFileIndex.listFiles(partitionFilters, dataFilters) + } + + val partitionFiles = partitionDirectories.flatMap { partition => + partition.files.flatMap { file => + HoodieDataSourceHelper.splitFiles( + sparkSession = sparkSession, + file = file, + partitionValues = partition.values + ) + } + } + + partitionFiles.map{ f => + PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length) + } + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index e02d96ee8..34641f7f6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -75,6 +75,7 @@ object DataSourceReadOptions { val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty .key("hoodie.file.index.enable") .defaultValue(true) + .deprecatedAfter("0.11.0") .withDocumentation("Enables use of the spark file index implementation for Hudi, " + "that speeds up listing of large tables.") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 3003345ea..1508babcb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -85,20 +85,15 @@ class DefaultSource extends RelationProvider val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration) - // Use the HoodieFileIndex only if the 'path' is not globbed. - // Or else we use the original way to read hoodie table. - val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX.key) - .map(_.toBoolean).getOrElse(ENABLE_HOODIE_FILE_INDEX.defaultValue) - val useHoodieFileIndex = enableFileIndex && path.isDefined && !path.get.contains("*") && - !parameters.contains(DataSourceReadOptions.READ_PATHS.key) - val globPaths = if (useHoodieFileIndex) { - None + + val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) { + HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs) } else { - Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)) + Seq.empty } // Get the table base path - val tablePath = if (globPaths.isDefined) { - DataSourceUtils.getTablePath(fs, globPaths.get.toArray) + val tablePath = if (globPaths.nonEmpty) { + DataSourceUtils.getTablePath(fs, globPaths.toArray) } else { DataSourceUtils.getTablePath(fs, Array(new Path(path.get))) } @@ -118,8 +113,7 @@ class DefaultSource extends RelationProvider case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => - getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, userSchema, tablePath, - readPaths, metaClient) + new BaseFileOnlyViewRelation(sqlContext, metaClient, parameters, userSchema, globPaths) case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) @@ -183,55 +177,6 @@ class DefaultSource extends RelationProvider override def shortName(): String = "hudi_v1" - private def getBaseFileOnlyView(useHoodieFileIndex: Boolean, - sqlContext: SQLContext, - optParams: Map[String, String], - schema: Option[StructType], - tablePath: String, - extraReadPaths: Seq[String], - metaClient: HoodieTableMetaClient): BaseRelation = { - log.info("Loading Base File Only View with options :" + optParams) - val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match { - case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet") - case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") - } - - if (useHoodieFileIndex) { - new BaseFileOnlyViewRelation(sqlContext, metaClient, optParams, schema) - } else { - // this is just effectively RO view only, where `path` can contain a mix of - // non-hoodie/hoodie path files. set the path filter up - sqlContext.sparkContext.hadoopConfiguration.setClass( - "mapreduce.input.pathFilter.class", - classOf[HoodieROTablePathFilter], - classOf[org.apache.hadoop.fs.PathFilter]) - - val specifySchema = if (schema.isEmpty) { - // Load the schema from the commit meta data. - // Here we should specify the schema to the latest commit schema since - // the table schema evolution. - val tableSchemaResolver = new TableSchemaResolver(metaClient) - try { - Some(AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaResolver.getTableAvroSchema)) - } catch { - case _: Throwable => - None // If there is no commit in the table, we can not get the schema - // with tableSchemaResolver, return None here. - } - } else { - schema - } - // simply return as a regular relation - DataSource.apply( - sparkSession = sqlContext.sparkSession, - paths = extraReadPaths, - userSpecifiedSchema = specifySchema, - className = formatClassName, - options = optParams) - .resolveRelation() - } - } - override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType], providerName: String, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala index e9ce0f1ae..dd90d724c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala @@ -53,7 +53,7 @@ import scala.collection.JavaConverters._ */ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, val userSchema: Option[StructType], - val globPaths: Option[Seq[Path]], + val globPaths: Seq[Path], val metaClient: HoodieTableMetaClient, val optParams: Map[String, String]) extends BaseRelation with PrunedFilteredScan with Logging { @@ -155,9 +155,9 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, def buildFileIndex(): HoodieBootstrapFileIndex = { logInfo("Building file index..") - val fileStatuses = if (globPaths.isDefined) { + val fileStatuses = if (globPaths.nonEmpty) { // Load files from the global paths if it has defined to be compatible with the original mode - val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths.get) + val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths) inMemoryFileIndex.allFiles() } else { // Load files by the HoodieFileIndex. HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 6ff49823a..c18ca8737 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -59,7 +59,7 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType, class MergeOnReadSnapshotRelation(sqlContext: SQLContext, optParams: Map[String, String], val userSchema: Option[StructType], - val globPaths: Option[Seq[Path]], + val globPaths: Seq[Path], val metaClient: HoodieTableMetaClient) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { @@ -139,9 +139,9 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, } def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = { - if (globPaths.isDefined) { + if (globPaths.nonEmpty) { // Load files from the global paths if it has defined to be compatible with the original mode - val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get) + val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths) val fsView = new HoodieTableFileSystemView(metaClient, // file-slice after pending compaction-requested instant-time is also considered valid metaClient.getCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index e998f7cfa..8e6c6b27e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -157,7 +157,7 @@ class TestCOWDataSource extends HoodieClientTestBase { val snapshotDF2 = spark.read.format("org.apache.hudi") .load(basePath + "/*/*/*/*") - assertEquals(snapshotDF1.count() - inputDF2.count(), snapshotDF2.count()) + assertEquals(snapshotDF2.count(), 80) } @Test def testOverWriteModeUseReplaceAction(): Unit = {