[HUDI-4173] Fix wrong results when reading a Hudi table with no base files via glob paths (#5723)
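In short: a merge-on-read table whose partitions contain only log files (no base files written yet, e.g. before the first compaction) returned wrong results when read through glob paths, because partition discovery was driven by the latest base files. A minimal sketch of the scenario, modeled on the test added below (spark, basePath and the glob depth follow that test and are otherwise illustrative):

  // The MOR table holds only log files; before this fix the glob-path read could come back empty.
  val viaGlob  = spark.read.format("hudi").load(basePath + "/*/*/*/*").count()
  val viaTable = spark.read.format("hudi").load(basePath).count()
  assert(viaGlob == viaTable)  // both reads must see the same rows
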
@@ -24,7 +24,7 @@ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConver
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
@@ -174,4 +174,9 @@ trait SparkAdapter extends Serializable {
   * Create instance of [[ParquetFileFormat]]
   */
  def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]

  /**
   * Create instance of [[InterpretedPredicate]]
   */
  def createInterpretedPredicate(e: Expression): InterpretedPredicate
}

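The Spark 2 and Spark 3 adapters further down implement this new hook with their version-specific APIs (InterpretedPredicate.create vs Predicate.createInterpreted). A minimal usage sketch, mirroring how prunePartitions in HoodieInMemoryFileIndex binds and evaluates a partition predicate; the column, literal and the sparkAdapter binding are illustrative:

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{BoundReference, EqualTo, Literal}
  import org.apache.spark.sql.types.StringType
  import org.apache.spark.unsafe.types.UTF8String

  // Predicate over the first partition column: dt = '2022/06/01', already bound by ordinal.
  val predicate = EqualTo(BoundReference(0, StringType, nullable = true), Literal("2022/06/01"))
  val bound = sparkAdapter.createInterpretedPredicate(predicate)
  bound.eval(InternalRow(UTF8String.fromString("2022/06/01")))  // true  -> keep this partition
  bound.eval(InternalRow(UTF8String.fromString("2022/06/02")))  // false -> prune it
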
@@ -470,6 +470,20 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
      }
    }

  public final List<Path> getPartitionPaths() {
    try {
      readLock.lock();
      return fetchAllStoredFileGroups()
          .filter(fg -> !isFileGroupReplaced(fg))
          .map(HoodieFileGroup::getPartitionPath)
          .distinct()
          .map(name -> name.isEmpty() ? metaClient.getBasePathV2() : new Path(metaClient.getBasePathV2(), name))
          .collect(Collectors.toList());
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public final Stream<HoodieBaseFile> getLatestBaseFiles(String partitionStr) {
    try {

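The MergeOnReadSnapshotRelation change below consumes this new view API instead of deriving partitions from the latest base files, so partitions that only contain log files are no longer dropped. A minimal sketch of that usage (assuming a metaClient, an active timeline and the glob-listed statuses in an Array[FileStatus] are already at hand; names mirror the Scala changes further down):

  import scala.collection.JavaConverters._
  import org.apache.hudi.common.table.view.HoodieTableFileSystemView

  // Build a file-system view over the files matched by the glob paths, then ask it
  // which partition paths those files belong to.
  val fsView = new HoodieTableFileSystemView(metaClient, timeline, listedFileStatuses)
  val partitionPaths = fsView.getPartitionPaths.asScala  // non-empty even for a log-file-only table
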
@@ -104,14 +104,22 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
      val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters)
      buildSplits(fileSlices.values.flatten.toSeq)
    } else {
      // TODO refactor to avoid iterating over listed files multiple times
      val partitions = listLatestBaseFiles(globPaths, convertedPartitionFilters, dataFilters)
      val partitionPaths = partitions.keys.toSeq
      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)

      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
      val partitionPaths = fsView.getPartitionPaths.asScala

      if (partitionPaths.isEmpty || latestInstant.isEmpty) {
        // If this is an empty table OR it has no completed commits yet, return
        List.empty[HoodieMergeOnReadFileSplit]
      } else {
        val fileSlices = listFileSlices(partitionPaths)
        val queryTimestamp = this.queryTimestamp.get

        val fileSlices = partitionPaths.flatMap { partitionPath =>
          val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
        }
        buildSplits(fileSlices)
      }
    }
@@ -130,20 +138,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
      HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
    }.toList
  }

  private def listFileSlices(partitionPaths: Seq[Path]): Seq[FileSlice] = {
    // NOTE: It's critical for us to re-use [[InMemoryFileIndex]] to make sure we're leveraging
    // [[FileStatusCache]] and avoid listing the whole table again
    val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, partitionPaths)
    val fsView = new HoodieTableFileSystemView(metaClient, timeline, inMemoryFileIndex.allFiles.toArray)

    val queryTimestamp = this.queryTimestamp.get

    partitionPaths.flatMap { partitionPath =>
      val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
      fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
    }
  }
}

object MergeOnReadSnapshotRelation {

@@ -20,9 +20,12 @@ package org.apache.spark.execution.datasources
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.HoodieHadoopFSUtils
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType

@@ -34,7 +37,77 @@ class HoodieInMemoryFileIndex(sparkSession: SparkSession,
                              parameters: Map[String, String],
                              userSpecifiedSchema: Option[StructType],
                              fileStatusCache: FileStatusCache = NoopCache)
  extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache) {
  extends InMemoryFileIndex(sparkSession, rootPathsSpecified, parameters, userSpecifiedSchema, fileStatusCache)
    with SparkAdapterSupport {

  /**
   * Returns all valid files grouped into partitions when the data is partitioned. If the data is unpartitioned,
   * this will return a single partition with no partition values.
   *
   * NOTE: This method replicates the one it overrides, however, it uses a custom method
   * that accepts files starting with "."
   */
  override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
    } else {
      prunePartitions(partitionFilters, partitionSpec()).map {
        case PartitionPath(values, path) =>
          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
            case Some(existingDir) =>
              // Directory has children files in it, return them
              existingDir.filter(f => isDataPath(f.getPath))

            case None =>
              // Directory does not exist, or has no children files
              Nil
          }
          PartitionDirectory(values, files)
      }
    }
    logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
    selectedPartitions
  }

  private def isDataPath(path: Path): Boolean = {
    val name = path.getName
    !(name.startsWith("_") && !name.contains("="))
  }

  private def prunePartitions(
      predicates: Seq[Expression],
      partitionSpec: PartitionSpec): Seq[PartitionPath] = {
    val PartitionSpec(partitionColumns, partitions) = partitionSpec
    val partitionColumnNames = partitionColumns.map(_.name).toSet
    val partitionPruningPredicates = predicates.filter {
      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    }

    if (partitionPruningPredicates.nonEmpty) {
      val predicate = partitionPruningPredicates.reduce(expressions.And)

      val boundPredicate = sparkAdapter.createInterpretedPredicate(predicate.transform {
        case a: AttributeReference =>
          val index = partitionColumns.indexWhere(a.name == _.name)
          BoundReference(index, partitionColumns(index).dataType, nullable = true)
      })

      val selected = partitions.filter {
        case PartitionPath(values, _) => boundPredicate.eval(values)
      }
      logInfo {
        val total = partitions.length
        val selectedSize = selected.length
        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
        s"Selected $selectedSize partitions out of $total, " +
          s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions."
      }

      selected
    } else {
      partitions
    }
  }

  /**
   * List leaf files of given paths. This method will submit a Spark job to do parallel

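Why the override above matters: Spark's stock InMemoryFileIndex also drops paths starting with ".", which silently hides Hudi log files (their names begin with "."), while isDataPath here only rejects "_"-prefixed names that are not partition directories. A small standalone illustration of the same rule (a re-statement for clarity, not the class's private method; the file names are made up):

  def keepsFile(name: String): Boolean = !(name.startsWith("_") && !name.contains("="))

  keepsFile(".f1_20220601.log.1_1-0-1")  // true  -> log files are kept
  keepsFile("_SUCCESS")                  // false -> marker files are dropped
  keepsFile("dt=2022-06-01")             // true  -> partition dirs containing '=' are kept
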
@@ -719,6 +719,8 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
      .save(basePath)
    // There should be no base files in the file list.
    assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))
    // Test reading the log-only MOR table with glob paths.
    assertEquals(20, spark.read.format("hudi").load(basePath + "/*/*/*/*").count())
    // Test reading the log-only MOR table.
    assertEquals(20, spark.read.format("hudi").load(basePath).count())
  }

@@ -24,7 +24,7 @@ import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark2_4AvroDeserializer, HoodieSpark2_4AvroSerializer, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
@@ -162,4 +162,8 @@ class Spark2Adapter extends SparkAdapter {
  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
    Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
  }

  override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
    InterpretedPredicate.create(e)
  }
}

@@ -24,7 +24,7 @@ import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like, Predicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
@@ -149,4 +149,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
      None
    }
  }

  override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
    Predicate.createInterpreted(e)
  }
}