[HUDI-1870] Add more Spark CI build tasks (#4022)
* [HUDI-1870] Add more Spark CI build tasks

- build for spark3.0.x
- build for spark-shade-unbundle-avro
- fix build failures
- delete unnecessary assertion for spark 3.0.x
- use AvroConversionUtils#convertAvroSchemaToStructType instead of calling SchemaConverters#toSqlType directly to solve the compilation failures with spark-shade-unbundle-avro (#5)

Co-authored-by: Yann <biyan900116@gmail.com>
This commit is contained in:
8
.github/workflows/bot.yml
vendored
8
.github/workflows/bot.yml
vendored
@@ -18,8 +18,16 @@ jobs:
|
|||||||
include:
|
include:
|
||||||
- scala: "scala-2.11"
|
- scala: "scala-2.11"
|
||||||
spark: "spark2"
|
spark: "spark2"
|
||||||
|
- scala: "scala-2.11"
|
||||||
|
spark: "spark2,spark-shade-unbundle-avro"
|
||||||
|
- scala: "scala-2.12"
|
||||||
|
spark: "spark3,spark3.0.x"
|
||||||
|
- scala: "scala-2.12"
|
||||||
|
spark: "spark3,spark3.0.x,spark-shade-unbundle-avro"
|
||||||
- scala: "scala-2.12"
|
- scala: "scala-2.12"
|
||||||
spark: "spark3"
|
spark: "spark3"
|
||||||
|
- scala: "scala-2.12"
|
||||||
|
spark: "spark3,spark-shade-unbundle-avro"
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
- name: Set up JDK 8
|
- name: Set up JDK 8
|
||||||
|
|||||||
@@ -22,7 +22,9 @@ import java.util.Properties
|
|||||||
|
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.avro.generic.GenericRecord
|
import org.apache.avro.generic.GenericRecord
|
||||||
|
|
||||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||||
|
|
||||||
import org.apache.hudi.client.utils.SparkRowSerDe
|
import org.apache.hudi.client.utils.SparkRowSerDe
|
||||||
import org.apache.hudi.common.config.TypedProperties
|
import org.apache.hudi.common.config.TypedProperties
|
||||||
import org.apache.hudi.common.model.HoodieRecord
|
import org.apache.hudi.common.model.HoodieRecord
|
||||||
@@ -30,9 +32,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
|
|||||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
|
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
|
||||||
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
|
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
|
||||||
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
|
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
|
||||||
|
|
||||||
import org.apache.spark.SPARK_VERSION
|
import org.apache.spark.SPARK_VERSION
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.sql.avro.SchemaConverters
|
|
||||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder
|
import org.apache.spark.sql.catalyst.encoders.RowEncoder
|
||||||
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
|
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
|
||||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
|
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
|
||||||
@@ -137,13 +139,13 @@ object HoodieSparkUtils extends SparkAdapterSupport {
|
|||||||
def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
|
def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
|
||||||
: RDD[GenericRecord] = {
|
: RDD[GenericRecord] = {
|
||||||
// Use the write avro schema to derive the StructType which has the correct nullability information
|
// Use the write avro schema to derive the StructType which has the correct nullability information
|
||||||
val writeDataType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
|
val writeDataType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
|
||||||
val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
|
val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
|
||||||
val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
|
val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
|
||||||
// if records were serialized with old schema, but an evolved schema was passed in with latestTableSchema, we need
|
// if records were serialized with old schema, but an evolved schema was passed in with latestTableSchema, we need
|
||||||
// latestTableSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
|
// latestTableSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
|
||||||
val reconciledDataType =
|
val reconciledDataType =
|
||||||
if (latestTableSchema != null) SchemaConverters.toSqlType(latestTableSchema).dataType.asInstanceOf[StructType] else writeDataType
|
if (latestTableSchema != null) AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema) else writeDataType
|
||||||
// Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
|
// Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
|
||||||
// old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
|
// old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
|
||||||
// Hence we always need to deserialize in the same schema as serialized schema.
|
// Hence we always need to deserialize in the same schema as serialized schema.
|
||||||
|
|||||||
@@ -21,18 +21,19 @@ package org.apache.hudi.integ.testsuite.utils
|
|||||||
|
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.avro.generic.GenericRecord
|
import org.apache.avro.generic.GenericRecord
|
||||||
import org.apache.hudi.HoodieSparkUtils
|
|
||||||
|
import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
|
||||||
import org.apache.hudi.common.model.HoodieRecord
|
import org.apache.hudi.common.model.HoodieRecord
|
||||||
import org.apache.hudi.common.util.Option
|
import org.apache.hudi.common.util.Option
|
||||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
|
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
|
||||||
import org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator
|
import org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator
|
||||||
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils.getFieldNamesAndTypes
|
|
||||||
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider
|
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider
|
||||||
|
|
||||||
import org.apache.spark.api.java.JavaRDD
|
import org.apache.spark.api.java.JavaRDD
|
||||||
import org.apache.spark.sql.avro.SchemaConverters
|
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
import org.apache.spark.sql.{AnalysisException, SparkSession}
|
import org.apache.spark.sql.SparkSession
|
||||||
import org.apache.spark.storage.StorageLevel
|
import org.apache.spark.storage.StorageLevel
|
||||||
|
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
|
|
||||||
import scala.math.BigDecimal.RoundingMode.RoundingMode
|
import scala.math.BigDecimal.RoundingMode.RoundingMode
|
||||||
@@ -139,7 +140,7 @@ object SparkSqlUtils {
|
|||||||
*/
|
*/
|
||||||
def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)] = {
|
def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)] = {
|
||||||
val schema = new Schema.Parser().parse(avroSchemaString)
|
val schema = new Schema.Parser().parse(avroSchemaString)
|
||||||
val structType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
|
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
|
||||||
structType.fields.map(field => (field.name, field.dataType.simpleString))
|
structType.fields.map(field => (field.name, field.dataType.simpleString))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
package org.apache.hudi
|
package org.apache.hudi
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
|
|
||||||
import org.apache.hudi.DataSourceReadOptions._
|
import org.apache.hudi.DataSourceReadOptions._
|
||||||
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
|
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
|
||||||
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
|
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
|
||||||
@@ -26,8 +27,9 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
|
|||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
import org.apache.hudi.exception.HoodieException
|
import org.apache.hudi.exception.HoodieException
|
||||||
import org.apache.hudi.hadoop.HoodieROTablePathFilter
|
import org.apache.hudi.hadoop.HoodieROTablePathFilter
|
||||||
|
|
||||||
import org.apache.log4j.LogManager
|
import org.apache.log4j.LogManager
|
||||||
import org.apache.spark.sql.avro.SchemaConverters
|
|
||||||
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
|
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
|
||||||
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
|
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
|
||||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||||
@@ -217,8 +219,7 @@ class DefaultSource extends RelationProvider
|
|||||||
// the table schema evolution.
|
// the table schema evolution.
|
||||||
val tableSchemaResolver = new TableSchemaResolver(metaClient)
|
val tableSchemaResolver = new TableSchemaResolver(metaClient)
|
||||||
try {
|
try {
|
||||||
Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema)
|
Some(AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaResolver.getTableAvroSchema))
|
||||||
.dataType.asInstanceOf[StructType])
|
|
||||||
} catch {
|
} catch {
|
||||||
case _: Throwable =>
|
case _: Throwable =>
|
||||||
None // If there is no commit in the table, we can not get the schema
|
None // If there is no commit in the table, we can not get the schema
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
package org.apache.hudi
|
package org.apache.hudi
|
||||||
|
|
||||||
import org.apache.hadoop.fs.{FileStatus, Path}
|
import org.apache.hadoop.fs.{FileStatus, Path}
|
||||||
|
|
||||||
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
|
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||||
@@ -26,10 +27,10 @@ import org.apache.hudi.common.model.FileSlice
|
|||||||
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
|
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
|
||||||
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
|
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext
|
import org.apache.spark.api.java.JavaSparkContext
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.sql.{Column, SparkSession}
|
import org.apache.spark.sql.{Column, SparkSession}
|
||||||
import org.apache.spark.sql.avro.SchemaConverters
|
|
||||||
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
|
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
|
||||||
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
|
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
|
||||||
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
|
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
|
||||||
@@ -38,6 +39,7 @@ import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlUtils}
|
|||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
import org.apache.spark.unsafe.types.UTF8String
|
import org.apache.spark.unsafe.types.UTF8String
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
@@ -96,8 +98,7 @@ case class HoodieFileIndex(
|
|||||||
*/
|
*/
|
||||||
lazy val schema: StructType = schemaSpec.getOrElse({
|
lazy val schema: StructType = schemaSpec.getOrElse({
|
||||||
val schemaUtil = new TableSchemaResolver(metaClient)
|
val schemaUtil = new TableSchemaResolver(metaClient)
|
||||||
SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
|
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
|
||||||
.dataType.asInstanceOf[StructType]
|
|
||||||
})
|
})
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -23,7 +23,8 @@ import java.util.{Date, Locale, Properties}
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
import org.apache.hudi.SparkAdapterSupport
|
|
||||||
|
import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
||||||
import org.apache.hudi.common.config.DFSPropertiesConfiguration
|
import org.apache.hudi.common.config.DFSPropertiesConfiguration
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||||
@@ -31,9 +32,8 @@ import org.apache.hudi.common.fs.FSUtils
|
|||||||
import org.apache.hudi.common.model.HoodieRecord
|
import org.apache.hudi.common.model.HoodieRecord
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||||
|
|
||||||
import org.apache.spark.SPARK_VERSION
|
import org.apache.spark.SPARK_VERSION
|
||||||
import org.apache.spark.api.java.JavaSparkContext
|
|
||||||
import org.apache.spark.sql.avro.SchemaConverters
|
|
||||||
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
|
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
|
||||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||||
@@ -46,6 +46,7 @@ import org.apache.spark.api.java.JavaSparkContext
|
|||||||
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
|
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
|
||||||
|
|
||||||
import java.text.SimpleDateFormat
|
import java.text.SimpleDateFormat
|
||||||
|
|
||||||
import scala.collection.immutable.Map
|
import scala.collection.immutable.Map
|
||||||
|
|
||||||
object HoodieSqlUtils extends SparkAdapterSupport {
|
object HoodieSqlUtils extends SparkAdapterSupport {
|
||||||
@@ -83,8 +84,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
|
|||||||
catch {
|
catch {
|
||||||
case _: Throwable => None
|
case _: Throwable => None
|
||||||
}
|
}
|
||||||
avroSchema.map(SchemaConverters.toSqlType(_).dataType
|
avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType).map(removeMetaFields)
|
||||||
.asInstanceOf[StructType]).map(removeMetaFields)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
|
def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
|
||||||
|
|||||||
@@ -19,10 +19,13 @@ package org.apache.spark.sql.hudi.command.payload
|
|||||||
|
|
||||||
import java.util.{Base64, Properties}
|
import java.util.{Base64, Properties}
|
||||||
import java.util.concurrent.Callable
|
import java.util.concurrent.Callable
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
import com.google.common.cache.CacheBuilder
|
import com.google.common.cache.CacheBuilder
|
||||||
|
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
|
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
|
||||||
|
|
||||||
|
import org.apache.hudi.AvroConversionUtils
|
||||||
import org.apache.hudi.DataSourceWriteOptions._
|
import org.apache.hudi.DataSourceWriteOptions._
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils
|
import org.apache.hudi.avro.HoodieAvroUtils
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
|
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
|
||||||
@@ -31,12 +34,14 @@ import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.io.HoodieWriteHandle
|
import org.apache.hudi.io.HoodieWriteHandle
|
||||||
import org.apache.hudi.sql.IExpressionEvaluator
|
import org.apache.hudi.sql.IExpressionEvaluator
|
||||||
|
|
||||||
import org.apache.spark.sql.avro.{AvroSerializer, HoodieAvroSerializer, SchemaConverters}
|
import org.apache.spark.sql.avro.{AvroSerializer, HoodieAvroSerializer, SchemaConverters}
|
||||||
import org.apache.spark.sql.catalyst.expressions.Expression
|
import org.apache.spark.sql.catalyst.expressions.Expression
|
||||||
import org.apache.spark.sql.hudi.SerDeUtils
|
import org.apache.spark.sql.hudi.SerDeUtils
|
||||||
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
|
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
|
||||||
import org.apache.spark.sql.types.{StructField, StructType}
|
import org.apache.spark.sql.types.{StructField, StructType}
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -309,7 +314,7 @@ object ExpressionPayload {
|
|||||||
SchemaConverters.toAvroType(conditionType), false)
|
SchemaConverters.toAvroType(conditionType), false)
|
||||||
val conditionEvaluator = ExpressionCodeGen.doCodeGen(Seq(condition), conditionSerializer)
|
val conditionEvaluator = ExpressionCodeGen.doCodeGen(Seq(condition), conditionSerializer)
|
||||||
|
|
||||||
val assignSqlType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
|
val assignSqlType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
|
||||||
val assignSerializer = new HoodieAvroSerializer(assignSqlType, writeSchema, false)
|
val assignSerializer = new HoodieAvroSerializer(assignSqlType, writeSchema, false)
|
||||||
val assignmentEvaluator = ExpressionCodeGen.doCodeGen(assignments, assignSerializer)
|
val assignmentEvaluator = ExpressionCodeGen.doCodeGen(assignments, assignSerializer)
|
||||||
conditionEvaluator -> assignmentEvaluator
|
conditionEvaluator -> assignmentEvaluator
|
||||||
|
|||||||
@@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi.command.payload
|
|||||||
|
|
||||||
import org.apache.avro.generic.IndexedRecord
|
import org.apache.avro.generic.IndexedRecord
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.spark.sql.avro.{HooodieAvroDeserializer, SchemaConverters}
|
|
||||||
|
import org.apache.hudi.AvroConversionUtils
|
||||||
|
|
||||||
|
import org.apache.spark.sql.avro.HooodieAvroDeserializer
|
||||||
import org.apache.spark.sql.catalyst.InternalRow
|
import org.apache.spark.sql.catalyst.InternalRow
|
||||||
import org.apache.spark.sql.types._
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A sql typed record which will convert the avro field to sql typed value.
|
* A sql typed record which will convert the avro field to sql typed value.
|
||||||
*/
|
*/
|
||||||
class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
|
class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
|
||||||
|
|
||||||
private lazy val sqlType = SchemaConverters.toSqlType(getSchema).dataType.asInstanceOf[StructType]
|
private lazy val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(getSchema)
|
||||||
private lazy val avroDeserializer = HooodieAvroDeserializer(record.getSchema, sqlType)
|
private lazy val avroDeserializer = HooodieAvroDeserializer(record.getSchema, sqlType)
|
||||||
private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow]
|
private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow]
|
||||||
|
|
||||||
|
|||||||
@@ -22,16 +22,17 @@ import java.nio.charset.StandardCharsets
|
|||||||
import java.util.Date
|
import java.util.Date
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation, SparkAdapterSupport}
|
|
||||||
|
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation, SparkAdapterSupport}
|
||||||
import org.apache.hudi.common.model.HoodieTableType
|
import org.apache.hudi.common.model.HoodieTableType
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
import org.apache.hudi.common.util.{FileIOUtils, TablePathUtils}
|
import org.apache.hudi.common.util.{FileIOUtils, TablePathUtils}
|
||||||
|
|
||||||
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
|
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
|
||||||
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
|
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.sql.avro.SchemaConverters
|
|
||||||
import org.apache.spark.sql.catalyst.InternalRow
|
import org.apache.spark.sql.catalyst.InternalRow
|
||||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder
|
import org.apache.spark.sql.catalyst.encoders.RowEncoder
|
||||||
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
|
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
|
||||||
@@ -118,8 +119,7 @@ class HoodieStreamSource(
|
|||||||
override def schema: StructType = {
|
override def schema: StructType = {
|
||||||
schemaOption.getOrElse {
|
schemaOption.getOrElse {
|
||||||
val schemaUtil = new TableSchemaResolver(metaClient)
|
val schemaUtil = new TableSchemaResolver(metaClient)
|
||||||
SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
|
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
|
||||||
.dataType.asInstanceOf[StructType]
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -50,9 +50,5 @@ public class TestReflectUtil extends HoodieClientTestBase {
|
|||||||
|
|
||||||
Assertions.assertTrue(
|
Assertions.assertTrue(
|
||||||
((UnresolvedRelation)newStatment.table()).multipartIdentifier().contains("test_reflect_util"));
|
((UnresolvedRelation)newStatment.table()).multipartIdentifier().contains("test_reflect_util"));
|
||||||
|
|
||||||
if (!spark.version().startsWith("3.0")) {
|
|
||||||
Assertions.assertTrue(newStatment.userSpecifiedCols().isEmpty());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user