1
0

[HUDI-3239] Convert BaseHoodieTableFileIndex to Java (#4669)

Converting BaseHoodieTableFileIndex to Java, removing Scala as a dependency from "hudi-common"
This commit is contained in:
Alexey Kudinkin
2022-02-09 15:42:08 -08:00
committed by GitHub
parent 973087f385
commit 464027ec37
15 changed files with 443 additions and 554 deletions

View File

@@ -72,7 +72,7 @@ case class HoodieFileIndex(spark: SparkSession,
)
with FileIndex {
override def rootPaths: Seq[Path] = queryPaths
override def rootPaths: Seq[Path] = queryPaths.asScala
def enableDataSkipping(): Boolean = {
options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
@@ -88,7 +88,7 @@ case class HoodieFileIndex(spark: SparkSession,
* @return List of FileStatus for base files
*/
def allFiles: Seq[FileStatus] = {
cachedAllInputFileSlices.values.flatten
cachedAllInputFileSlices.values.asScala.flatMap(_.asScala)
.filter(_.getBaseFile.isPresent)
.map(_.getBaseFile.get().getFileStatus)
.toSeq
@@ -101,31 +101,29 @@ case class HoodieFileIndex(spark: SparkSession,
* @param dataFilters data columns filters
* @return list of PartitionDirectory containing partition to base files mapping
*/
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// Look up candidate files names in the col-stats index, if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInColStatsIndex(dataFilters) match {
case Success(opt) => opt
case Failure(e) =>
if (e.isInstanceOf[AnalysisException]) {
logDebug("Failed to relay provided data filters to Z-index lookup", e)
} else {
logError("Failed to lookup candidate files in Z-index", e)
}
Option.empty
}
lookupCandidateFilesInColStatsIndex(dataFilters) match {
case Success(opt) => opt
case Failure(e) =>
if (e.isInstanceOf[AnalysisException]) {
logDebug("Failed to relay provided data filters to Z-index lookup", e)
} else {
logError("Failed to lookup candidate files in Z-index", e)
}
Option.empty
}
logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table
// Filter in candidate files based on the col-stats index lookup
val candidateFiles =
allFiles.filter(fileStatus =>
val candidateFiles = allFiles.filter(fileStatus =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
)
@@ -137,22 +135,21 @@ case class HoodieFileIndex(spark: SparkSession,
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters)
var totalFileSize = 0
var candidateFileSize = 0
val result = prunedPartitions.map { partition =>
val baseFileStatuses: Seq[FileStatus] =
cachedAllInputFileSlices(partition)
cachedAllInputFileSlices.get(partition).asScala
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
// Filter in candidate files based on the col-stats index lookup
val candidateFiles =
baseFileStatuses.filter(fs =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
val candidateFiles = baseFileStatuses.filter(fs =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
@@ -194,12 +191,14 @@ case class HoodieFileIndex(spark: SparkSession,
// scalastyle:on return
}
val completedCommits = getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
// Collect all index tables present in `.zindex` folder
val candidateIndexTables =
fs.listStatus(new Path(indexPath))
.filter(_.isDirectory)
.map(_.getPath.getName)
.filter(f => completedCommits.contains(f))
.filter(completedCommits.contains(_))
.sortBy(x => x)
if (candidateIndexTables.isEmpty) {

View File

@@ -18,8 +18,9 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap}
import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
@@ -36,10 +37,11 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.language.implicitConversions
/**
* Implementation of the [[HoodieTableFileIndexBase]] for Spark
* Implementation of the [[BaseHoodieTableFileIndex]] for Spark
*
* @param spark spark session
* @param metaClient Hudi table's meta-client
@@ -55,14 +57,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
queryPaths: Seq[Path],
specifiedQueryInstant: Option[String] = None,
@transient fileStatusCache: FileStatusCache = NoopCache)
extends HoodieTableFileIndexBase(
engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
extends BaseHoodieTableFileIndex(
new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
metaClient,
configProperties,
queryType = deduceQueryType(configProperties),
queryPaths,
specifiedQueryInstant,
fileStatusCache = SparkHoodieTableFileIndex.adapt(fileStatusCache)
deduceQueryType(configProperties),
queryPaths.asJava,
toJavaOption(specifiedQueryInstant),
false,
SparkHoodieTableFileIndex.adapt(fileStatusCache)
)
with SparkAdapterSupport
with Logging {
@@ -136,9 +139,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.path, cachedAllInputFileSlices(partition))
(partition.path, cachedAllInputFileSlices.get(partition).asScala)
}).toMap
}
@@ -150,9 +153,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
def prunePartition(partitionPaths: Seq[PartitionPath],
predicates: Seq[Expression]): Seq[PartitionPath] = {
def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = {
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
@@ -167,8 +168,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
})
val prunedPartitionPaths = partitionPaths.filter {
case PartitionPath(_, values) => boundPredicate.eval(InternalRow.fromSeq(values))
partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
}
logInfo(s"Total partition size is: ${partitionPaths.size}," +
s" after partition prune size is: ${prunedPartitionPaths.size}")
prunedPartitionPaths
@@ -177,7 +179,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
}
}
protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any] = {
protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = {
if (partitionColumns.length == 0) {
// This is a non-partitioned table
Array.empty
@@ -225,7 +227,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
partitionValues.toArray
partitionValues.map(_.asInstanceOf[Object]).toArray
}
}
}
@@ -247,6 +249,13 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
object SparkHoodieTableFileIndex {
implicit def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
if (opt.isDefined) {
org.apache.hudi.common.util.Option.of(opt.get)
} else {
org.apache.hudi.common.util.Option.empty()
}
/**
* This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
* [[StructField]] object for every field of the provided [[StructType]], recursively.
@@ -287,17 +296,17 @@ object SparkHoodieTableFileIndex {
}
private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
configProperties(QUERY_TYPE.key()) match {
case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_SNAPSHOT
case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_INCREMENTAL
case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_READ_OPTIMIZED
configProperties.asScala(QUERY_TYPE.key()) match {
case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT
case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.INCREMENTAL
case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.READ_OPTIMIZED
case _ @ qt => throw new IllegalArgumentException(s"query-type ($qt) not supported")
}
}
private def adapt(cache: FileStatusCache): FileStatusCacheTrait = {
new FileStatusCacheTrait {
override def get(path: Path): Option[Array[FileStatus]] = cache.getLeafFiles(path)
private def adapt(cache: FileStatusCache): BaseHoodieTableFileIndex.FileStatusCache = {
new BaseHoodieTableFileIndex.FileStatusCache {
override def get(path: Path): org.apache.hudi.common.util.Option[Array[FileStatus]] = toJavaOption(cache.getLeafFiles(path))
override def put(path: Path, leafFiles: Array[FileStatus]): Unit = cache.putLeafFiles(path, leafFiles)
override def invalidate(): Unit = cache.invalidateAll()
}