1
0

[HUDI-575] Spark Streaming with async compaction support (#1752)

This commit is contained in:
Balaji Varadarajan
2020-08-05 07:50:15 -07:00
committed by GitHub
parent 61e027fadd
commit 7a2429f5ba
22 changed files with 835 additions and 304 deletions

View File

@@ -226,11 +226,16 @@ public class DataSourceUtils {
}
public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
String tblName, Map<String, String> parameters) {
String tblName, Map<String, String> parameters) {
boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY()));
// inline compaction is on by default for MOR
boolean inlineCompact = parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY())
boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY())
.equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
return createHoodieClient(jssc, schemaStr, basePath, tblName, parameters, inlineCompact);
}
public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
String tblName, Map<String, String> parameters, boolean inlineCompact) {
// insert/bulk-insert combining to be true, if filtering for duplicates
boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY()));

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.async;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * Async Compaction Service used by Structured Streaming. Here, async compaction is run in daemon mode to prevent
 * blocking shutting down the Spark application.
 */
public class SparkStreamingAsyncCompactService extends AsyncCompactService {

  // NOTE(review): presence of serialVersionUID implies the parent service is Serializable — confirm
  // AsyncCompactService (or an ancestor) implements java.io.Serializable.
  private static final long serialVersionUID = 1L;

  /**
   * Builds the streaming-mode compaction service.
   *
   * @param jssc   Spark context used to execute compaction jobs
   * @param client write client for the Hudi table being compacted
   *               (the hard-coded {@code true} passed to the superclass enables daemon mode,
   *               so the compaction thread does not block Spark application shutdown)
   */
  public SparkStreamingAsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
    super(jssc, client, true);
  }
}

View File

@@ -281,4 +281,8 @@ object DataSourceWriteOptions {
val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false"
val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
// Async Compaction - Enabled by default for MOR
val ASYNC_COMPACT_ENABLE_KEY = "hoodie.datasource.compaction.async.enable"
val DEFAULT_ASYNC_COMPACT_ENABLE_VAL = "true"
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
@@ -103,10 +105,8 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
new HudiEmptyRelation(sqlContext, df.schema)
}

View File

