1
0

[HUDI-2767] Enabling timeline-server-based marker as default (#4112)

- Changes the default config of marker type (HoodieWriteConfig.MARKERS_TYPE or hoodie.write.markers.type) from DIRECT to TIMELINE_SERVER_BASED for Spark Engine.
- Adds engine-specific marker type configs: Spark -> TIMELINE_SERVER_BASED, Flink -> DIRECT, Java -> DIRECT.
- Uses DIRECT markers as well for Spark structured streaming, because the timeline server is only available for the first mini-batch.
- Fixes the marker creation method for non-partitioned table in TimelineServerBasedWriteMarkers.
- Adds a fallback to direct markers in WriteMarkersFactory even when TIMELINE_SERVER_BASED is configured: the fallback occurs when HDFS is used or when the embedded timeline server is disabled.
- Fixes the closing of timeline service.
- Fixes tests that depend on markers, mainly by starting the timeline service for each test.
This commit is contained in:
Y Ethan Guo
2021-11-26 13:41:05 -08:00
committed by GitHub
parent f8e0176eb0
commit d1e83e4ba0
35 changed files with 529 additions and 134 deletions

View File

@@ -16,18 +16,16 @@
*/
package org.apache.hudi
import java.lang
import java.util.function.Function
import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.marker.MarkerType
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.CompactionUtils
import org.apache.hudi.common.util.ClusteringUtils
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{ClusteringUtils, CompactionUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieCorruptedDataException
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
@@ -35,8 +33,10 @@ import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import scala.util.{Failure, Success, Try}
import java.lang
import java.util.function.Function
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
class HoodieStreamingSink(sqlContext: SQLContext,
options: Map[String, String],
@@ -71,25 +71,29 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty
override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized {
if (isAsyncCompactorServiceShutdownAbnormally) {
if (isAsyncCompactorServiceShutdownAbnormally) {
throw new IllegalStateException("Async Compactor shutdown unexpectedly")
}
if (isAsyncClusteringServiceShutdownAbnormally) {
if (isAsyncClusteringServiceShutdownAbnormally) {
log.error("Async clustering service shutdown unexpectedly")
throw new IllegalStateException("Async clustering service shutdown unexpectedly")
}
// Override to use direct markers. In Structured streaming, timeline server is closed after
// first micro-batch and subsequent micro-batches do not have timeline server running.
// Thus, we can't use timeline-server-based markers.
val updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
) match {
case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.info(s"Micro batch id=$batchId succeeded"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
case _ => s" with no new commits"
}))
case true => s" for commit=${commitOps.get()}"
case _ => s" with no new commits"
}))
writeClient = Some(client)
hoodieTableConfig = Some(tableConfig)
if (compactionInstantOps.isPresent) {