[MINOR] Fix typo,rename 'HooodieAvroDeserializer' to 'HoodieAvroDeserializer' (#4064)
This commit is contained in:
@@ -28,7 +28,7 @@ import org.apache.hudi.exception.HoodieException
|
|||||||
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
|
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
|
||||||
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
|
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.sql.avro.{HoodieAvroSerializer, HooodieAvroDeserializer}
|
import org.apache.spark.sql.avro.{HoodieAvroSerializer, HoodieAvroDeserializer}
|
||||||
import org.apache.spark.sql.catalyst.InternalRow
|
import org.apache.spark.sql.catalyst.InternalRow
|
||||||
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
|
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
|
||||||
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
||||||
@@ -119,7 +119,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
|||||||
tableState.requiredStructSchema
|
tableState.requiredStructSchema
|
||||||
.map(f => tableAvroSchema.getField(f.name).pos()).toList
|
.map(f => tableAvroSchema.getField(f.name).pos()).toList
|
||||||
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
|
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
|
||||||
private val deserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
|
private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
|
||||||
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
|
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
|
||||||
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
|
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
|
||||||
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
|
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
|
||||||
@@ -158,7 +158,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
|||||||
tableState.requiredStructSchema
|
tableState.requiredStructSchema
|
||||||
.map(f => tableAvroSchema.getField(f.name).pos()).toList
|
.map(f => tableAvroSchema.getField(f.name).pos()).toList
|
||||||
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
|
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
|
||||||
private val deserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
|
private val deserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
|
||||||
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
|
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
|
||||||
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
|
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
|
||||||
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
|
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
|
||||||
@@ -204,7 +204,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
|
|||||||
tableState.requiredStructSchema
|
tableState.requiredStructSchema
|
||||||
.map(f => tableAvroSchema.getField(f.name).pos()).toList
|
.map(f => tableAvroSchema.getField(f.name).pos()).toList
|
||||||
private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
|
private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
|
||||||
private val requiredDeserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
|
private val requiredDeserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
|
||||||
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
|
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
|
||||||
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
|
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
|
||||||
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
|
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.DataType
|
|||||||
* This is to be compatible with the type returned by Spark 3.1
|
* This is to be compatible with the type returned by Spark 3.1
|
||||||
* and other spark versions for AvroDeserializer
|
* and other spark versions for AvroDeserializer
|
||||||
*/
|
*/
|
||||||
case class HooodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
|
case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
|
||||||
extends AvroDeserializer(rootAvroType, rootCatalystType) {
|
extends AvroDeserializer(rootAvroType, rootCatalystType) {
|
||||||
|
|
||||||
def deserializeData(data: Any): Any = {
|
def deserializeData(data: Any): Any = {
|
||||||
@@ -22,7 +22,7 @@ import org.apache.avro.Schema
|
|||||||
|
|
||||||
import org.apache.hudi.AvroConversionUtils
|
import org.apache.hudi.AvroConversionUtils
|
||||||
|
|
||||||
import org.apache.spark.sql.avro.HooodieAvroDeserializer
|
import org.apache.spark.sql.avro.HoodieAvroDeserializer
|
||||||
import org.apache.spark.sql.catalyst.InternalRow
|
import org.apache.spark.sql.catalyst.InternalRow
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
|
|||||||
class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
|
class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
|
||||||
|
|
||||||
private lazy val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(getSchema)
|
private lazy val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(getSchema)
|
||||||
private lazy val avroDeserializer = HooodieAvroDeserializer(record.getSchema, sqlType)
|
private lazy val avroDeserializer = HoodieAvroDeserializer(record.getSchema, sqlType)
|
||||||
private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow]
|
private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow]
|
||||||
|
|
||||||
override def put(i: Int, v: Any): Unit = {
|
override def put(i: Int, v: Any): Unit = {
|
||||||
|
|||||||
Reference in New Issue
Block a user