[HUDI-1487] fix unit test testCopyOnWriteStorage random failed (#2364)
@@ -107,6 +107,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .options(commonOpts)
       .mode(SaveMode.Append)
       .save(basePath)
+    val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
 
     val snapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF2.count())
@@ -122,7 +123,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
 
     // Snapshot Query
@@ -154,18 +155,18 @@ class TestCOWDataSource extends HoodieClientTestBase {
     // pull the latest commit
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime2)
       .load(basePath)
 
     assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
     countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+    assertEquals(commitInstantTime3, countsPerCommit(0).get(0))
 
     // pull the latest commit within certain partitions
     val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime2)
       .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
       .load(basePath)
     assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
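The change makes the incremental-query assertions deterministic: instead of beginning the pull at commitInstantTime1 (which spans more than one later commit), the test now captures the latest commit instant right before the final write and begins the incremental read there, so exactly one commit is pulled. A minimal sketch of that pattern, assuming the same spark, fs, basePath and DataSourceReadOptions constants used in the test above; the beforeWrite, afterWrite and incDF names are illustrative and not part of the commit:

    // Sketch only: isolate the records written by the most recent commit.
    val beforeWrite = HoodieDataSourceHelpers.latestCommit(fs, basePath)   // e.g. commitInstantTime2

    // ... perform the write under test here ...

    val afterWrite = HoodieDataSourceHelpers.latestCommit(fs, basePath)    // e.g. commitInstantTime3

    // Incremental query: pull only commits after `beforeWrite`,
    // i.e. exactly the commit produced by the write above.
    val incDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beforeWrite)
      .load(basePath)

    // Every pulled row should carry the single new commit time.
    val countsPerCommit = incDF.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(afterWrite, countsPerCommit(0).get(0))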