1
0

[HUDI-1483] Support async clustering for deltastreamer and Spark streaming (#3142)

- Integrate async clustering service with HoodieDeltaStreamer and HoodieStreamingSink
- Added methods in HoodieAsyncService to reuse code
This commit is contained in:
Sagar Sumit
2021-07-12 00:13:38 +05:30
committed by GitHub
parent 9b01d2a045
commit 5804ad8e32
23 changed files with 710 additions and 112 deletions

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
@@ -171,6 +172,8 @@ public class DataSourceUtils {
boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key()));
boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY().key())
.equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key()));
boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE_OPT_KEY().key()));
// insert/bulk-insert combining to be true, if filtering for duplicates
boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY().key()));
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
@@ -184,6 +187,9 @@ public class DataSourceUtils {
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY().key()))
.withInlineCompaction(inlineCompact).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClustering(inlineClusteringEnabled)
.withAsyncClustering(asyncClusteringEnabled).build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key()))
.build())
// override above with Hoodie configs specified as options.

View File

@@ -474,6 +474,18 @@ object DataSourceWriteOptions {
.defaultValue("true")
.withDocumentation("")
val INLINE_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.clustering.inline.enable")
.defaultValue("false")
.sinceVersion("0.9.0")
.withDocumentation("Enable inline clustering. Disabled by default.")
val ASYNC_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.clustering.async.enable")
.defaultValue("false")
.sinceVersion("0.9.0")
.withDocumentation("Enable asynchronous clustering. Disabled by default.")
val KAFKA_AVRO_VALUE_DESERIALIZER_CLASS: ConfigProperty[String] = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.value.deserializer.class")
.defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer")

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.async;
import org.apache.hudi.client.AbstractClusteringClient;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.client.HoodieSparkClusteringClient;
/**
* Async clustering service for Spark structured streaming.
* Here, async clustering is run in daemon mode to prevent blocking shutting down the Spark application.
*/
public class SparkStreamingAsyncClusteringService extends AsyncClusteringService {
private static final long serialVersionUID = 1L;
/**
 * Creates the async clustering service backed by the given write client.
 *
 * @param writeClient write client used to execute scheduled clustering plans
 */
public SparkStreamingAsyncClusteringService(AbstractHoodieWriteClient writeClient) {
// 'true' => run as daemon threads so a pending clustering run cannot block
// Spark application shutdown (see class-level javadoc).
super(writeClient, true);
}
/**
 * Supplies the Spark-specific clustering client used to execute clustering instants.
 */
@Override
protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) {
return new HoodieSparkClusteringClient(client);
}
}

View File

@@ -62,6 +62,7 @@ object HoodieSparkSqlWriter {
private val log = LogManager.getLogger(getClass)
private var tableExists: Boolean = false
private var asyncCompactionTriggerFnDefined: Boolean = false
private var asyncClusteringTriggerFnDefined: Boolean = false
def write(sqlContext: SQLContext,
mode: SaveMode,
@@ -69,9 +70,10 @@ object HoodieSparkSqlWriter {
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty,
asyncClusteringTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
)
: (Boolean, common.util.Option[String], common.util.Option[String],
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
val sparkContext = sqlContext.sparkContext
@@ -79,6 +81,7 @@ object HoodieSparkSqlWriter {
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
val tblNameOp = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TABLE_NAME, s"'${HoodieWriteConfig.TABLE_NAME.key}' must be set.")
asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
if (path.isEmpty) {
throw new HoodieException(s"'path' must be set.")
}
@@ -112,7 +115,7 @@ object HoodieSparkSqlWriter {
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
(false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
// Handle various save modes
handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
@@ -140,7 +143,7 @@ object HoodieSparkSqlWriter {
operation == WriteOperationType.BULK_INSERT) {
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime)
return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
}
// scalastyle:on
@@ -180,6 +183,10 @@ object HoodieSparkSqlWriter {
asyncCompactionTriggerFn.get.apply(client)
}
if (isAsyncClusteringEnabled(client, parameters)) {
asyncClusteringTriggerFn.get.apply(client)
}
val hoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS_OPT_KEY)) {
DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
@@ -219,6 +226,10 @@ object HoodieSparkSqlWriter {
asyncCompactionTriggerFn.get.apply(client)
}
if (isAsyncClusteringEnabled(client, parameters)) {
asyncClusteringTriggerFn.get.apply(client)
}
// Issue deletes
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
@@ -226,7 +237,7 @@ object HoodieSparkSqlWriter {
}
// Check for errors and commit the write.
val (writeSuccessful, compactionInstant) =
val (writeSuccessful, compactionInstant, clusteringInstant) =
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))
@@ -247,7 +258,7 @@ object HoodieSparkSqlWriter {
// it's safe to unpersist cached rdds here
unpersistRdd(writeResult.getWriteStatuses.rdd)
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
}
}
@@ -565,7 +576,7 @@ object HoodieSparkSqlWriter {
tableConfig: HoodieTableConfig,
jsc: JavaSparkContext,
tableInstantInfo: TableInstantInfo
): (Boolean, common.util.Option[java.lang.String]) = {
): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
log.info("Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
@@ -593,14 +604,24 @@ object HoodieSparkSqlWriter {
log.info(s"Compaction Scheduled is $compactionInstant")
val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
val clusteringInstant: common.util.Option[java.lang.String] =
if (asyncClusteringEnabled) {
client.scheduleClustering(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
} else {
common.util.Option.empty()
}
log.info(s"Clustering Scheduled is $clusteringInstant")
val metaSyncSuccess = metaSync(spark, HoodieWriterUtils.convertMapToHoodieConfig(parameters),
tableInstantInfo.basePath, schema)
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
if (!asyncCompactionEnabled) {
if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
client.close()
}
(commitSuccess && metaSyncSuccess, compactionInstant)
(commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
} else {
log.error(s"${tableInstantInfo.operation} failed with errors")
if (log.isTraceEnabled) {
@@ -615,7 +636,7 @@ object HoodieSparkSqlWriter {
}
})
}
(false, common.util.Option.empty())
(false, common.util.Option.empty(), common.util.Option.empty())
}
}
@@ -631,6 +652,13 @@ object HoodieSparkSqlWriter {
}
}
/**
 * Async clustering is considered enabled only when all three hold:
 * the caller supplied an async-clustering trigger function, the write
 * client's config has async clustering on, and the datasource option
 * explicitly enables it.
 */
