1
0

[HUDI-1558] Struct Stream Source Support Spark3 (#4586)

Co-authored-by: Hui An <hui.an@shopee.com>
This commit is contained in:
RexAn
2022-01-18 11:08:33 +08:00
committed by GitHub
parent 20e7983866
commit f18447406d

View File

@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
case class HoodieSourceOffset(commitTime: String) extends Offset { case class HoodieSourceOffset(commitTime: String) extends Offset {
override def json(): String = { override val json: String = {
HoodieSourceOffset.toJson(this) HoodieSourceOffset.toJson(this)
} }
@@ -45,17 +45,21 @@ case class HoodieSourceOffset(commitTime: String) extends Offset {
object HoodieSourceOffset { object HoodieSourceOffset {
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.setSerializationInclusion(Include.NON_ABSENT) lazy val mapper: ObjectMapper = {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) val _mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule) _mapper.setSerializationInclusion(Include.NON_ABSENT)
_mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
_mapper.registerModule(DefaultScalaModule)
_mapper
}
def toJson(offset: HoodieSourceOffset): String = { def toJson(offset: HoodieSourceOffset): String = {
mapper.writeValueAsString(offset) mapper.writeValueAsString(offset)
} }
def fromJson(json: String): HoodieSourceOffset = { def fromJson(json: String): HoodieSourceOffset = {
mapper.readValue[HoodieSourceOffset](json) mapper.readValue(json, classOf[HoodieSourceOffset])
} }
def apply(offset: Offset): HoodieSourceOffset = { def apply(offset: Offset): HoodieSourceOffset = {