1
0

[HUDI-1869] Upgrading Spark3 To 3.1 (#3844)

Co-authored-by: pengzhiwei <pengzhiwei2015@icloud.com>
This commit is contained in:
Yann Byron
2021-11-03 09:25:12 +08:00
committed by GitHub
parent dee3a14aae
commit 1f17467f73
22 changed files with 315 additions and 57 deletions

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
import org.apache.spark.sql.avro.{HoodieAvroSerializer, HooodieAvroDeserializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -119,7 +119,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val deserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
@@ -135,7 +135,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
} else {
val requiredAvroRecord = AvroConversionUtils
.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
recordToLoad = unsafeProjection(deserializer.deserializeData(requiredAvroRecord).asInstanceOf[InternalRow])
true
}
} else {
@@ -158,7 +158,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val deserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
@@ -180,7 +180,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
} else {
val requiredAvroRecord = AvroConversionUtils
.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
recordToLoad = unsafeProjection(deserializer.deserializeData(requiredAvroRecord).asInstanceOf[InternalRow])
true
}
} else {
@@ -203,8 +203,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val requiredFieldPosition =
tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val serializer = new AvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
private val requiredDeserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
private val requiredDeserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
@@ -236,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
recordBuilder
)
recordToLoad = unsafeProjection(requiredDeserializer
.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
.deserializeData(requiredAvroRecord).asInstanceOf[InternalRow])
true
}
} else {
@@ -264,7 +264,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
recordBuilder
)
recordToLoad = unsafeProjection(requiredDeserializer
.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
.deserializeData(requiredAvroRecord).asInstanceOf[InternalRow])
true
}
}

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType
/**
* As AvroSerializer cannot be access out of the spark.sql.avro package since spark 3.1, we define
* this class to be accessed by other class.
*/
case class HoodieAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
extends AvroSerializer(rootCatalystType, rootAvroType, nullable)

View File

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType
/**
* This is to be compatible with the type returned by Spark 3.1
* and other spark versions for AvroDeserializer
*/
case class HooodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
extends AvroDeserializer(rootAvroType, rootCatalystType) {
def deserializeData(data: Any): Any = {
super.deserialize(data) match {
case Some(r) => r // spark 3.1 return type is Option, we fetch the data.
case o => o // for other spark version, return the data directly.
}
}
}

View File

