From f18447406dd3305c93e69ff109fe031ff470e2ad Mon Sep 17 00:00:00 2001 From: RexAn Date: Tue, 18 Jan 2022 11:08:33 +0800 Subject: [PATCH] [HUDI-1558] Struct Stream Source Support Spark3 (#4586) Co-authored-by: Hui An --- .../sql/hudi/streaming/HoodieSourceOffset.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala index 03e651ed8..a5561a65a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} case class HoodieSourceOffset(commitTime: String) extends Offset { - override def json(): String = { + override val json: String = { HoodieSourceOffset.toJson(this) } @@ -45,17 +45,21 @@ case class HoodieSourceOffset(commitTime: String) extends Offset { object HoodieSourceOffset { - val mapper = new ObjectMapper with ScalaObjectMapper - mapper.setSerializationInclusion(Include.NON_ABSENT) - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - mapper.registerModule(DefaultScalaModule) + + lazy val mapper: ObjectMapper = { + val _mapper = new ObjectMapper + _mapper.setSerializationInclusion(Include.NON_ABSENT) + _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + _mapper.registerModule(DefaultScalaModule) + _mapper + } def toJson(offset: HoodieSourceOffset): String = { mapper.writeValueAsString(offset) } def fromJson(json: String): HoodieSourceOffset = { - mapper.readValue[HoodieSourceOffset](json) + mapper.readValue(json, classOf[HoodieSourceOffset]) } def apply(offset: Offset): HoodieSourceOffset = {