[HUDI-597] Enable incremental pulling from defined partitions (#1348)
@@ -102,7 +102,14 @@ object DataSourceReadOptions {
     * This option allows setting filters directly on Hoodie Source
     */
   val PUSH_DOWN_INCR_FILTERS_OPT_KEY = "hoodie.datasource.read.incr.filters"
-  val DEFAULTPUSH_DOWN_FILTERS_OPT_VAL = ""
+  val DEFAULT_PUSH_DOWN_FILTERS_OPT_VAL = ""
+
+  /**
+    * For use-cases where users want to incrementally pull only from certain partitions instead of the full table.
+    * This option allows using a glob pattern to filter directly on path.
+    */
+  val INCR_PATH_GLOB_OPT_KEY = "hoodie.datasource.read.incr.path.glob"
+  val DEFAULT_INCR_PATH_GLOB_OPT_VAL = ""
 }
 
 /**
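
With the two options above, an incremental pull can be restricted to a subset of partitions by supplying a path glob. A minimal reader-side sketch, mirroring the test added later in this commit; basePath and beginInstant are placeholder values, not part of this change:

val incrementalFrom2016 = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginInstant)
  // only files whose full path under the table base path matches the glob are scanned
  .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
  .load(basePath)
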
@@ -17,6 +17,7 @@
 
 package org.apache.hudi
 
+import org.apache.hadoop.fs.GlobPattern
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -84,7 +85,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   val filters = {
     if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
-      val filterStr = optParams.get(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY).getOrElse("")
+      val filterStr = optParams.getOrElse(
+        DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
+        DataSourceReadOptions.DEFAULT_PUSH_DOWN_FILTERS_OPT_VAL)
       filterStr.split(",").filter(!_.isEmpty)
     } else {
       Array[String]()
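
The value of hoodie.datasource.read.incr.filters is parsed as a comma-separated list of expressions, and each entry is later applied with Dataset.filter in the foldLeft shown in the next hunk. A hedged usage sketch; the column names, literals, and variable names are illustrative only:

val incrementalWithFilters = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginInstant)
  // each comma-separated expression becomes one .filter(...) call on the incremental DataFrame
  .option(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY, "fare > 0.0,rider = 'rider-1'")
  .load(basePath)
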
@@ -100,17 +103,26 @@ class IncrementalRelation(val sqlContext: SQLContext,
         .get, classOf[HoodieCommitMetadata])
       fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
     }
+    val pathGlobPattern = optParams.getOrElse(
+      DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
+      DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
+    val filteredFullPath = if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
+      val globMatcher = new GlobPattern("*" + pathGlobPattern)
+      fileIdToFullPath.filter(p => globMatcher.matches(p._2))
+    } else {
+      fileIdToFullPath
+    }
     // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
     // will filter out all the files incorrectly.
     sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
     val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-    if (fileIdToFullPath.isEmpty) {
+    if (filteredFullPath.isEmpty) {
       sqlContext.sparkContext.emptyRDD[Row]
     } else {
       log.info("Additional Filters to be applied to incremental source are :" + filters)
       filters.foldLeft(sqlContext.read.options(sOpts)
         .schema(latestSchema)
-        .parquet(fileIdToFullPath.values.toList: _*)
+        .parquet(filteredFullPath.values.toList: _*)
         .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
         .filter(String.format("%s <= '%s'",
           HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
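
Because the relation prepends "*" to the configured glob, the pattern is matched against the tail of each full file path collected in fileIdToFullPath. A self-contained sketch of that matching step; the file ids and paths below are made-up examples:

import org.apache.hadoop.fs.GlobPattern

// shaped like the fileId -> full path map returned by getFileIdAndFullPaths (example values only)
val fileIdToFullPath = Map(
  "f1" -> "hdfs://ns/warehouse/tbl/2016/03/15/f1_1-0-1_20200315.parquet",
  "f2" -> "hdfs://ns/warehouse/tbl/2015/03/16/f2_1-0-1_20200315.parquet")

val globMatcher = new GlobPattern("*" + "/2016/*/*/*")
// keeps only the entry under the 2016 partitions (f1)
val filteredFullPath = fileIdToFullPath.filter(p => globMatcher.matches(p._2))
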
@@ -22,6 +22,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
+import org.apache.spark.sql.functions.col
 import org.junit.Assert._
 import org.junit.rules.TemporaryFolder
 import org.junit.{Before, Test}
@@ -135,6 +136,14 @@ class TestDataSource extends AssertionsForJUnit {
     countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
     assertEquals(1, countsPerCommit.length)
     assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+    // pull the latest commit within certain partitions
+    val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
+      .load(basePath);
+    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
   }
 
   @Test def testMergeOnReadStorage() {
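
Note on the assertion: the glob prunes at the file-path level, so only files under the matching partitions are handed to the Parquet reader, whereas the comparison side filters on _hoodie_partition_path after reading the full incremental view. The "/2016/*/*/*" value assumes the test table's partition paths use year/month/day components under the base path.
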