1
0

[MINOR] Improving runtime of TestStructuredStreaming by 2 mins (#3382)

This commit is contained in:
vinoth chandar
2021-08-02 13:42:46 -07:00
committed by GitHub
parent fe508376fa
commit b21ae68e67

View File

@@ -30,8 +30,7 @@ import org.apache.log4j.LogManager
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
@@ -183,9 +182,11 @@ class TestStructuredStreaming extends HoodieClientTestBase {
case te: TableNotFoundException => case te: TableNotFoundException =>
log.info("Got table not found exception. Retrying") log.info("Got table not found exception. Retrying")
} finally { } finally {
if (!success) {
Thread.sleep(sleepSecsAfterEachRun * 1000) Thread.sleep(sleepSecsAfterEachRun * 1000)
currTime = System.currentTimeMillis currTime = System.currentTimeMillis
} }
}
if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath) if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
numInstants numInstants
} }
@@ -207,7 +208,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
def checkClusteringResult(destPath: String):Unit = { def checkClusteringResult(destPath: String):Unit = {
// check have schedule clustering and clustering file group to one // check have schedule clustering and clustering file group to one
waitTillHasCompletedReplaceInstant(destPath, 120, 5) waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline() metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
} }
@@ -221,7 +222,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
def checkClusteringResult(destPath: String):Unit = { def checkClusteringResult(destPath: String):Unit = {
// check have schedule clustering and clustering file group to one // check have schedule clustering and clustering file group to one
waitTillHasCompletedReplaceInstant(destPath, 120, 5) waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline() metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
} }
@@ -235,7 +236,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
def checkClusteringResult(destPath: String):Unit = { def checkClusteringResult(destPath: String):Unit = {
// check have schedule clustering and clustering file group to one // check have schedule clustering and clustering file group to one
waitTillHasCompletedReplaceInstant(destPath, 120, 5) waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline() metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size) assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
} }
@@ -243,23 +244,6 @@ class TestStructuredStreaming extends HoodieClientTestBase {
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult) HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
} }
@Test
def testStructuredStreamingWithoutClustering(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
def checkClusteringResult(destPath: String):Unit = {
val msg = "Should have replace commit completed"
assertThrows(classOf[IllegalStateException], new Executable {
override def execute(): Unit = {
waitTillHasCompletedReplaceInstant(destPath, 120, 5)
}
}, msg)
println(msg)
}
structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, false, false,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean, def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean,
isAsyncClustering: Boolean, isAsyncCompaction: Boolean, isAsyncClustering: Boolean, isAsyncCompaction: Boolean,
partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = { partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
@@ -285,7 +269,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
// wait for spark streaming to process second microbatch // wait for spark streaming to process second microbatch
currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5) currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
// for inline clustering, clustering may be complete along with 2nd commit // for inline clustering, clustering may be complete along with 2nd commit
if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline().countInstants() > 0) { if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline.countInstants() > 0) {
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// check have at least one file group // check have at least one file group
this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath) this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)