@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.io.HoodieWriteHandle
import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.avro.{AvroSerializer, HoodieAvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
@@ -310,7 +310,7 @@ object ExpressionPayload {
val conditionEvaluator = ExpressionCodeGen.doCodeGen(Seq(condition), conditionSerializer)
val assignSqlType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
val assignSerializer = new AvroSerializer(assignSqlType, writeSchema, false)
val assignSerializer = new HoodieAvroSerializer(assignSqlType, writeSchema, false)
val assignmentEvaluator = ExpressionCodeGen.doCodeGen(assignments, assignSerializer)
conditionEvaluator -> assignmentEvaluator
}

View File

@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.command.payload
import org.apache.avro.generic.IndexedRecord
import org.apache.avro.Schema
import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
import org.apache.spark.sql.avro.{HooodieAvroDeserializer, SchemaConverters}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
@@ -29,8 +29,8 @@ import org.apache.spark.sql.types._
class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
private lazy val sqlType = SchemaConverters.toSqlType(getSchema).dataType.asInstanceOf[StructType]
private lazy val avroDeserializer = new AvroDeserializer(record.getSchema, sqlType)
private lazy val sqlRow = avroDeserializer.deserialize(record).asInstanceOf[InternalRow]
private lazy val avroDeserializer = HooodieAvroDeserializer(record.getSchema, sqlType)
private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow]
override def put(i: Int, v: Any): Unit = {
record.put(i, v)

View File

@@ -62,7 +62,7 @@ class HoodieCommonSqlParser(session: SparkSession, delegate: ParserInterface)
}
def parseMultipartIdentifier(sqlText: String): Seq[String] = {
throw new UnsupportedOperationException(s"Unsupported parseMultipartIdentifier method")
sparkAdapter.parseMultipartIdentifier(delegate, sqlText)
}
protected def parse[T](command: String)(toResult: HoodieSqlCommonParser => T): T = {

View File

@@ -137,7 +137,8 @@ class TestDataSourceForBootstrap {
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
}
@Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
@Test
def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

View File

@@ -46,8 +46,8 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
// insert data
spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
checkException(s"alter table $tableName drop partition (dt='2021-10-01')")(
s"dt is not a valid partition column in table `default`.`${tableName}`.;")
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")(
s"dt is not a valid partition column in table `default`.`$tableName`.")
}
Seq(false, true).foreach { urlencode =>
@@ -115,12 +115,8 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
// specify duplicate partition columns
try {
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")
} catch {
case NonFatal(e) =>
assert(e.getMessage.contains("Found duplicate keys 'dt'"))
}
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")(
"Found duplicate keys 'dt'")
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
@@ -164,8 +160,8 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
|""".stripMargin)
// not specified all partition column
checkException(s"alter table $tableName drop partition (year='2021', month='10')")(
"All partition columns need to be specified for Hoodie's dropping partition;"
checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")(
"All partition columns need to be specified for Hoodie's dropping partition"
)
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")

View File

@@ -18,13 +18,15 @@
package org.apache.spark.sql.hudi
import java.io.File
import org.apache.log4j.Level
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
import org.scalactic.source
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
import java.util.TimeZone
class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
org.apache.log4j.Logger.getRootLogger.setLevel(Level.WARN)
@@ -34,6 +36,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
dir
}
TimeZone.setDefault(DateTimeUtils.getTimeZone("CTT"))
protected lazy val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("hoodie sql test")
@@ -43,6 +46,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
.config("hoodie.upsert.shuffle.parallelism", "4")
.config("hoodie.delete.shuffle.parallelism", "4")
.config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
.config("spark.sql.session.timeZone", "CTT")
.getOrCreate()
private var tableId = 0
@@ -92,6 +96,19 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
assertResult(true)(hasException)
}
protected def checkExceptionContain(sql: String)(errorMsg: String): Unit = {
var hasException = false
try {
spark.sql(sql)
} catch {
case e: Throwable =>
assertResult(true)(e.getMessage.contains(errorMsg))
hasException = true
}
assertResult(true)(hasException)
}
protected def removeQuotes(value: Any): Any = {
value match {
case s: String => s.stripPrefix("'").stripSuffix("'")

View File

@@ -353,19 +353,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
|""".stripMargin
if (HoodieSqlUtils.isSpark3) {
checkException(mergeSql)(
"\nColumns aliases are not allowed in MERGE.(line 5, pos 5)\n\n" +
"== SQL ==\n\r\n" +
s" merge into $tableName\r\n" +
" using (\r\n" +
" select 1, 'a1', 10, 1000, '1'\r\n" +
" ) s0(id,name,price,ts,flag)\r\n" +
"-----^^^\n" +
s" on s0.id = $tableName.id\r\n" +
" when matched and flag = '1' then update set\r\n" +
" id = s0.id, name = s0.name, price = s0.price, ts = s0.ts\r\n" +
" when not matched and flag = '1' then insert *\r\n"
)
checkExceptionContain(mergeSql)("Columns aliases are not allowed in MERGE")
} else {
spark.sql(mergeSql)
checkAnswer(s"select id, name, price, ts from $tableName")(

View File

@@ -98,7 +98,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
| preCombineField = '_ts'
|)""".stripMargin)
checkException(
checkExceptionContain(
s"""
|merge into $tableName t0
|using ( select 1 as id, 'a1' as name, 12 as price) s0
@@ -106,7 +106,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
|when matched then update set price = s0.price
""".stripMargin)(
"Missing specify value for the preCombineField: _ts in merge-into update action. " +
"You should add '... update set _ts = xx....' to the when-matched clause.;")
"You should add '... update set _ts = xx....' to the when-matched clause.")
val tableName2 = generateTableName
spark.sql(
@@ -123,7 +123,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
| preCombineField = '_ts'
|)""".stripMargin)
checkException(
checkExceptionContain(
s"""
|merge into $tableName2 t0
|using ( select 1 as id, 'a1' as name, 12 as price, 1000 as ts) s0
@@ -132,6 +132,6 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
""".stripMargin)(
"Missing specify the value for target field: 'id' in merge into update action for MOR table. " +
"Currently we cannot support partial update for MOR, please complete all the target fields " +
"just like '...update set id = s0.id, name = s0.name ....';")
"just like '...update set id = s0.id, name = s0.name ....'")
}
}