@@ -21,6 +21,7 @@ import java.util
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
@@ -29,7 +30,7 @@ import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
@@ -49,7 +50,13 @@ private[hudi] object HoodieSparkSqlWriter {
def write(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): (Boolean, common.util.Option[String]) = {
df: DataFrame,
hoodieTableConfig: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[Function1[HoodieWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
)
: (Boolean, common.util.Option[String], common.util.Option[String],
HoodieWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
val sparkContext = sqlContext.sparkContext
val path = parameters.get("path")
@@ -84,113 +91,134 @@ private[hudi] object HoodieSparkSqlWriter {
val instantTime = HoodieActiveTimeline.createNewInstantTime()
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
if (exists && mode == SaveMode.Append) {
val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
if (!existingTableName.equals(tblName)) {
throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
}
var tableConfig : HoodieTableConfig = if (exists) {
hoodieTableConfig.getOrElse(
new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig)
} else {
null
}
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
}).toJavaRDD()
// Handle various save modes
if (mode == SaveMode.ErrorIfExists && exists) {
throw new HoodieException(s"hoodie table at $basePath already exists.")
}
if (mode == SaveMode.Ignore && exists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(true, common.util.Option.empty())
}
if (mode == SaveMode.Overwrite && exists) {
log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
fs.delete(basePath, true)
exists = false
if (mode == SaveMode.Ignore && exists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
if (exists && mode == SaveMode.Append) {
val existingTableName = tableConfig.getTableName
if (!existingTableName.equals(tblName)) {
throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
}
}
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Create the table if not present
if (!exists) {
//FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path.get, HoodieTableType.valueOf(tableType),
tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
}
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr),
parameters(PAYLOAD_CLASS_OPT_KEY))
}).toJavaRDD()
// Create a HoodieWriteClient & issue the write.
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
mapAsJavaMap(parameters)
)
// Handle various save modes
if (mode == SaveMode.ErrorIfExists && exists) {
throw new HoodieException(s"hoodie table at $basePath already exists.")
}
val hoodieRecords =
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
if (mode == SaveMode.Overwrite && exists) {
log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
fs.delete(basePath, true)
exists = false
}
// Create the table if not present
if (!exists) {
//FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
path.get, HoodieTableType.valueOf(tableType),
tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
tableConfig = tableMetaClient.getTableConfig
}
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
tblName, mapAsJavaMap(parameters)
)).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
if (asyncCompactionTriggerFn.isDefined &&
isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
val hoodieRecords =
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
} else {
hoodieAllIncomingRecords
}
if (hoodieRecords.isEmpty()) {
log.info("new batch has no new records, skipping...")
(true, common.util.Option.empty())
}
client.startCommitWithTime(instantTime)
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeStatuses, client)
} else {
hoodieAllIncomingRecords
// Handle save modes
if (mode != SaveMode.Append) {
throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
}
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
// Convert to RDD[HoodieKey]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
if (!exists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
Schema.create(Schema.Type.NULL).toString, path.get, tblName,
mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
if (asyncCompactionTriggerFn.isDefined &&
isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
// Issue deletes
client.startCommitWithTime(instantTime)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
(writeStatuses, client)
}
if (hoodieRecords.isEmpty()) {
log.info("new batch has no new records, skipping...")
(true, common.util.Option.empty())
}
client.startCommitWithTime(instantTime)
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeStatuses, client)
} else {
// Handle save modes
if (mode != SaveMode.Append) {
throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
}
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
// Convert to RDD[HoodieKey]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
if (!exists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
// Create a HoodieWriteClient & issue the delete.
val client = DataSourceUtils.createHoodieClient(jsc,
Schema.create(Schema.Type.NULL).toString, path.get, tblName,
mapAsJavaMap(parameters)
)
// Issue deletes
client.startCommitWithTime(instantTime)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
(writeStatuses, client)
// Check for errors and commit the write.
val (writeSuccessful, compactionInstant) =
commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath,
operation, jsc)
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
}
// Check for errors and commit the write.
val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, instantTime, basePath, operation, jsc)
(writeSuccessful, common.util.Option.ofNullable(instantTime))
}
/**
@@ -222,7 +250,8 @@ private[hudi] object HoodieSparkSqlWriter {
HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL,
HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL
HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL,
ASYNC_COMPACT_ENABLE_KEY -> DEFAULT_ASYNC_COMPACT_ENABLE_VAL
) ++ translateStorageTypeToTableType(parameters)
}
@@ -258,13 +287,14 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig
}
private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus],
parameters: Map[String, String],
client: HoodieWriteClient[_],
instantTime: String,
basePath: Path,
operation: String,
jsc: JavaSparkContext): Boolean = {
private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
parameters: Map[String, String],
client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
tableConfig: HoodieTableConfig,
instantTime: String,
basePath: Path,
operation: String,
jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = {
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
@@ -284,6 +314,15 @@ private[hudi] object HoodieSparkSqlWriter {
log.info("Commit " + instantTime + " failed!")
}
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
val compactionInstant : common.util.Option[java.lang.String] =
if (asyncCompactionEnabled) {
client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
} else {
common.util.Option.empty()
}
log.info(s"Compaction Scheduled is $compactionInstant")
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
@@ -292,8 +331,12 @@ private[hudi] object HoodieSparkSqlWriter {
} else {
true
}
client.close()
commitSuccess && syncHiveSucess
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
if (!asyncCompactionEnabled) {
client.close()
}
(commitSuccess && syncHiveSucess, compactionInstant)
} else {
log.error(s"$operation failed with $errorCount errors :")
if (log.isTraceEnabled) {
@@ -308,6 +351,18 @@ private[hudi] object HoodieSparkSqlWriter {
}
})
}
(false, common.util.Option.empty())
}
}
private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
tableConfig: HoodieTableConfig,
parameters: Map[String, String], configuration: Configuration) : Boolean = {
log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
if (!client.getConfig.isInlineCompaction
&& parameters.get(ASYNC_COMPACT_ENABLE_KEY).exists(r => r.toBoolean)) {
tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
} else {
false
}
}

View File

@@ -16,13 +16,25 @@
*/
package org.apache.hudi
import java.lang
import java.util.function.{Function, Supplier}
import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.HoodieWriteClient
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.CompactionUtils
import org.apache.hudi.exception.HoodieCorruptedDataException
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import scala.util.{Failure, Success, Try}
import scala.collection.JavaConversions._
class HoodieStreamingSink(sqlContext: SQLContext,
options: Map[String, String],
@@ -38,6 +50,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong
private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY).toBoolean
private var isAsyncCompactorServiceShutdownAbnormally = false
private val mode =
if (outputMode == OutputMode.Append()) {
SaveMode.Append
@@ -45,39 +59,54 @@ class HoodieStreamingSink(sqlContext: SQLContext,
SaveMode.Overwrite
}
override def addBatch(batchId: Long, data: DataFrame): Unit = {
private var asyncCompactorService : AsyncCompactService = _
private var writeClient : Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty
override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized {
if (isAsyncCompactorServiceShutdownAbnormally) {
throw new IllegalStateException("Async Compactor shutdown unexpectedly")
}
retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
sqlContext,
mode,
options,
data)
sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor))
) match {
case Success((true, commitOps)) =>
case Success((true, commitOps, compactionInstantOps, client, tableConfig)) =>
log.info(s"Micro batch id=$batchId succeeded"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
case _ => s" with no new commits"
}))
Success((true, commitOps))
writeClient = Some(client)
hoodieTableConfig = Some(tableConfig)
if (compactionInstantOps.isPresent) {
asyncCompactorService.enqueuePendingCompaction(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
}
Success((true, commitOps, compactionInstantOps))
case Failure(e) =>
// clean up persist rdds in the write process
data.sparkSession.sparkContext.getPersistentRDDs
.foreach {
case (id, rdd) =>
rdd.unpersist()
try {
rdd.unpersist()
} catch {
case t: Exception => log.warn("Got excepting trying to unpersist rdd", t)
}
}
log.error(s"Micro batch id=$batchId threw following expection: ", e)
log.error(s"Micro batch id=$batchId threw following exception: ", e)
if (ignoreFailedBatch) {
log.info(s"Ignore the exception and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration")
Success((true, None))
Success((true, None, None))
} else {
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(e)
}
case Success((false, commitOps)) =>
case Success((false, commitOps, compactionInstantOps, client, tableConfig)) =>
log.error(s"Micro batch id=$batchId ended up with errors"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
@@ -86,7 +115,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
if (ignoreFailedBatch) {
log.info(s"Ignore the errors and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration")
Success((true, None))
Success((true, None, None))
} else {
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
@@ -100,6 +129,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
// spark sometimes hangs upon exceptions and keep on hold of the executors
// this is to force exit upon errors / exceptions and release all executors
// will require redeployment / supervise mode to restart the streaming
reset(true)
System.exit(1)
}
case Success(_) =>
@@ -112,11 +142,55 @@ class HoodieStreamingSink(sqlContext: SQLContext,
@annotation.tailrec
private def retry[T](n: Int, waitInMillis: Long)(fn: => Try[T]): Try[T] = {
fn match {
case x: util.Success[T] => x
case x: Success[T] =>
x
case _ if n > 1 =>
Thread.sleep(waitInMillis)
retry(n - 1, waitInMillis * 2)(fn)
case f => f
case f =>
reset(false)
f
}
}
protected def triggerAsyncCompactor(client: HoodieWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
if (null == asyncCompactorService) {
log.info("Triggering Async compaction !!")
asyncCompactorService = new SparkStreamingAsyncCompactService(new JavaSparkContext(sqlContext.sparkContext),
client)
asyncCompactorService.start(new Function[java.lang.Boolean, java.lang.Boolean] {
override def apply(errored: lang.Boolean): lang.Boolean = {
log.info(s"Async Compactor shutdown. Errored ? $errored")
isAsyncCompactorServiceShutdownAbnormally = errored
reset(false)
log.info("Done resetting write client.")
true
}
})
// Add Shutdown Hook
Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
override def run(): Unit = reset(true)
}))
// First time, scan .hoodie folder and get all pending compactions
val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration,
client.getConfig.getBasePath)
val pendingInstants :java.util.List[HoodieInstant] =
CompactionUtils.getPendingCompactionInstantTimes(metaClient)
pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h))
}
}
private def reset(force: Boolean) : Unit = this.synchronized {
if (asyncCompactorService != null) {
asyncCompactorService.shutdown(force)
asyncCompactorService = null
}
if (writeClient.isDefined) {
writeClient.get.close()
writeClient = Option.empty
}
}
}