[HUDI-1481] add structured streaming and delta streamer clustering unit test (#2360)

lw0090 authored 2020-12-28 12:27:09 +08:00, committed by GitHub
parent 8cf6a7223f
commit 9e6889a8ce
3 changed files with 188 additions and 40 deletions

TestCOWDataSource.scala

@@ -115,7 +115,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
// Upsert Operation without Hudi metadata columns
val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
-val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 , 2))
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
inputDF2.write.format("org.apache.hudi")

TestStructuredStreaming.scala

@@ -18,16 +18,20 @@
package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.apache.spark.sql.types.StructType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

import scala.collection.JavaConversions._
@@ -64,13 +68,39 @@ class TestStructuredStreaming extends HoodieClientTestBase {
cleanupFileSystem()
}
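
// Runs the streaming write in a background Future so the test body can concurrently feed
// JSON files into the source directory and block on commit assertions.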
def initStreamingWriteFuture(schema: StructType, sourcePath: String, destPath: String, hudiOptions: Map[String, String]): Future[Unit] = {
  // define the streaming source
  val streamingInput =
    spark.readStream
      .schema(schema)
      .json(sourcePath)

  Future {
    println("streaming starting")
    // 'writeStream' can be called only on a streaming Dataset/DataFrame
    streamingInput
      .writeStream
      .format("org.apache.hudi")
      .options(hudiOptions)
      .trigger(Trigger.ProcessingTime(100))
      .option("checkpointLocation", basePath + "/checkpoint")
      .outputMode(OutputMode.Append)
      .start(destPath)
      .awaitTermination(10000)
    println("streaming ends")
  }
}
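
// Wipes basePath and provisions fresh source/destination directories for a test run.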
def initStreamingSourceAndDestPath(sourceDirName: String, destDirName: String): (String, String) = {
  fs.delete(new Path(basePath), true)
  val sourcePath = basePath + "/" + sourceDirName
  val destPath = basePath + "/" + destDirName
  fs.mkdirs(new Path(sourcePath))
  (sourcePath, destPath)
}
@Test
def testStructuredStreaming(): Unit = {
-  fs.delete(new Path(basePath), true)
-  val sourcePath = basePath + "/source"
-  val destPath = basePath + "/dest"
-  fs.mkdirs(new Path(sourcePath))
+  val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
// First chunk of data
val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -80,26 +110,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
-  // define the source of streaming
-  val streamingInput =
-    spark.readStream
-      .schema(inputDF1.schema)
-      .json(sourcePath)
-  val f1 = Future {
-    println("streaming starting")
-    //'writeStream' can be called only on streaming Dataset/DataFrame
-    streamingInput
-      .writeStream
-      .format("org.apache.hudi")
-      .options(commonOpts)
-      .trigger(Trigger.ProcessingTime(100))
-      .option("checkpointLocation", basePath + "/checkpoint")
-      .outputMode(OutputMode.Append)
-      .start(destPath)
-      .awaitTermination(10000)
-    println("streaming ends")
-  }
+  val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, commonOpts)
val f2 = Future {
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -113,7 +124,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
assert(hoodieROViewDF1.count() == 100)
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
-  // wait for spark streaming to process one microbatch
+  // wait for spark streaming to process the second microbatch
waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
@@ -177,4 +188,112 @@ class TestStructuredStreaming extends HoodieClientTestBase {
if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
numInstants
}
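
// Write options that turn inline clustering on/off: cluster after every `clusteringNumCommit` commits,
// with PARQUET_FILE_MAX_BYTES capped via the data generator's size estimate (presumably added alongside
// this commit) so each micro-batch produces its own small file group for clustering to merge.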
def getInlineClusteringOpts(isInlineClustering: String, clusteringNumCommit: String, fileMaxRecordNum: Int): Map[String, String] = {
  commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP -> isInlineClustering,
    HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP -> clusteringNumCommit,
    HoodieStorageConfig.PARQUET_FILE_MAX_BYTES -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
  )
}
@Test
def testStructuredStreamingWithInlineClustering(): Unit = {
  val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")

  def checkClusteringResult(destPath: String): Unit = {
    // clustering should have been scheduled and completed, merging the file groups down to one
    waitTillHasCompletedReplaceInstant(destPath, 120, 5)
    metaClient.reloadActiveTimeline()
    assertEquals(1, getLatestFileGroupsFileId.size)
  }

  structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, checkClusteringResult)
}
@Test
def testStructuredStreamingWithoutInlineClustering(): Unit = {
  val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")

  def checkClusteringResult(destPath: String): Unit = {
    val msg = "Should time out: no replace commit is expected when inline clustering is disabled"
    assertThrows(classOf[IllegalStateException], new Executable {
      override def execute(): Unit = {
        waitTillHasCompletedReplaceInstant(destPath, 120, 5)
      }
    }, msg)
    println(msg)
  }

  structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, checkClusteringResult)
}
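
// Shared driver for both clustering tests: streams two micro-batches of 100 inserts each into the
// same partition, waits for the corresponding commits, then delegates to the supplied callback to
// verify whether clustering ran.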
def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String,
isInlineClustering: Boolean, checkClusteringResult: String => Unit): Unit = {
// First insert of data
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
// Second insert of data
val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, "2", 100)
val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)
val f2 = Future {
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process one microbatch
val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process the second microbatch
waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// verify the two micro-batches produced more than one file group
this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
assertTrue(getLatestFileGroupsFileId().size > 1)
// check clustering result
checkClusteringResult(destPath)
// verify the data is still correct after clustering
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
.load(destPath + "/*/*/*/*")
assertEquals(200, hoodieROViewDF2.count())
}
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
}
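
// File ids of the latest file slices in the default first partition; a completed inline clustering
// run should collapse these to a single file group.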
private def getLatestFileGroupsFileId(): Array[String] = {
  // bind the returned view locally instead of relying on the tableView field the test harness sets as a side effect
  val tableView = getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline,
    HoodieTestTable.of(metaClient).listAllBaseFiles())
  tableView.getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
    .toArray().map(slice => slice.asInstanceOf[FileSlice].getFileGroupId.getFileId)
}
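
// Polls the active timeline until a completed replacecommit (the clustering instant) appears,
// throwing IllegalStateException on timeout so callers can assert the negative case too.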
@throws[InterruptedException]
private def waitTillHasCompletedReplaceInstant(tablePath: String,
                                               timeoutSecs: Int, sleepSecsAfterEachRun: Int) = {
  val beginTime = System.currentTimeMillis
  var currTime = beginTime
  val timeoutMsecs = timeoutSecs * 1000
  var success = false
  while (!success && (currTime - beginTime) < timeoutMsecs) try {
    this.metaClient.reloadActiveTimeline()
    val completeReplaceSize = this.metaClient.getActiveTimeline.getCompletedReplaceTimeline().getInstants.toArray.size
    println("completeReplaceSize:" + completeReplaceSize)
    if (completeReplaceSize > 0) {
      success = true
    }
  } catch {
    case _: TableNotFoundException =>
      log.info("Got table not found exception. Retrying")
  } finally {
    Thread.sleep(sleepSecsAfterEachRun * 1000)
    currTime = System.currentTimeMillis
  }
  if (!success) throw new IllegalStateException("Timed-out waiting for a completed replace instant to appear in " + tablePath)
}
}
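
For reference, here is a minimal standalone sketch (not part of this commit) of the pattern these tests exercise: a Spark structured streaming write into Hudi with inline clustering enabled. The two clustering option keys are the string values behind HoodieClusteringConfig.INLINE_CLUSTERING_PROP and HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP as of this commit; the table name, paths, and input schema are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().appName("hudi-inline-clustering-sketch").getOrCreate()
val inputSchema: StructType = ???  // schema of the incoming JSON records (application-specific)

spark.readStream
  .schema(inputSchema)
  .json("/tmp/hudi_source")                               // placeholder source directory
  .writeStream
  .format("org.apache.hudi")
  .option("hoodie.table.name", "test_table")              // placeholder table name
  .option("hoodie.datasource.write.recordkey.field", "_row_key")
  .option("hoodie.datasource.write.precombine.field", "timestamp")
  .option("hoodie.clustering.inline", "true")             // HoodieClusteringConfig.INLINE_CLUSTERING_PROP
  .option("hoodie.clustering.inline.max.commits", "2")    // HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP
  .trigger(Trigger.ProcessingTime(100))
  .option("checkpointLocation", "/tmp/hudi_checkpoint")   // placeholder checkpoint path
  .outputMode(OutputMode.Append)
  .start("/tmp/hudi_dest")                                // placeholder destination table path
  .awaitTermination()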