private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
parameters: Map[String, String]) : Boolean = {
log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
val triggerAvailable = asyncClusteringTriggerFnDefined
val enabledInClientConfig = client.getConfig.isAsyncClusteringEnabled
// Missing option => false (exists on None is false).
val enabledInOptions = parameters.get(ASYNC_CLUSTERING_ENABLE_OPT_KEY.key).exists(_.toBoolean)
triggerAvailable && enabledInClientConfig && enabledInOptions
}
private def getHoodieTableConfig(sparkContext: SparkContext,
tablePath: String,
hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {

View File

@@ -19,7 +19,7 @@ package org.apache.hudi
import java.lang
import java.util.function.Function
import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService}
import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.HoodieRecordPayload
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.CompactionUtils
import org.apache.hudi.common.util.ClusteringUtils
import org.apache.hudi.exception.HoodieCorruptedDataException
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
@@ -52,6 +53,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key).toBoolean
private var isAsyncCompactorServiceShutdownAbnormally = false
private var isAsyncClusteringServiceShutdownAbnormally = false
private val mode =
if (outputMode == OutputMode.Append()) {
@@ -61,6 +63,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}
private var asyncCompactorService : AsyncCompactService = _
private var asyncClusteringService: AsyncClusteringService = _
private var writeClient : Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty
@@ -68,13 +71,17 @@ class HoodieStreamingSink(sqlContext: SQLContext,
if (isAsyncCompactorServiceShutdownAbnormally) {
throw new IllegalStateException("Async Compactor shutdown unexpectedly")
}
if (isAsyncClusteringServiceShutdownAbnormally) {
log.error("Async clustering service shutdown unexpectedly")
throw new IllegalStateException("Async clustering service shutdown unexpectedly")
}
retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor))
sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
) match {
case Success((true, commitOps, compactionInstantOps, client, tableConfig)) =>
case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.info(s"Micro batch id=$batchId succeeded"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
@@ -83,9 +90,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
writeClient = Some(client)
hoodieTableConfig = Some(tableConfig)
if (compactionInstantOps.isPresent) {
asyncCompactorService.enqueuePendingCompaction(
asyncCompactorService.enqueuePendingAsyncServiceInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
}
if (clusteringInstant.isPresent) {
asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()
))
}
Success((true, commitOps, compactionInstantOps))
case Failure(e) =>
// clean up persist rdds in the write process
@@ -107,7 +119,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(e)
}
case Success((false, commitOps, compactionInstantOps, client, tableConfig)) =>
case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.error(s"Micro batch id=$batchId ended up with errors"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
@@ -179,7 +191,33 @@ class HoodieStreamingSink(sqlContext: SQLContext,
.setBasePath(client.getConfig.getBasePath).build()
val pendingInstants :java.util.List[HoodieInstant] =
CompactionUtils.getPendingCompactionInstantTimes(metaClient)
pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h))
pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingAsyncServiceInstant(h))
}
}
/**
 * Lazily starts the async clustering service on the first call (no-op afterwards).
 * On startup it registers a shutdown callback, installs a JVM shutdown hook, and
 * enqueues any clustering instants already pending on the table's timeline so
 * work scheduled in a previous run is not lost.
 */
protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
if (null == asyncClusteringService) {
log.info("Triggering async clustering!")
asyncClusteringService = new SparkStreamingAsyncClusteringService(client)
// Completion callback: record whether the service died abnormally (checked on
// the next micro-batch) and release clustering resources without forcing.
asyncClusteringService.start(new Function[java.lang.Boolean, java.lang.Boolean] {
override def apply(errored: lang.Boolean): lang.Boolean = {
log.info(s"Async clustering service shutdown. Errored ? $errored")
isAsyncClusteringServiceShutdownAbnormally = errored
reset(false)
true
}
})
// Add Shutdown Hook
Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
// force = true: JVM is going down, shut the service down immediately.
override def run(): Unit = reset(true)
}))
// First time, scan .hoodie folder and get all pending clustering instants
val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration)
.setBasePath(client.getConfig.getBasePath).build()
val pendingInstants :java.util.List[HoodieInstant] = ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
pendingInstants.foreach((h : HoodieInstant) => asyncClusteringService.enqueuePendingAsyncServiceInstant(h))
}
}
@@ -189,6 +227,11 @@ class HoodieStreamingSink(sqlContext: SQLContext,
asyncCompactorService = null
}
if (asyncClusteringService != null) {
asyncClusteringService.shutdown(force)
asyncClusteringService = null
}
if (writeClient.isDefined) {
writeClient.get.close()
writeClient = Option.empty

View File

@@ -76,6 +76,8 @@ object HoodieWriterUtils {
HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue,
INLINE_CLUSTERING_ENABLE_OPT_KEY.key -> INLINE_CLUSTERING_ENABLE_OPT_KEY.defaultValue,
ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> ASYNC_CLUSTERING_ENABLE_OPT_KEY.defaultValue,
ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue
) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}

View File

@@ -156,6 +156,7 @@ public class HoodieJavaApp {
nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
: SimpleKeyGenerator.class.getCanonicalName())
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
// This will remove any existing data at path below, and create a
.mode(SaveMode.Overwrite);
@@ -183,6 +184,7 @@ public class HoodieJavaApp {
: SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append);
updateHiveSyncConfig(writer);
@@ -210,6 +212,7 @@ public class HoodieJavaApp {
: SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append);
updateHiveSyncConfig(writer);

View File

@@ -362,6 +362,7 @@ public class HoodieJavaStreamingApp {
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key(), "timestamp")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "true")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
.outputMode(OutputMode.Append());

View File

@@ -190,9 +190,13 @@ class TestStructuredStreaming extends HoodieClientTestBase {
numInstants
}
def getInlineClusteringOpts( isInlineClustering: String, clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String,
clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key -> isInlineClustering,
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit,
DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> isAsyncClustering,
DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY.key -> isAsyncCompaction,
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit,
HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
)
}
@@ -207,12 +211,40 @@ class TestStructuredStreaming extends HoodieClientTestBase {
metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false, false,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
@Test
def testStructuredStreamingWithoutInlineClustering(): Unit = {
/**
 * Streaming write with async clustering only (inline clustering and async
 * compaction both off): expects clustering to be scheduled and completed,
 * leaving a single file group in the partition.
 */
def testStructuredStreamingWithAsyncClustering(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
def checkClusteringResult(destPath: String):Unit = {
// Wait for a completed replace-commit (clustering), then verify the partition
// was clustered down to one file group. Args 120/5 presumably are timeout and
// poll interval in seconds — TODO confirm against the helper's signature.
waitTillHasCompletedReplaceInstant(destPath, 120, 5)
metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
// Flags: isInlineClustering=false, isAsyncClustering=true, isAsyncCompaction=false.
structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, false,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
@Test
/**
 * Streaming write with async clustering AND async compaction enabled together
 * (inline clustering off): expects clustering to still complete and leave a
 * single file group in the partition.
 */
def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
def checkClusteringResult(destPath: String):Unit = {
// Wait for a completed replace-commit (clustering), then verify the partition
// was clustered down to one file group. Args 120/5 presumably are timeout and
// poll interval in seconds — TODO confirm against the helper's signature.
waitTillHasCompletedReplaceInstant(destPath, 120, 5)
metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
// Flags: isInlineClustering=false, isAsyncClustering=true, isAsyncCompaction=true.
structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
@Test
def testStructuredStreamingWithoutClustering(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
def checkClusteringResult(destPath: String):Unit = {
@@ -224,12 +256,13 @@ class TestStructuredStreaming extends HoodieClientTestBase {
}, msg)
println(msg)
}
structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, false, false,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String,
isInlineClustering: Boolean, partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean,
isAsyncClustering: Boolean, isAsyncCompaction: Boolean,
partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
// First insert of data
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, partitionOfRecords)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -238,7 +271,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, partitionOfRecords)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, "2", 100)
val hudiOptions = getClusteringOpts(isInlineClustering.toString, isAsyncClustering.toString,
isAsyncCompaction.toString, "2", 100)
val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)
val f2 = Future {