[HUDI-1230] Fix for preventing MOR datasource jobs from hanging via spark-submit (#2046)
@@ -52,6 +52,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
   private val log = LogManager.getLogger(getClass)
   private var tableExists: Boolean = false
+  private var asyncCompactionTriggerFnDefined: Boolean = false
 
   def write(sqlContext: SQLContext,
             mode: SaveMode,
@@ -67,6 +68,7 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
     val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
     if (path.isEmpty || tblNameOp.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
@@ -147,8 +149,7 @@ private[hudi] object HoodieSparkSqlWriter {
           tblName, mapAsJavaMap(parameters)
           )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
 
-        if (asyncCompactionTriggerFn.isDefined &&
-          isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+        if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
           asyncCompactionTriggerFn.get.apply(client)
         }
 
@@ -187,8 +188,7 @@ private[hudi] object HoodieSparkSqlWriter {
         Schema.create(Schema.Type.NULL).toString, path.get, tblName,
         mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
 
-      if (asyncCompactionTriggerFn.isDefined &&
-        isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+      if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
         asyncCompactionTriggerFn.get.apply(client)
       }
 
@@ -441,7 +441,7 @@ private[hudi] object HoodieSparkSqlWriter {
                                        tableConfig: HoodieTableConfig,
                                        parameters: Map[String, String], configuration: Configuration) : Boolean = {
     log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
-    if (!client.getConfig.isInlineCompaction
+    if (asyncCompactionTriggerFnDefined && !client.getConfig.isInlineCompaction
       && parameters.get(ASYNC_COMPACT_ENABLE_OPT_KEY).exists(r => r.toBoolean)) {
       tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
     } else {
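Taken together, the writer-side hunks above make async compaction conditional on a trigger callback actually being supplied: write() records asyncCompactionTriggerFn.isDefined, and isAsyncCompactionEnabled now requires that flag in addition to the existing checks (inline compaction off, ASYNC_COMPACT_ENABLE_OPT_KEY set to true, MERGE_ON_READ table). That is why a plain spark-submit MOR datasource write, which passes no trigger function, no longer goes down the async-compaction path and no longer hangs. A minimal standalone sketch of that guard follows; the names here are illustrative, not Hudi's real signatures.

// Standalone sketch of the guard added above; illustrative names, not Hudi's actual API.
object AsyncCompactionGuardSketch {
  private var asyncCompactionTriggerFnDefined: Boolean = false

  def write(asyncCompactionTriggerFn: Option[String => Unit],
            inlineCompaction: Boolean,
            asyncCompactEnabled: Boolean,
            isMergeOnReadTable: Boolean): Unit = {
    // remember whether a trigger callback was supplied, as HoodieSparkSqlWriter.write now does
    asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
    if (isAsyncCompactionEnabled(inlineCompaction, asyncCompactEnabled, isMergeOnReadTable)) {
      asyncCompactionTriggerFn.get.apply("trigger-compaction")
    }
  }

  private def isAsyncCompactionEnabled(inlineCompaction: Boolean,
                                       asyncCompactEnabled: Boolean,
                                       isMergeOnReadTable: Boolean): Boolean =
    // async compaction counts as enabled only when a trigger exists and the other conditions hold
    asyncCompactionTriggerFnDefined && !inlineCompaction && asyncCompactEnabled && isMergeOnReadTable

  def main(args: Array[String]): Unit = {
    // spark-submit style call: no trigger function, so the async branch is skipped and nothing waits
    write(None, inlineCompaction = false, asyncCompactEnabled = true, isMergeOnReadTable = true)
    // streaming-style call: a trigger is supplied and gets invoked
    write(Some(instant => println(s"compaction triggered for $instant")),
      inlineCompaction = false, asyncCompactEnabled = true, isMergeOnReadTable = true)
  }
}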
@@ -22,17 +22,29 @@ import java.util.{Date, UUID}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.client.HoodieWriteClient
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.keygen.SimpleKeyGenerator
 import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
 
+import scala.collection.JavaConversions._
+
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
+  var spark: SparkSession = _
+  var sc: SparkContext = _
+  var sqlContext: SQLContext = _
+
   test("Parameters With Write Defaults") {
     val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty)
     val rhsKey = "hoodie.right.hand.side.key"
@@ -65,15 +77,10 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
   test("throw hoodie exception when there already exist a table with different name with Append Save mode") {
 
-    val session = SparkSession.builder()
-      .appName("test_append_mode")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_append_mode")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
       val hoodieFooTableName = "hoodie_foo_tbl"
 
       //create a new table
@@ -82,7 +89,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         "hoodie.insert.shuffle.parallelism" -> "4",
         "hoodie.upsert.shuffle.parallelism" -> "4")
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-      val dataFrame = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+      val dataFrame = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
 
       //on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
@@ -91,7 +98,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         "hoodie.insert.shuffle.parallelism" -> "4",
         "hoodie.upsert.shuffle.parallelism" -> "4")
       val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier)
-      val dataFrame2 = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+      val dataFrame2 = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
       val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
       assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
 
@@ -100,22 +107,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
       assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
 
   test("test bulk insert dataset with datasource impl") {
-    val session = SparkSession.builder()
-      .appName("test_bulk_insert_datasource")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
 
       //create a new table
@@ -134,7 +135,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
       val records = DataSourceTestUtils.generateRandomRows(100)
       val recordsSeq = convertRowListToSeq(records)
-      val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+      val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
       // write to Hudi
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
 
@@ -148,7 +149,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
 
       // fetch all records from parquet files generated from write to hudi
-      val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
 
       // remove metadata columns so that expected and actual DFs can be compared as is
       val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -157,22 +158,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
       assert(df.except(trimmedDf).count() == 0)
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
 
   test("test bulk insert dataset with datasource impl multiple rounds") {
-    val session = SparkSession.builder()
-      .appName("test_bulk_insert_datasource")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
      val hoodieFooTableName = "hoodie_foo_tbl"
 
      //create a new table
@@ -194,18 +189,18 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      var totalExpectedDf = session.createDataFrame(sc.emptyRDD[Row], structType)
+      var totalExpectedDf = spark.createDataFrame(sc.emptyRDD[Row], structType)
 
       for (_ <- 0 to 2) {
         // generate the inserts
         val records = DataSourceTestUtils.generateRandomRows(200)
         val recordsSeq = convertRowListToSeq(records)
-        val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+        val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
         // write to Hudi
         HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
 
         // Fetch records from entire dataset
-        val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+        val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
 
         // remove metadata columns so that expected and actual DFs can be compared as is
         val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -218,11 +213,78 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         assert(totalExpectedDf.except(trimmedDf).count() == 0)
       }
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
 
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test basic HoodieSparkSqlWriter functionality with datasource insert for " + tableType) {
+        initSparkContext("test_insert_datasource")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        try {
+
+          val hoodieFooTableName = "hoodie_foo_tbl"
+
+          //create a new table
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            HoodieWriteConfig.INSERT_PARALLELISM -> "4",
+            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[SimpleKeyGenerator].getCanonicalName)
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          // generate the inserts
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(100)
+          val recordsSeq = convertRowListToSeq(records)
+          val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+
+          val client = spy(DataSourceUtils.createHoodieClient(
+            new JavaSparkContext(sc),
+            schema.toString,
+            path.toAbsolutePath.toString,
+            hoodieFooTableName,
+            mapAsJavaMap(fooTableParams)).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]])
+
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty,
+            Option(client))
+          // Verify that asynchronous compaction is not scheduled
+          verify(client, times(0)).scheduleCompaction(any())
+          // Verify that HoodieWriteClient is closed correctly
+          verify(client, times(1)).close()
+
+          // collect all partition paths to issue read of parquet files
+          val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+            HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+          // Check the entire dataset has all records still
+          val fullPartitionPaths = new Array[String](3)
+          for (i <- fullPartitionPaths.indices) {
+            fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
+          }
+
+          // fetch all records from parquet files generated from write to hudi
+          val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          assert(df.except(trimmedDf).count() == 0)
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
+
   case class Test(uuid: String, ts: Long)
 
   import scala.collection.JavaConverters
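The test added above relies on Mockito's spy/verify: it wraps the real HoodieWriteClient in a spy, runs the write, then asserts that no compaction was scheduled and that the client was closed exactly once. A minimal self-contained sketch of that pattern follows; Worker is a hypothetical stand-in class, not a Hudi type.

// Minimal sketch of the spy/verify pattern used by the new test; Worker is illustrative only.
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}

class Worker {
  def scheduleCompaction(instantTime: java.util.Optional[String]): Unit = ()
  def close(): Unit = ()
}

object SpyVerifySketch {
  def main(args: Array[String]): Unit = {
    // a spy delegates to the real methods while recording every interaction
    val worker = spy(new Worker)

    // exercise the object under test
    worker.close()

    // assert what did (and did not) happen on the real object
    verify(worker, times(0)).scheduleCompaction(any())
    verify(worker, times(1)).close()
  }
}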
@@ -230,4 +292,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
   def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] =
     JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
 
+  def initSparkContext(appName: String): Unit = {
+    spark = SparkSession.builder()
+      .appName(appName)
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
 }
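With this last hunk the suite builds its SparkSession once per test through the shared initSparkContext helper and reads it back through the spark, sc, and sqlContext fields, stopping the session in each test's finally block. A self-contained sketch of that per-test lifecycle, using an illustrative ExampleSuite rather than the Hudi suite itself:

// Sketch of the session-per-test pattern the suite now follows; ExampleSuite is illustrative.
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.FunSuite

class ExampleSuite extends FunSuite {

  var spark: SparkSession = _
  var sc: SparkContext = _
  var sqlContext: SQLContext = _

  def initSparkContext(appName: String): Unit = {
    spark = SparkSession.builder()
      .appName(appName)
      .master("local[2]")
      .getOrCreate()
    sc = spark.sparkContext
    sqlContext = spark.sqlContext
  }

  test("shared session fields are initialized per test and stopped in finally") {
    initSparkContext("example")
    try {
      assert(spark.range(5).count() == 5)
    } finally {
      spark.stop() // each test tears down its own session
    }
  }
}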