[HUDI-4017] Improve spark sql coverage in CI (#5512)
Add GitHub Actions tasks to run Spark SQL unit tests under Spark 3.1 and 3.2.
This commit is contained in:
8
.github/workflows/bot.yml
vendored
8
.github/workflows/bot.yml
vendored
@@ -59,3 +59,11 @@ jobs:
|
|||||||
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 before hadoop upgrade to 3.x
|
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 before hadoop upgrade to 3.x
|
||||||
run:
|
run:
|
||||||
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark
|
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark
|
||||||
|
- name: Spark SQL Test
|
||||||
|
env:
|
||||||
|
SCALA_PROFILE: ${{ matrix.scalaProfile }}
|
||||||
|
SPARK_PROFILE: ${{ matrix.sparkProfile }}
|
||||||
|
FLINK_PROFILE: ${{ matrix.flinkProfile }}
|
||||||
|
if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
|
||||||
|
run:
|
||||||
|
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=org.apache.spark.sql.hudi.Test*' -pl hudi-spark-datasource/hudi-spark
|
||||||
|
|||||||
@@ -18,9 +18,9 @@
|
|||||||
package org.apache.hudi.functional
|
package org.apache.hudi.functional
|
||||||
|
|
||||||
import org.apache.hudi.common.util.FileIOUtils
|
import org.apache.hudi.common.util.FileIOUtils
|
||||||
import org.apache.spark.sql.hudi.TestHoodieSqlBase
|
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
|
||||||
|
|
||||||
class TestSqlStatement extends TestHoodieSqlBase {
|
class TestSqlStatement extends HoodieSparkSqlTestBase {
|
||||||
val STATE_INIT = 0
|
val STATE_INIT = 0
|
||||||
val STATE_SKIP_COMMENT = 1
|
val STATE_SKIP_COMMENT = 1
|
||||||
val STATE_FINISH_COMMENT = 2
|
val STATE_FINISH_COMMENT = 2
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ import org.apache.hudi.ColumnStatsIndexHelper.buildColumnStatsTableFor
|
|||||||
import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
|
import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
|
||||||
import org.apache.hudi.sort.SpaceCurveSortingHelper
|
import org.apache.hudi.sort.SpaceCurveSortingHelper
|
||||||
import org.apache.spark.sql.DataFrame
|
import org.apache.spark.sql.DataFrame
|
||||||
import org.apache.spark.sql.hudi.TestHoodieSqlBase
|
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
|
||||||
import org.apache.spark.sql.types.{IntegerType, StructField}
|
import org.apache.spark.sql.types.{IntegerType, StructField}
|
||||||
import org.junit.jupiter.api.{Disabled, Tag, Test}
|
import org.junit.jupiter.api.{Disabled, Tag, Test}
|
||||||
|
|
||||||
@@ -31,7 +31,7 @@ import scala.collection.JavaConversions._
|
|||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
|
|
||||||
@Tag("functional")
|
@Tag("functional")
|
||||||
object SpaceCurveOptimizeBenchmark extends TestHoodieSqlBase {
|
object SpaceCurveOptimizeBenchmark extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
def evalSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= {
|
def evalSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= {
|
||||||
val sourceTableDF = spark.sql(s"select * from ${tableName}")
|
val sourceTableDF = spark.sql(s"select * from ${tableName}")
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
|
|||||||
import java.io.File
|
import java.io.File
|
||||||
import java.util.TimeZone
|
import java.util.TimeZone
|
||||||
|
|
||||||
class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
|
class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
|
||||||
org.apache.log4j.Logger.getRootLogger.setLevel(Level.WARN)
|
org.apache.log4j.Logger.getRootLogger.setLevel(Level.WARN)
|
||||||
|
|
||||||
private lazy val sparkWareHouse = {
|
private lazy val sparkWareHouse = {
|
||||||
@@ -22,7 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
|
|||||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||||
import org.apache.spark.sql.types.{LongType, StructField, StructType}
|
import org.apache.spark.sql.types.{LongType, StructField, StructType}
|
||||||
|
|
||||||
class TestAlterTable extends TestHoodieSqlBase {
|
class TestAlterTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Alter Table") {
|
test("Test Alter Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ import org.apache.hudi.config.HoodieWriteConfig
|
|||||||
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
|
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
|
||||||
import org.apache.spark.sql.SaveMode
|
import org.apache.spark.sql.SaveMode
|
||||||
|
|
||||||
class TestAlterTableDropPartition extends TestHoodieSqlBase {
|
class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Drop non-partitioned table") {
|
test("Drop non-partitioned table") {
|
||||||
val tableName = generateTableName
|
val tableName = generateTableName
|
||||||
|
|||||||
@@ -17,7 +17,7 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi
|
package org.apache.spark.sql.hudi
|
||||||
|
|
||||||
class TestCompactionTable extends TestHoodieSqlBase {
|
class TestCompactionTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test compaction table") {
|
test("Test compaction table") {
|
||||||
withTempDir {tmp =>
|
withTempDir {tmp =>
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
|
|||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
class TestCreateTable extends TestHoodieSqlBase {
|
class TestCreateTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Create Managed Hoodie Table") {
|
test("Test Create Managed Hoodie Table") {
|
||||||
val databaseName = "hudi_database"
|
val databaseName = "hudi_database"
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ import org.apache.hudi.config.HoodieWriteConfig
|
|||||||
import org.apache.hudi.keygen.SimpleKeyGenerator
|
import org.apache.hudi.keygen.SimpleKeyGenerator
|
||||||
import org.apache.spark.sql.SaveMode
|
import org.apache.spark.sql.SaveMode
|
||||||
|
|
||||||
class TestDeleteTable extends TestHoodieSqlBase {
|
class TestDeleteTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Delete Table") {
|
test("Test Delete Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -17,7 +17,7 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi
|
package org.apache.spark.sql.hudi
|
||||||
|
|
||||||
class TestDropTable extends TestHoodieSqlBase {
|
class TestDropTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Drop Table") {
|
test("Test Drop Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -19,27 +19,13 @@ package org.apache.spark.sql.hudi
|
|||||||
|
|
||||||
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, OverwriteWithLatestAvroPayload}
|
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, OverwriteWithLatestAvroPayload}
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig
|
import org.apache.hudi.common.table.HoodieTableConfig
|
||||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
|
||||||
|
|
||||||
import org.apache.spark.sql.SparkSession
|
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
|
|
||||||
import org.junit.jupiter.api.Assertions.assertTrue
|
import org.junit.jupiter.api.Assertions.assertTrue
|
||||||
import org.junit.jupiter.api.{BeforeEach, Test}
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
import org.scalatest.Matchers.intercept
|
import org.scalatest.Matchers.intercept
|
||||||
|
|
||||||
class TestHoodieOptionConfig extends HoodieClientTestBase {
|
class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
|
||||||
|
|
||||||
var spark: SparkSession = _
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Setup method running before each test.
|
|
||||||
*/
|
|
||||||
@BeforeEach override def setUp() {
|
|
||||||
initSparkContexts()
|
|
||||||
spark = sqlContext.sparkSession
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testWithDefaultSqlOptions(): Unit = {
|
def testWithDefaultSqlOptions(): Unit = {
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ import org.apache.spark.sql.SaveMode
|
|||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
class TestInsertTable extends TestHoodieSqlBase {
|
class TestInsertTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Insert Into") {
|
test("Test Insert Into") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
|
|||||||
|
|
||||||
import org.apache.hudi.testutils.DataSourceTestUtils
|
import org.apache.hudi.testutils.DataSourceTestUtils
|
||||||
|
|
||||||
class TestMergeIntoLogOnlyTable extends TestHoodieSqlBase {
|
class TestMergeIntoLogOnlyTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Query Log Only MOR Table") {
|
test("Test Query Log Only MOR Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi
|
|||||||
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers}
|
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers}
|
||||||
import org.apache.hudi.common.fs.FSUtils
|
import org.apache.hudi.common.fs.FSUtils
|
||||||
|
|
||||||
class TestMergeIntoTable extends TestHoodieSqlBase {
|
class TestMergeIntoTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test MergeInto Basic") {
|
test("Test MergeInto Basic") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ import org.apache.hudi.HoodieSparkUtils
|
|||||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||||
import org.apache.spark.sql.Row
|
import org.apache.spark.sql.Row
|
||||||
|
|
||||||
class TestMergeIntoTable2 extends TestHoodieSqlBase {
|
class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test MergeInto for MOR table 2") {
|
test("Test MergeInto for MOR table 2") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -17,7 +17,7 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi
|
package org.apache.spark.sql.hudi
|
||||||
|
|
||||||
class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
|
class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Partial Update") {
|
test("Test Partial Update") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
|
|||||||
|
|
||||||
import org.apache.spark.sql.Row
|
import org.apache.spark.sql.Row
|
||||||
|
|
||||||
class TestShowPartitions extends TestHoodieSqlBase {
|
class TestShowPartitions extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Show Non Partitioned Table's Partitions") {
|
test("Test Show Non Partitioned Table's Partitions") {
|
||||||
val tableName = generateTableName
|
val tableName = generateTableName
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
|
|||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
class TestSpark3DDL extends TestHoodieSqlBase {
|
class TestSpark3DDL extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
def createTestResult(tableName: String): Array[Row] = {
|
def createTestResult(tableName: String): Array[Row] = {
|
||||||
spark.sql(s"select * from ${tableName} order by id")
|
spark.sql(s"select * from ${tableName} order by id")
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ import java.nio.file.{Files, Paths}
|
|||||||
|
|
||||||
import org.scalatest.BeforeAndAfter
|
import org.scalatest.BeforeAndAfter
|
||||||
|
|
||||||
class TestSqlConf extends TestHoodieSqlBase with BeforeAndAfter {
|
class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
|
||||||
|
|
||||||
def setEnv(key: String, value: String): String = {
|
def setEnv(key: String, value: String): String = {
|
||||||
val field = System.getenv().getClass.getDeclaredField("m")
|
val field = System.getenv().getClass.getDeclaredField("m")
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi
|
|||||||
import org.apache.hudi.HoodieSparkUtils
|
import org.apache.hudi.HoodieSparkUtils
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||||
|
|
||||||
class TestTimeTravelTable extends TestHoodieSqlBase {
|
class TestTimeTravelTable extends HoodieSparkSqlTestBase {
|
||||||
test("Test Insert and Update Record with time travel") {
|
test("Test Insert and Update Record with time travel") {
|
||||||
if (HoodieSparkUtils.gteqSpark3_2) {
|
if (HoodieSparkUtils.gteqSpark3_2) {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ import org.apache.hudi.config.HoodieWriteConfig
|
|||||||
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
|
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
|
||||||
import org.apache.spark.sql.SaveMode
|
import org.apache.spark.sql.SaveMode
|
||||||
|
|
||||||
class TestTruncateTable extends TestHoodieSqlBase {
|
class TestTruncateTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Truncate non-partitioned Table") {
|
test("Test Truncate non-partitioned Table") {
|
||||||
Seq("cow", "mor").foreach { tableType =>
|
Seq("cow", "mor").foreach { tableType =>
|
||||||
|
|||||||
@@ -17,7 +17,7 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi
|
package org.apache.spark.sql.hudi
|
||||||
|
|
||||||
class TestUpdateTable extends TestHoodieSqlBase {
|
class TestUpdateTable extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Update Table") {
|
test("Test Update Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -21,13 +21,13 @@ import com.google.common.collect.ImmutableList
|
|||||||
import org.apache.hudi.HoodieSparkUtils
|
import org.apache.hudi.HoodieSparkUtils
|
||||||
import org.apache.spark.sql.catalyst.expressions.Literal
|
import org.apache.spark.sql.catalyst.expressions.Literal
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.{CallCommand, NamedArgument, PositionalArgument}
|
import org.apache.spark.sql.catalyst.plans.logical.{CallCommand, NamedArgument, PositionalArgument}
|
||||||
import org.apache.spark.sql.hudi.TestHoodieSqlBase
|
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
|
||||||
import org.apache.spark.sql.types.{DataType, DataTypes}
|
import org.apache.spark.sql.types.{DataType, DataTypes}
|
||||||
|
|
||||||
import java.math.BigDecimal
|
import java.math.BigDecimal
|
||||||
import scala.collection.JavaConverters
|
import scala.collection.JavaConverters
|
||||||
|
|
||||||
class TestCallCommandParser extends TestHoodieSqlBase {
|
class TestCallCommandParser extends HoodieSparkSqlTestBase {
|
||||||
private val parser = spark.sessionState.sqlParser
|
private val parser = spark.sessionState.sqlParser
|
||||||
|
|
||||||
test("Test Call Produce with Positional Arguments") {
|
test("Test Call Produce with Positional Arguments") {
|
||||||
|
|||||||
@@ -17,9 +17,9 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi.procedure
|
package org.apache.spark.sql.hudi.procedure
|
||||||
|
|
||||||
import org.apache.spark.sql.hudi.TestHoodieSqlBase
|
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
|
||||||
|
|
||||||
class TestCallProcedure extends TestHoodieSqlBase {
|
class TestCallProcedure extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Call show_commits Procedure") {
|
test("Test Call show_commits Procedure") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -24,11 +24,11 @@ import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli
|
|||||||
import org.apache.hudi.common.util.{Option => HOption}
|
import org.apache.hudi.common.util.{Option => HOption}
|
||||||
import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
|
import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
|
||||||
|
|
||||||
import org.apache.spark.sql.hudi.TestHoodieSqlBase
|
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
|
||||||
|
|
||||||
import scala.collection.JavaConverters.asScalaIteratorConverter
|
import scala.collection.JavaConverters.asScalaIteratorConverter
|
||||||
|
|
||||||
class TestClusteringProcedure extends TestHoodieSqlBase {
|
class TestClusteringProcedure extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Call run_clustering Procedure By Table") {
|
test("Test Call run_clustering Procedure By Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -19,9 +19,9 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi.procedure
|
package org.apache.spark.sql.hudi.procedure
|
||||||
|
|
||||||
import org.apache.spark.sql.hudi.TestHoodieSqlBase
|
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
|
||||||
|
|
||||||
class TestCompactionProcedure extends TestHoodieSqlBase {
|
class TestCompactionProcedure extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Call run_compaction Procedure by Table") {
|
test("Test Call run_compaction Procedure by Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
@@ -17,9 +17,9 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi.procedure
|
package org.apache.spark.sql.hudi.procedure
|
||||||
|
|
||||||
import org.apache.spark.sql.hudi.TestHoodieSqlBase
|
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
|
||||||
|
|
||||||
class TestSavepointsProcedure extends TestHoodieSqlBase {
|
class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
|
||||||
|
|
||||||
test("Test Call create_savepoints Procedure") {
|
test("Test Call create_savepoints Procedure") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
Reference in New Issue
Block a user