[HUDI-4170] Make user can use hoodie.datasource.read.paths to read necessary files (#5722)
* Rebase codes * Move listFileSlices to HoodieBaseRelation * Fix review * Fix style * Fix bug
This commit is contained in:
@@ -68,11 +68,9 @@ class DefaultSource extends RelationProvider
|
||||
override def createRelation(sqlContext: SQLContext,
|
||||
optParams: Map[String, String],
|
||||
schema: StructType): BaseRelation = {
|
||||
// Add default options for unspecified read options keys.
|
||||
val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
|
||||
val path = optParams.get("path")
|
||||
val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
|
||||
|
||||
val path = parameters.get("path")
|
||||
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS.key)
|
||||
if (path.isEmpty && readPathsStr.isEmpty) {
|
||||
throw new HoodieException(s"'path' or '$READ_PATHS' or both must be specified.")
|
||||
}
|
||||
@@ -87,6 +85,16 @@ class DefaultSource extends RelationProvider
|
||||
} else {
|
||||
Seq.empty
|
||||
}
|
||||
|
||||
// Add default options for unspecified read options keys.
|
||||
val parameters = (if (globPaths.nonEmpty) {
|
||||
Map(
|
||||
"glob.paths" -> globPaths.mkString(",")
|
||||
)
|
||||
} else {
|
||||
Map()
|
||||
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
|
||||
|
||||
// Get the table base path
|
||||
val tablePath = if (globPaths.nonEmpty) {
|
||||
DataSourceUtils.getTablePath(fs, globPaths.toArray)
|
||||
|
||||
@@ -26,9 +26,10 @@ import org.apache.hadoop.mapred.JobConf
|
||||
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
|
||||
import org.apache.hudi.HoodieConversionUtils.toScalaOption
|
||||
import org.apache.hudi.avro.HoodieAvroUtils
|
||||
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
|
||||
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
|
||||
import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
|
||||
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
|
||||
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
|
||||
@@ -44,7 +45,7 @@ import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
|
||||
import org.apache.spark.sql.execution.FileRelation
|
||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
|
||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile, PartitioningUtils}
|
||||
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
|
||||
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
@@ -222,7 +223,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
||||
toScalaOption(timeline.lastInstant())
|
||||
|
||||
protected def queryTimestamp: Option[String] =
|
||||
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
|
||||
specifiedQueryTimestamp.orElse(latestInstant.map(_.getTimestamp))
|
||||
|
||||
/**
|
||||
* Returns true in case table supports Schema on Read (Schema Evolution)
|
||||
@@ -340,20 +341,49 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
||||
*/
|
||||
protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
|
||||
|
||||
protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
|
||||
val partitionDirs = if (globbedPaths.isEmpty) {
|
||||
/**
|
||||
* Get all PartitionDirectories based on globPaths if specified, otherwise use the table path.
|
||||
* Will perform pruning if necessary
|
||||
*/
|
||||
private def listPartitionDirectories(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
|
||||
if (globPaths.isEmpty) {
|
||||
fileIndex.listFiles(partitionFilters, dataFilters)
|
||||
} else {
|
||||
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
|
||||
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
|
||||
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all latest base files with partition paths, if globPaths is empty, will listing files
|
||||
* under the table path.
|
||||
*/
|
||||
protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
|
||||
val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters)
|
||||
val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
|
||||
|
||||
val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
|
||||
|
||||
latestBaseFiles.groupBy(getPartitionPath)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all fileSlices(contains base files and log files if exist) from globPaths if not empty,
|
||||
* otherwise will use the table path to do the listing.
|
||||
*/
|
||||
protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
|
||||
latestInstant.map { _ =>
|
||||
val partitionDirs = listPartitionDirectories(globPaths, partitionFilters, dataFilters)
|
||||
val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
|
||||
|
||||
val queryTimestamp = this.queryTimestamp.get
|
||||
fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
|
||||
val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
|
||||
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
|
||||
}
|
||||
}.getOrElse(Seq())
|
||||
}
|
||||
|
||||
protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
|
||||
val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ case class HoodieFileIndex(spark: SparkSession,
|
||||
metaClient = metaClient,
|
||||
schemaSpec = schemaSpec,
|
||||
configProperties = getConfigProperties(spark, options),
|
||||
queryPaths = Seq(HoodieFileIndex.getQueryPath(options)),
|
||||
queryPaths = HoodieFileIndex.getQueryPaths(options),
|
||||
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
|
||||
fileStatusCache = fileStatusCache
|
||||
)
|
||||
@@ -341,10 +341,15 @@ object HoodieFileIndex extends Logging {
|
||||
}
|
||||
}
|
||||
|
||||
private def getQueryPath(options: Map[String, String]) = {
|
||||
new Path(options.get("path") match {
|
||||
case Some(p) => p
|
||||
case None => throw new IllegalArgumentException("'path' option required")
|
||||
})
|
||||
private def getQueryPaths(options: Map[String, String]): Seq[Path] = {
|
||||
options.get("path") match {
|
||||
case Some(p) => Seq(new Path(p))
|
||||
case None =>
|
||||
options.getOrElse("glob.paths",
|
||||
throw new IllegalArgumentException("'path' or 'glob paths' option required"))
|
||||
.split(",")
|
||||
.map(new Path(_))
|
||||
.toSeq
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,24 +104,8 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
|
||||
val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters)
|
||||
buildSplits(fileSlices.values.flatten.toSeq)
|
||||
} else {
|
||||
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
|
||||
val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
|
||||
|
||||
val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
|
||||
val partitionPaths = fsView.getPartitionPaths.asScala
|
||||
|
||||
if (partitionPaths.isEmpty || latestInstant.isEmpty) {
|
||||
// If this an empty table OR it has no completed commits yet, return
|
||||
List.empty[HoodieMergeOnReadFileSplit]
|
||||
} else {
|
||||
val queryTimestamp = this.queryTimestamp.get
|
||||
|
||||
val fileSlices = partitionPaths.flatMap { partitionPath =>
|
||||
val relativePath = getRelativePartitionPath(new Path(basePath), partitionPath)
|
||||
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, queryTimestamp).iterator().asScala.toSeq
|
||||
}
|
||||
buildSplits(fileSlices)
|
||||
}
|
||||
val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
|
||||
buildSplits(fileSlices)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
package org.apache.hudi.functional
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hudi.HoodieConversionUtils.toJavaOption
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||
import org.apache.hudi.common.model.HoodieRecord
|
||||
@@ -296,6 +296,39 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
||||
assertEquals("replacecommit", commits(1))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testReadPathsOnCopyOnWriteTable(): Unit = {
|
||||
val records1 = dataGen.generateInsertsContainsAllPartitions("001", 20)
|
||||
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
|
||||
val record1FilePaths = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
|
||||
.filter(!_.getPath.getName.contains("hoodie_partition_metadata"))
|
||||
.filter(_.getPath.getName.endsWith("parquet"))
|
||||
.map(_.getPath.toString)
|
||||
.mkString(",")
|
||||
|
||||
val records2 = dataGen.generateInsertsContainsAllPartitions("002", 20)
|
||||
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
|
||||
inputDF2.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
// Use bulk insert here to make sure the files have different file groups.
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
|
||||
val hudiReadPathDF = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.READ_PATHS.key, record1FilePaths)
|
||||
.load()
|
||||
|
||||
val expectedCount = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)
|
||||
assertEquals(expectedCount, hudiReadPathDF.count())
|
||||
}
|
||||
|
||||
@Test def testOverWriteTableModeUseReplaceAction(): Unit = {
|
||||
val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
|
||||
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||
|
||||
@@ -704,6 +704,96 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
|
||||
assertEquals(partitionCounts("2021/03/03"), count7)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testReadPathsForMergeOnReadTable(): Unit = {
|
||||
// Paths only baseFiles
|
||||
val records1 = dataGen.generateInserts("001", 100)
|
||||
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||
val baseFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
|
||||
.filter(_.getPath.getName.endsWith("parquet"))
|
||||
.map(_.getPath.toString)
|
||||
.mkString(",")
|
||||
val records2 = dataGen.generateUniqueDeleteRecords("002", 100)
|
||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
|
||||
inputDF2.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
val hudiReadPathDF1 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||
.option(DataSourceReadOptions.READ_PATHS.key, baseFilePath)
|
||||
.load()
|
||||
|
||||
val expectedCount1 = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)
|
||||
assertEquals(expectedCount1, hudiReadPathDF1.count())
|
||||
|
||||
// Paths Contains both baseFile and log files
|
||||
val logFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
|
||||
.filter(_.getPath.getName.contains("log"))
|
||||
.map(_.getPath.toString)
|
||||
.mkString(",")
|
||||
|
||||
val readPaths = baseFilePath + "," + logFilePath
|
||||
val hudiReadPathDF2 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||
.option(DataSourceReadOptions.READ_PATHS.key, readPaths)
|
||||
.load()
|
||||
|
||||
assertEquals(0, hudiReadPathDF2.count())
|
||||
}
|
||||
|
||||
@Test
|
||||
def testReadPathsForOnlyLogFiles(): Unit = {
|
||||
initMetaClient(HoodieTableType.MERGE_ON_READ)
|
||||
val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
|
||||
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
|
||||
inputDF1.write.format("hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||
// Use InMemoryIndex to generate log only mor table.
|
||||
.option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
// There should no base file in the file list.
|
||||
assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))
|
||||
|
||||
val logFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))
|
||||
.filter(_.getPath.getName.contains("log"))
|
||||
.map(_.getPath.toString)
|
||||
.mkString(",")
|
||||
|
||||
val records2 = dataGen.generateInsertsContainsAllPartitions("000", 20)
|
||||
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
|
||||
inputDF2.write.format("hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||
// Use InMemoryIndex to generate log only mor table.
|
||||
.option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
// There should no base file in the file list.
|
||||
assertTrue(DataSourceTestUtils.isLogFileOnly(basePath))
|
||||
|
||||
val expectedCount1 = records1.asScala.count(record => record.getPartitionPath == dataGen.getPartitionPaths.head)
|
||||
|
||||
val hudiReadPathDF = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||
.option(DataSourceReadOptions.READ_PATHS.key, logFilePath)
|
||||
.load()
|
||||
|
||||
assertEquals(expectedCount1, hudiReadPathDF.count())
|
||||
}
|
||||
|
||||
@Test
|
||||
def testReadLogOnlyMergeOnReadTable(): Unit = {
|
||||
initMetaClient(HoodieTableType.MERGE_ON_READ)
|
||||
|
||||
Reference in New Issue
Block a user