Fix bug with incrementally pulling older data
This commit is contained in:
committed by
vinoth chandar
parent
bd5af89f12
commit
f44bcc5b03
@@ -88,8 +88,12 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
.get, classOf[HoodieCommitMetadata])
|
.get, classOf[HoodieCommitMetadata])
|
||||||
fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
|
fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
|
||||||
}
|
}
|
||||||
|
// unset the path filter; otherwise, if end_instant_time is not the latest instant, the path filter set for the RO view
|
||||||
|
// will filter out all the files incorrectly.
|
||||||
|
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class");
|
||||||
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
|
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
|
||||||
sqlContext.read.options(sOpts)
|
sqlContext.read.options(sOpts)
|
||||||
|
.schema(latestSchema) // avoid AnalysisException for empty input
|
||||||
.parquet(fileIdToFullPath.values.toList: _*)
|
.parquet(fileIdToFullPath.values.toList: _*)
|
||||||
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
|
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
|
||||||
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp))
|
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp))
|
||||||
|
|||||||
@@ -98,14 +98,26 @@ class DataSourceTest extends AssertionsForJUnit {
|
|||||||
|
|
||||||
|
|
||||||
// Read Incremental View
|
// Read Incremental View
|
||||||
|
// we have 2 commits; try pulling the first commit (which is not the latest)
|
||||||
|
val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0);
|
||||||
|
val hoodieIncViewDF1 = spark.read.format("com.uber.hoodie")
|
||||||
|
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
||||||
|
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
|
||||||
|
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
|
||||||
|
.load(basePath);
|
||||||
|
assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled
|
||||||
|
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
|
||||||
|
assertEquals(1, countsPerCommit.length)
|
||||||
|
assertEquals(firstCommit, countsPerCommit(0).get(0))
|
||||||
|
|
||||||
|
// pull the latest commit
|
||||||
val hoodieIncViewDF2 = spark.read.format("com.uber.hoodie")
|
val hoodieIncViewDF2 = spark.read.format("com.uber.hoodie")
|
||||||
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
||||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
|
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
|
||||||
.load(basePath);
|
.load(basePath);
|
||||||
|
|
||||||
|
|
||||||
assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
|
assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
|
||||||
val countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
|
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
|
||||||
assertEquals(1, countsPerCommit.length)
|
assertEquals(1, countsPerCommit.length)
|
||||||
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
|
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user