[HUDI-3200] deprecate hoodie.file.index.enable and unify to use BaseFileOnlyViewRelation to handle (#4798)
This commit is contained in:
@@ -222,14 +222,21 @@ public class TableSchemaResolver {
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
|
||||
* Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit with valid schema.
|
||||
*
|
||||
* @return Avro schema for this table
|
||||
*/
|
||||
private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
|
||||
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
if (timeline.lastInstant().isPresent()) {
|
||||
return getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), includeMetadataFields);
|
||||
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata =
|
||||
metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema();
|
||||
if (instantAndCommitMetadata.isPresent()) {
|
||||
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
|
||||
String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
|
||||
Schema schema = new Schema.Parser().parse(schemaStr);
|
||||
if (includeMetadataFields) {
|
||||
schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField);
|
||||
}
|
||||
return Option.of(schema);
|
||||
} else {
|
||||
return Option.empty();
|
||||
}
|
||||
@@ -519,7 +526,6 @@ public class TableSchemaResolver {
|
||||
Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
|
||||
return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to read operation field from avro schema", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
@@ -259,6 +260,26 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
return readDataFromPath(detailPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last instant with valid schema, and convert this to HoodieCommitMetadata
|
||||
*/
|
||||
public Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidSchema() {
|
||||
List<HoodieInstant> completed = getCommitsTimeline().filterCompletedInstants().getInstants()
|
||||
.sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList());
|
||||
for (HoodieInstant instant : completed) {
|
||||
try {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
|
||||
getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) {
|
||||
return Option.of(Pair.of(instant, commitMetadata));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to convert instant to HoodieCommitMetadata: " + instant.toString());
|
||||
}
|
||||
}
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last instant with valid data, and convert this to HoodieCommitMetadata
|
||||
*/
|
||||
|
||||
@@ -18,21 +18,19 @@
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
import org.apache.hudi.common.table.TableSchemaResolver
|
||||
import org.apache.hudi.hadoop.HoodieROTablePathFilter
|
||||
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.avro.SchemaConverters
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.expressions.Literal
|
||||
import org.apache.spark.sql.execution.datasources.{PartitionedFile, _}
|
||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
|
||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
|
||||
import org.apache.spark.sql.{Row, SQLContext}
|
||||
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
|
||||
import org.apache.spark.sql.sources.{BaseRelation, Filter}
|
||||
import org.apache.spark.sql.types.{BooleanType, StructType}
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
/**
|
||||
* The implement of [[BaseRelation]], which is used to respond to query that only touches the base files(Parquet),
|
||||
* like query COW tables in Snapshot-Query and Read_Optimized mode and MOR tables in Read_Optimized mode.
|
||||
@@ -41,16 +39,10 @@ class BaseFileOnlyViewRelation(
|
||||
sqlContext: SQLContext,
|
||||
metaClient: HoodieTableMetaClient,
|
||||
optParams: Map[String, String],
|
||||
userSchema: Option[StructType]
|
||||
userSchema: Option[StructType],
|
||||
globPaths: Seq[Path]
|
||||
) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
|
||||
|
||||
private val fileIndex = HoodieFileIndex(sparkSession,
|
||||
metaClient,
|
||||
userSchema,
|
||||
optParams,
|
||||
FileStatusCache.getOrCreate(sqlContext.sparkSession)
|
||||
)
|
||||
|
||||
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
|
||||
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
|
||||
|
||||
@@ -63,21 +55,10 @@ class BaseFileOnlyViewRelation(
|
||||
}
|
||||
(splited.flatMap(_._1), splited.flatMap(_._2))
|
||||
}
|
||||
val partitionFiles = getPartitionFiles(partitionFilters, dataFilters)
|
||||
|
||||
val partitionFiles = fileIndex.listFiles(partitionFilters, dataFilters).flatMap { partition =>
|
||||
partition.files.flatMap { file =>
|
||||
HoodieDataSourceHelper.splitFiles(
|
||||
sparkSession = sparkSession,
|
||||
file = file,
|
||||
partitionValues = partition.values
|
||||
)
|
||||
}
|
||||
}
|
||||
val emptyPartitionFiles = partitionFiles.map{ f =>
|
||||
PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length)
|
||||
}
|
||||
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
|
||||
val filePartitions = sparkAdapter.getFilePartitions(sparkSession, emptyPartitionFiles, maxSplitBytes)
|
||||
val filePartitions = sparkAdapter.getFilePartitions(sparkSession, partitionFiles, maxSplitBytes)
|
||||
|
||||
val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
|
||||
sparkSession = sparkSession,
|
||||
@@ -92,4 +73,34 @@ class BaseFileOnlyViewRelation(
|
||||
new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema,
|
||||
requiredSchemaParquetReader, filePartitions)
|
||||
}
|
||||
|
||||
private def getPartitionFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionedFile] = {
|
||||
val partitionDirectories = if (globPaths.isEmpty) {
|
||||
val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams,
|
||||
FileStatusCache.getOrCreate(sqlContext.sparkSession))
|
||||
hoodieFileIndex.listFiles(partitionFilters, dataFilters)
|
||||
} else {
|
||||
sqlContext.sparkContext.hadoopConfiguration.setClass(
|
||||
"mapreduce.input.pathFilter.class",
|
||||
classOf[HoodieROTablePathFilter],
|
||||
classOf[org.apache.hadoop.fs.PathFilter])
|
||||
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths)
|
||||
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
|
||||
}
|
||||
|
||||
val partitionFiles = partitionDirectories.flatMap { partition =>
|
||||
partition.files.flatMap { file =>
|
||||
HoodieDataSourceHelper.splitFiles(
|
||||
sparkSession = sparkSession,
|
||||
file = file,
|
||||
partitionValues = partition.values
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
partitionFiles.map{ f =>
|
||||
PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,6 +75,7 @@ object DataSourceReadOptions {
|
||||
val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty
|
||||
.key("hoodie.file.index.enable")
|
||||
.defaultValue(true)
|
||||
.deprecatedAfter("0.11.0")
|
||||
.withDocumentation("Enables use of the spark file index implementation for Hudi, "
|
||||
+ "that speeds up listing of large tables.")
|
||||
|
||||
|
||||
@@ -85,20 +85,15 @@ class DefaultSource extends RelationProvider
|
||||
val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
|
||||
|
||||
val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
|
||||
// Use the HoodieFileIndex only if the 'path' is not globbed.
|
||||
// Or else we use the original way to read hoodie table.
|
||||
val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX.key)
|
||||
.map(_.toBoolean).getOrElse(ENABLE_HOODIE_FILE_INDEX.defaultValue)
|
||||
val useHoodieFileIndex = enableFileIndex && path.isDefined && !path.get.contains("*") &&
|
||||
!parameters.contains(DataSourceReadOptions.READ_PATHS.key)
|
||||
val globPaths = if (useHoodieFileIndex) {
|
||||
None
|
||||
|
||||
val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
|
||||
HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
|
||||
} else {
|
||||
Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs))
|
||||
Seq.empty
|
||||
}
|
||||
// Get the table base path
|
||||
val tablePath = if (globPaths.isDefined) {
|
||||
DataSourceUtils.getTablePath(fs, globPaths.get.toArray)
|
||||
val tablePath = if (globPaths.nonEmpty) {
|
||||
DataSourceUtils.getTablePath(fs, globPaths.toArray)
|
||||
} else {
|
||||
DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
|
||||
}
|
||||
@@ -118,8 +113,7 @@ class DefaultSource extends RelationProvider
|
||||
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
|
||||
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
|
||||
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
|
||||
getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, userSchema, tablePath,
|
||||
readPaths, metaClient)
|
||||
new BaseFileOnlyViewRelation(sqlContext, metaClient, parameters, userSchema, globPaths)
|
||||
|
||||
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
|
||||
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
|
||||
@@ -183,55 +177,6 @@ class DefaultSource extends RelationProvider
|
||||
|
||||
override def shortName(): String = "hudi_v1"
|
||||
|
||||
private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
|
||||
sqlContext: SQLContext,
|
||||
optParams: Map[String, String],
|
||||
schema: Option[StructType],
|
||||
tablePath: String,
|
||||
extraReadPaths: Seq[String],
|
||||
metaClient: HoodieTableMetaClient): BaseRelation = {
|
||||
log.info("Loading Base File Only View with options :" + optParams)
|
||||
val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
|
||||
case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
|
||||
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
|
||||
}
|
||||
|
||||
if (useHoodieFileIndex) {
|
||||
new BaseFileOnlyViewRelation(sqlContext, metaClient, optParams, schema)
|
||||
} else {
|
||||
// this is just effectively RO view only, where `path` can contain a mix of
|
||||
// non-hoodie/hoodie path files. set the path filter up
|
||||
sqlContext.sparkContext.hadoopConfiguration.setClass(
|
||||
"mapreduce.input.pathFilter.class",
|
||||
classOf[HoodieROTablePathFilter],
|
||||
classOf[org.apache.hadoop.fs.PathFilter])
|
||||
|
||||
val specifySchema = if (schema.isEmpty) {
|
||||
// Load the schema from the commit meta data.
|
||||
// Here we should specify the schema to the latest commit schema since
|
||||
// the table schema evolution.
|
||||
val tableSchemaResolver = new TableSchemaResolver(metaClient)
|
||||
try {
|
||||
Some(AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaResolver.getTableAvroSchema))
|
||||
} catch {
|
||||
case _: Throwable =>
|
||||
None // If there is no commit in the table, we can not get the schema
|
||||
// with tableSchemaResolver, return None here.
|
||||
}
|
||||
} else {
|
||||
schema
|
||||
}
|
||||
// simply return as a regular relation
|
||||
DataSource.apply(
|
||||
sparkSession = sqlContext.sparkSession,
|
||||
paths = extraReadPaths,
|
||||
userSpecifiedSchema = specifySchema,
|
||||
className = formatClassName,
|
||||
options = optParams)
|
||||
.resolveRelation()
|
||||
}
|
||||
}
|
||||
|
||||
override def sourceSchema(sqlContext: SQLContext,
|
||||
schema: Option[StructType],
|
||||
providerName: String,
|
||||
|
||||
@@ -53,7 +53,7 @@ import scala.collection.JavaConverters._
|
||||
*/
|
||||
class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
|
||||
val userSchema: Option[StructType],
|
||||
val globPaths: Option[Seq[Path]],
|
||||
val globPaths: Seq[Path],
|
||||
val metaClient: HoodieTableMetaClient,
|
||||
val optParams: Map[String, String]) extends BaseRelation
|
||||
with PrunedFilteredScan with Logging {
|
||||
@@ -155,9 +155,9 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
|
||||
|
||||
def buildFileIndex(): HoodieBootstrapFileIndex = {
|
||||
logInfo("Building file index..")
|
||||
val fileStatuses = if (globPaths.isDefined) {
|
||||
val fileStatuses = if (globPaths.nonEmpty) {
|
||||
// Load files from the global paths if it has defined to be compatible with the original mode
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths.get)
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
|
||||
inMemoryFileIndex.allFiles()
|
||||
} else { // Load files by the HoodieFileIndex.
|
||||
HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams,
|
||||
|
||||
@@ -59,7 +59,7 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
|
||||
class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
|
||||
optParams: Map[String, String],
|
||||
val userSchema: Option[StructType],
|
||||
val globPaths: Option[Seq[Path]],
|
||||
val globPaths: Seq[Path],
|
||||
val metaClient: HoodieTableMetaClient)
|
||||
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
|
||||
|
||||
@@ -139,9 +139,9 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
|
||||
}
|
||||
|
||||
def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = {
|
||||
if (globPaths.isDefined) {
|
||||
if (globPaths.nonEmpty) {
|
||||
// Load files from the global paths if it has defined to be compatible with the original mode
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
|
||||
val fsView = new HoodieTableFileSystemView(metaClient,
|
||||
// file-slice after pending compaction-requested instant-time is also considered valid
|
||||
metaClient.getCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants,
|
||||
|
||||
@@ -157,7 +157,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
||||
|
||||
val snapshotDF2 = spark.read.format("org.apache.hudi")
|
||||
.load(basePath + "/*/*/*/*")
|
||||
assertEquals(snapshotDF1.count() - inputDF2.count(), snapshotDF2.count())
|
||||
assertEquals(snapshotDF2.count(), 80)
|
||||
}
|
||||
|
||||
@Test def testOverWriteModeUseReplaceAction(): Unit = {
|
||||
|
||||
Reference in New Issue
Block a user