From 9e6889a8ce522ec455a11d6f6cf4949255c91a0f Mon Sep 17 00:00:00 2001 From: lw0090 Date: Mon, 28 Dec 2020 12:27:09 +0800 Subject: [PATCH] [HUDI-1481] add structured streaming and delta streamer clustering unit test (#2360) --- .../hudi/functional/TestCOWDataSource.scala | 2 +- .../functional/TestStructuredStreaming.scala | 175 +++++++++++++++--- .../functional/TestHoodieDeltaStreamer.java | 51 +++-- 3 files changed, 188 insertions(+), 40 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 0386f2068..51ca72e04 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -115,7 +115,7 @@ class TestCOWDataSource extends HoodieClientTestBase { // Upsert Operation without Hudi metadata columns val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList - val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 , 2)) val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() inputDF2.write.format("org.apache.hudi") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 7a902c14b..b07f00f61 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -18,16 +18,20 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig} import org.apache.hudi.exception.TableNotFoundException import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{OutputMode, Trigger} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.apache.spark.sql.types.StructType +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} +import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import scala.collection.JavaConversions._ @@ -64,13 +68,39 @@ class TestStructuredStreaming extends HoodieClientTestBase { cleanupFileSystem() } + def initStreamingWriteFuture(schema: StructType, sourcePath: String, destPath: String, hudiOptions: Map[String, String]): Future[Unit] = { + // define the source of streaming + val streamingInput = + spark.readStream + .schema(schema) + .json(sourcePath) + Future { + println("streaming starting") + //'writeStream' can be called only on streaming Dataset/DataFrame + streamingInput + .writeStream + .format("org.apache.hudi") + .options(hudiOptions) + .trigger(Trigger.ProcessingTime(100)) + .option("checkpointLocation", basePath + "/checkpoint") + .outputMode(OutputMode.Append) + .start(destPath) + .awaitTermination(10000) + println("streaming ends") + } + } + + def initStreamingSourceAndDestPath(sourceDirName: String, destDirName: String): (String, String) = { + fs.delete(new Path(basePath), true) + val sourcePath = basePath + "/" + sourceDirName + val destPath = basePath + "/" + destDirName + fs.mkdirs(new Path(sourcePath)) + (sourcePath, destPath) + } + @Test def testStructuredStreaming(): Unit = { - fs.delete(new Path(basePath), true) - val sourcePath = basePath + "/source" - val destPath = basePath + "/dest" - fs.mkdirs(new Path(sourcePath)) - + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") // First chunk of data val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) @@ -80,26 +110,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() - // define the source of streaming - val streamingInput = - spark.readStream - .schema(inputDF1.schema) - .json(sourcePath) - - val f1 = Future { - println("streaming starting") - //'writeStream' can be called only on streaming Dataset/DataFrame - streamingInput - .writeStream - .format("org.apache.hudi") - .options(commonOpts) - .trigger(Trigger.ProcessingTime(100)) - .option("checkpointLocation", basePath + "/checkpoint") - .outputMode(OutputMode.Append) - .start(destPath) - .awaitTermination(10000) - println("streaming ends") - } + val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, commonOpts) val f2 = Future { inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) @@ -113,7 +124,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { assert(hoodieROViewDF1.count() == 100) inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) - // wait for spark streaming to process one microbatch + // wait for spark streaming to process second microbatch waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5) val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath) assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) @@ -177,4 +188,112 @@ class TestStructuredStreaming extends HoodieClientTestBase { if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath) numInstants } + + def getInlineClusteringOpts( isInlineClustering: String, clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = { + commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP -> isInlineClustering, + HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP -> clusteringNumCommit, + HoodieStorageConfig.PARQUET_FILE_MAX_BYTES -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString + ) + } + + @Test + def testStructuredStreamingWithInlineClustering(): Unit = { + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") + + def checkClusteringResult(destPath: String):Unit = { + // check have schedule clustering and clustering file group to one + waitTillHasCompletedReplaceInstant(destPath, 120, 5) + metaClient.reloadActiveTimeline() + assertEquals(1, getLatestFileGroupsFileId.size) + } + structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, checkClusteringResult) + } + + @Test + def testStructuredStreamingWithoutInlineClustering(): Unit = { + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") + + def checkClusteringResult(destPath: String):Unit = { + val msg = "Should have replace commit completed" + assertThrows(classOf[IllegalStateException], new Executable { + override def execute(): Unit = { + waitTillHasCompletedReplaceInstant(destPath, 120, 5) + } + } + , "Should have replace commit completed") + println(msg) + } + structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, checkClusteringResult) + } + + def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, + isInlineClustering: Boolean, checkClusteringResult: String => Unit): Unit = { + // First insert of data + val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + + // Second insert of data + val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + + val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, "2", 100) + val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions) + + val f2 = Future { + inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) + // wait for spark streaming to process one microbatch + val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000")) + + inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) + // wait for spark streaming to process second microbatch + waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5) + assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) + + // check have more than one file group + this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true) + assertTrue(getLatestFileGroupsFileId().size > 1) + + // check clustering result + checkClusteringResult(destPath) + + // check data correct after clustering + val hoodieROViewDF2 = spark.read.format("org.apache.hudi") + .load(destPath + "/*/*/*/*") + assertEquals(200, hoodieROViewDF2.count()) + } + Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf) + } + + private def getLatestFileGroupsFileId():Array[String] = { + getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, + HoodieTestTable.of(metaClient).listAllBaseFiles()) + tableView.getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) + .toArray().map(slice => slice.asInstanceOf[FileSlice].getFileGroupId.getFileId) + } + + @throws[InterruptedException] + private def waitTillHasCompletedReplaceInstant(tablePath: String, + timeoutSecs: Int, sleepSecsAfterEachRun: Int) = { + val beginTime = System.currentTimeMillis + var currTime = beginTime + val timeoutMsecs = timeoutSecs * 1000 + var success = false + while ({!success && (currTime - beginTime) < timeoutMsecs}) try { + this.metaClient.reloadActiveTimeline() + val completeReplaceSize = this.metaClient.getActiveTimeline.getCompletedReplaceTimeline().getInstants.toArray.size + println("completeReplaceSize:" + completeReplaceSize) + if(completeReplaceSize > 0) { + success = true + } + } catch { + case te: TableNotFoundException => + log.info("Got table not found exception. Retrying") + } finally { + Thread.sleep(sleepSecsAfterEachRun * 1000) + currTime = System.currentTimeMillis + } + if (!success) throw new IllegalStateException("Timed-out waiting for " + " have completed replace instant appear in " + tablePath) + } + } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 9b0097e8b..6966e2cd6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; @@ -622,23 +623,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { String tableBasePath = dfsBasePath + "/" + tempDir; // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; - // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; cfg.tableType = tableType.name(); cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); - HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); - Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> { - try { - ds.sync(); - } catch (Exception ex) { - throw new RuntimeException(ex.getMessage(), ex); - } - }); - TestHelpers.waitTillCondition((r) -> { + deltaStreamerTestRunner(cfg, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs); TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs); @@ -648,11 +640,48 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); return true; - }, 180); + }); + } + + private void deltaStreamerTestRunner(HoodieDeltaStreamer.Config cfg, Function condition) throws Exception { + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> { + try { + ds.sync(); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + }); + + TestHelpers.waitTillCondition(condition, 180); ds.shutdownGracefully(); dsFuture.get(); } + @Test + public void testInlineClustering() throws Exception { + String tableBasePath = dfsBasePath + "/inlineClustering"; + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); + cfg.continuousMode = true; + cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); + cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); + cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true")); + cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2")); + + deltaStreamerTestRunner(cfg, (r) -> { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; + int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; + LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); + return completeReplaceSize > 0; + }); + } + /** * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline The first * step involves using a SQL template to transform a source TEST-DATA-SOURCE ============================> HUDI TABLE