[HUDI-4348] fix merge into sql data quality in concurrent scene (#6020)
This commit is contained in:
@@ -17,7 +17,7 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi.command.payload
|
package org.apache.spark.sql.hudi.command.payload
|
||||||
|
|
||||||
import com.google.common.cache.CacheBuilder
|
import com.google.common.cache.{Cache, CacheBuilder}
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.avro.generic.IndexedRecord
|
import org.apache.avro.generic.IndexedRecord
|
||||||
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
|
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
|
||||||
@@ -51,7 +51,10 @@ object SqlTypedRecord {
|
|||||||
|
|
||||||
private val sqlTypeCache = CacheBuilder.newBuilder().build[Schema, StructType]()
|
private val sqlTypeCache = CacheBuilder.newBuilder().build[Schema, StructType]()
|
||||||
|
|
||||||
private val avroDeserializerCache = CacheBuilder.newBuilder().build[Schema, HoodieAvroDeserializer]()
|
private val avroDeserializerCacheLocal = new ThreadLocal[Cache[Schema, HoodieAvroDeserializer]] {
|
||||||
|
override def initialValue(): Cache[Schema, HoodieAvroDeserializer] =
|
||||||
|
CacheBuilder.newBuilder().build[Schema, HoodieAvroDeserializer]()
|
||||||
|
}
|
||||||
|
|
||||||
def getSqlType(schema: Schema): StructType = {
|
def getSqlType(schema: Schema): StructType = {
|
||||||
sqlTypeCache.get(schema, new Callable[StructType] {
|
sqlTypeCache.get(schema, new Callable[StructType] {
|
||||||
@@ -64,10 +67,10 @@ object SqlTypedRecord {
|
|||||||
}
|
}
|
||||||
|
|
||||||
def getAvroDeserializer(schema: Schema): HoodieAvroDeserializer= {
|
def getAvroDeserializer(schema: Schema): HoodieAvroDeserializer= {
|
||||||
avroDeserializerCache.get(schema, new Callable[HoodieAvroDeserializer] {
|
avroDeserializerCacheLocal.get().get(schema, new Callable[HoodieAvroDeserializer] {
|
||||||
override def call(): HoodieAvroDeserializer = {
|
override def call(): HoodieAvroDeserializer = {
|
||||||
val deserializer = sparkAdapter.createAvroDeserializer(schema, getSqlType(schema))
|
val deserializer = sparkAdapter.createAvroDeserializer(schema, getSqlType(schema))
|
||||||
avroDeserializerCache.put(schema, deserializer)
|
avroDeserializerCacheLocal.get().put(schema, deserializer)
|
||||||
deserializer
|
deserializer
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user