[HUDI-290] Normalize test class name of all test classes (#951)
This commit is contained in:
278
hudi-spark/src/test/scala/TestDataSource.scala
Normal file
278
hudi-spark/src/test/scala/TestDataSource.scala
Normal file
@@ -0,0 +1,278 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hudi.common.HoodieTestDataGenerator
|
||||
import org.apache.hudi.common.util.FSUtils
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
|
||||
import org.junit.Assert._
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import org.junit.{Before, Test}
|
||||
import org.scalatest.junit.AssertionsForJUnit
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.{Await, Future}
|
||||
|
||||
/**
|
||||
* Basic tests on the spark datasource
|
||||
*/
|
||||
class TestDataSource extends AssertionsForJUnit {
|
||||
|
||||
var spark: SparkSession = null
|
||||
var dataGen: HoodieTestDataGenerator = null
|
||||
val commonOpts = Map(
|
||||
"hoodie.insert.shuffle.parallelism" -> "4",
|
||||
"hoodie.upsert.shuffle.parallelism" -> "4",
|
||||
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
|
||||
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
|
||||
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
|
||||
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
|
||||
)
|
||||
var basePath: String = null
|
||||
var fs: FileSystem = null
|
||||
|
||||
@Before def initialize() {
|
||||
spark = SparkSession.builder
|
||||
.appName("Hoodie Datasource test")
|
||||
.master("local[2]")
|
||||
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
.getOrCreate
|
||||
dataGen = new HoodieTestDataGenerator()
|
||||
val folder = new TemporaryFolder
|
||||
folder.create
|
||||
basePath = folder.getRoot.getAbsolutePath
|
||||
fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
|
||||
}
|
||||
|
||||
@Test def testCopyOnWriteStorage() {
|
||||
// Insert Operation
|
||||
val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100)).toList
|
||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
|
||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
||||
|
||||
// Read RO View
|
||||
val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
|
||||
.load(basePath + "/*/*/*/*");
|
||||
assertEquals(100, hoodieROViewDF1.count())
|
||||
|
||||
val records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("001", 100)).toList
|
||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
|
||||
|
||||
// Upsert Operation
|
||||
inputDF2.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
|
||||
val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
||||
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
|
||||
|
||||
// Read RO View
|
||||
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
|
||||
.load(basePath + "/*/*/*/*");
|
||||
assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
|
||||
|
||||
// Read Incremental View
|
||||
// we have 2 commits, try pulling the first commit (which is not the latest)
|
||||
val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0);
|
||||
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
|
||||
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
|
||||
.load(basePath);
|
||||
assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled
|
||||
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
|
||||
assertEquals(1, countsPerCommit.length)
|
||||
assertEquals(firstCommit, countsPerCommit(0).get(0))
|
||||
|
||||
// pull the latest commit
|
||||
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
|
||||
.load(basePath);
|
||||
|
||||
assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
|
||||
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
|
||||
assertEquals(1, countsPerCommit.length)
|
||||
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
|
||||
}
|
||||
|
||||
@Test def testMergeOnReadStorage() {
|
||||
// Bulk Insert Operation
|
||||
val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001", 100)).toList
|
||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
|
||||
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
|
||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||
|
||||
// Read RO View
|
||||
val hoodieROViewDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
|
||||
assertEquals(100, hoodieROViewDF1.count()) // still 100, since we only updated
|
||||
}
|
||||
|
||||
@Test def testDropInsertDup(): Unit = {
|
||||
val insert1Cnt = 10
|
||||
val insert2DupKeyCnt = 9
|
||||
val insert2NewKeyCnt = 2
|
||||
|
||||
val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
|
||||
val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
|
||||
val inserts1 = allRecords.subList(0, insert1Cnt)
|
||||
val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
|
||||
val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
|
||||
|
||||
val records1 = DataSourceTestUtils.convertToStringList(inserts1).toList
|
||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
|
||||
.load(basePath + "/*/*/*/*")
|
||||
assertEquals(insert1Cnt, hoodieROViewDF1.count())
|
||||
|
||||
val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
||||
val records2 = DataSourceTestUtils
|
||||
.convertToStringList(inserts2Dup ++ inserts2New)
|
||||
.toList
|
||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||
inputDF2.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
|
||||
.load(basePath + "/*/*/*/*")
|
||||
assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate)
|
||||
|
||||
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
|
||||
.load(basePath)
|
||||
assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
|
||||
}
|
||||
|
||||
//@Test (TODO: re-enable after fixing noisyness)
|
||||
def testStructuredStreaming(): Unit = {
|
||||
fs.delete(new Path(basePath), true)
|
||||
val sourcePath = basePath + "/source"
|
||||
val destPath = basePath + "/dest"
|
||||
fs.mkdirs(new Path(sourcePath))
|
||||
|
||||
// First chunk of data
|
||||
val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100)).toList
|
||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||
|
||||
// Second chunk of data
|
||||
val records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("001", 100)).toList
|
||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
|
||||
|
||||
// define the source of streaming
|
||||
val streamingInput =
|
||||
spark.readStream
|
||||
.schema(inputDF1.schema)
|
||||
.json(sourcePath)
|
||||
|
||||
val f1 = Future {
|
||||
println("streaming starting")
|
||||
//'writeStream' can be called only on streaming Dataset/DataFrame
|
||||
streamingInput
|
||||
.writeStream
|
||||
.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.trigger(new ProcessingTime(100))
|
||||
.option("checkpointLocation", basePath + "/checkpoint")
|
||||
.outputMode(OutputMode.Append)
|
||||
.start(destPath)
|
||||
.awaitTermination(10000)
|
||||
println("streaming ends")
|
||||
}
|
||||
|
||||
val f2 = Future {
|
||||
inputDF1.write.mode(SaveMode.Append).json(sourcePath)
|
||||
// wait for spark streaming to process one microbatch
|
||||
Thread.sleep(3000)
|
||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
|
||||
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
|
||||
// Read RO View
|
||||
val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
|
||||
.load(destPath + "/*/*/*/*")
|
||||
assert(hoodieROViewDF1.count() == 100)
|
||||
|
||||
inputDF2.write.mode(SaveMode.Append).json(sourcePath)
|
||||
// wait for spark streaming to process one microbatch
|
||||
Thread.sleep(10000)
|
||||
val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
|
||||
|
||||
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
|
||||
// Read RO View
|
||||
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
|
||||
.load(destPath + "/*/*/*/*")
|
||||
assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
|
||||
|
||||
|
||||
// Read Incremental View
|
||||
// we have 2 commits, try pulling the first commit (which is not the latest)
|
||||
val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
|
||||
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
|
||||
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
|
||||
.load(destPath)
|
||||
assertEquals(100, hoodieIncViewDF1.count())
|
||||
// 100 initial inserts must be pulled
|
||||
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
|
||||
assertEquals(1, countsPerCommit.length)
|
||||
assertEquals(firstCommit, countsPerCommit(0).get(0))
|
||||
|
||||
// pull the latest commit
|
||||
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
|
||||
.load(destPath)
|
||||
|
||||
assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
|
||||
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
|
||||
assertEquals(1, countsPerCommit.length)
|
||||
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
|
||||
}
|
||||
|
||||
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user