diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala index b0014d3b2..019a76e23 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala @@ -88,8 +88,12 @@ class IncrementalRelation(val sqlContext: SQLContext, .get, classOf[HoodieCommitMetadata]) fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap } + // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view + // will filter out all the files incorrectly. + sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class"); val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path")) sqlContext.read.options(sOpts) + .schema(latestSchema) // avoid AnalysisException for empty input .parquet(fileIdToFullPath.values.toList: _*) .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)) .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)) diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala b/hoodie-spark/src/test/scala/DataSourceTest.scala index 821a7225a..f75192b35 100644 --- a/hoodie-spark/src/test/scala/DataSourceTest.scala +++ b/hoodie-spark/src/test/scala/DataSourceTest.scala @@ -98,14 +98,26 @@ class DataSourceTest extends AssertionsForJUnit { // Read Incremental View + // we have 2 commits, try pulling the first commit (which is not the latest) + val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0); + val hoodieIncViewDF1 = spark.read.format("com.uber.hoodie") + .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit) + .load(basePath); + assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled + var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect(); + assertEquals(1, countsPerCommit.length) + assertEquals(firstCommit, countsPerCommit(0).get(0)) + + // pull the latest commit val hoodieIncViewDF2 = spark.read.format("com.uber.hoodie") .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) .load(basePath); - assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled - val countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect(); + countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect(); assertEquals(1, countsPerCommit.length) assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) }