1
0

[HUDI-3423] upgrade spark to 3.2.1 (#4815)

This commit is contained in:
Yann Byron
2022-02-22 08:52:21 +08:00
committed by GitHub
parent 801fdab55c
commit 0c950181aa
8 changed files with 17 additions and 18 deletions

View File

@@ -50,7 +50,7 @@ object HoodieAnalysis {
) ++ extraPostHocResolutionRules()
def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
if (!HoodieSparkUtils.beforeSpark3_2()) {
if (HoodieSparkUtils.gteqSpark3_2) {
val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
val spark3Analysis: SparkSession => Rule[LogicalPlan] =
session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]]
@@ -66,7 +66,7 @@ object HoodieAnalysis {
}
def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
if (!HoodieSparkUtils.beforeSpark3_2()) {
if (HoodieSparkUtils.gteqSpark3_2) {
val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
val spark3PostHocResolution: SparkSession => Rule[LogicalPlan] =
session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]]

View File

@@ -95,7 +95,7 @@ class TestHoodieSparkSqlWriter {
*/
def initSparkContext(): Unit = {
val sparkConf = new SparkConf()
if (!HoodieSparkUtils.beforeSpark3_2()) {
if (HoodieSparkUtils.gteqSpark3_2) {
sparkConf.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
}

View File

@@ -191,7 +191,7 @@ class TestHoodieSparkUtils {
val genRecRDD3 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
org.apache.hudi.common.util.Option.of(schema2))
assert(genRecRDD3.collect()(0).getSchema.equals(schema2))
genRecRDD3.foreach(entry => assertNull(entry.get("nonNullableInnerStruct2")))
genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2")))
val innerStruct3 = new StructType().add("innerKey","string",false).add("innerValue", "long", true)
.add("new_nested_col","string",true)

View File

@@ -30,8 +30,7 @@ import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
@@ -557,7 +556,12 @@ class TestMORDataSource extends HoodieClientTestBase {
assertEquals(sampleRow.getLong(1), sampleRow.get(1))
assertEquals(sampleRow.getString(2), sampleRow.get(2))
assertEquals(sampleRow.getSeq(3), sampleRow.get(3))
assertEquals(sampleRow.getStruct(4), sampleRow.get(4))
if (HoodieSparkUtils.gteqSpark3_2) {
// Since Spark3.2, the `nation` column is parsed as String, not Struct.
assertEquals(sampleRow.getString(4), sampleRow.get(4))
} else {
assertEquals(sampleRow.getStruct(4), sampleRow.get(4))
}
}
def verifyShow(df: DataFrame): Unit = {

View File

@@ -58,7 +58,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
def sparkConf(): SparkConf = {
val sparkConf = new SparkConf()
if (!HoodieSparkUtils.beforeSpark3_2()) {
if (HoodieSparkUtils.gteqSpark3_2) {
sparkConf.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
}