1
0

[HUDI-2264] Refactor HoodieSparkSqlWriterSuite to add setup and teardown (#3544)

This commit is contained in:
Satish M
2021-08-26 19:31:48 +05:30
committed by GitHub
parent 0f39137ba8
commit 55a80a817d

View File

@@ -19,10 +19,10 @@ package org.apache.hudi
import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, INSERT_OPERATION_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL, OPERATION, TABLE_TYPE}
import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieConfig import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.model._ import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig} import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
@@ -37,117 +37,126 @@ import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{expr, lit} import org.apache.spark.sql.functions.{expr, lit}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify} import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.{FunSuite, Matchers} import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, intercept}
import java.time.Instant import java.time.Instant
import java.util import java.util
import java.util.{Collections, Date, UUID} import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.JavaConverters
class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { /**
* Test suite for SparkSqlWriter class.
*/
class HoodieSparkSqlWriterSuite {
var spark: SparkSession = _ var spark: SparkSession = _
var sc: SparkContext = _
var sqlContext: SQLContext = _ var sqlContext: SQLContext = _
var sc: SparkContext = _
var tempPath: java.nio.file.Path = _
var tempBootStrapPath: java.nio.file.Path = _
var hoodieFooTableName = "hoodie_foo_tbl"
var tempBasePath: String = _
var commonTableModifier: Map[String, String] = Map()
case class StringLongTest(uuid: String, ts: Long)
test("Parameters With Write Defaults") { /**
val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty) * Setup method running before each test.
val rhsKey = "hoodie.right.hand.side.key" */
val rhsVal = "hoodie.right.hand.side.val" @BeforeEach
val modifier = Map(OPERATION.key -> INSERT_OPERATION_OPT_VAL, TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, rhsKey -> rhsVal) def setUp() {
val modified = HoodieWriterUtils.parametersWithWriteDefaults(modifier) initSparkContext()
val matcher = (k: String, v: String) => modified(k) should be(v) tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
tempBasePath = tempPath.toAbsolutePath.toString
commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
}
originals foreach { /**
case ("hoodie.datasource.write.operation", _) => matcher("hoodie.datasource.write.operation", INSERT_OPERATION_OPT_VAL) * Tear down method running after each test.
case ("hoodie.datasource.write.table.type", _) => matcher("hoodie.datasource.write.table.type", MOR_TABLE_TYPE_OPT_VAL) */
case (`rhsKey`, _) => matcher(rhsKey, rhsVal) @AfterEach
case (k, v) => matcher(k, v) def tearDown(): Unit = {
cleanupSparkContexts()
FileUtils.deleteDirectory(tempPath.toFile)
FileUtils.deleteDirectory(tempBootStrapPath.toFile)
}
/**
* Utility method for initializing the spark context.
*/
def initSparkContext(): Unit = {
spark = SparkSession.builder()
.appName(hoodieFooTableName)
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sqlContext = spark.sqlContext
}
/**
* Utility method for cleaning up spark resources.
*/
def cleanupSparkContexts(): Unit = {
if (sqlContext != null) {
sqlContext.clearCache();
sqlContext = null;
} }
} }
test("throw hoodie exception when invalid serializer") { /**
val session = SparkSession.builder().appName("hoodie_test").master("local").getOrCreate() * Utility method for dropping all hoodie meta related columns.
try { */
val sqlContext = session.sqlContext def dropMetaFields(df: Dataset[Row]): Dataset[Row] = {
val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_tbl") df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
session.emptyDataFrame)) .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
assert(e.getMessage.contains("spark.serializer"))
} finally {
session.stop()
}
} }
test("throw hoodie exception when there already exist a table with different name with Append Save mode") { /**
initSparkContext("test_append_mode") * Utility method for creating common params for writer.
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") *
try { * @param path Path for hoodie table
val hoodieFooTableName = "hoodie_foo_tbl" * @param hoodieFooTableName Name of hoodie table
//create a new table * @param tableType Type of table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, * @return Map of common params
*/
def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.insert.shuffle.parallelism" -> "1",
"hoodie.upsert.shuffle.parallelism" -> "4") "hoodie.upsert.shuffle.parallelism" -> "1",
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
val dataFrame = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime))) DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame) DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
//on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
val barTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl",
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4")
val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier)
val dataFrame2 = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
//on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
val deleteTableParams = barTableParams ++ Map(OPERATION.key -> "delete")
val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
} }
List(BulkInsertSortMode.GLOBAL_SORT, BulkInsertSortMode.NONE, BulkInsertSortMode.PARTITION_SORT) /**
.foreach(sortMode => { * Utility method for converting a Java list of Row to a Scala Seq of Row.
test("test_bulk_insert_for_" + sortMode) { *
initSparkContext("test_bulk_insert_datasource") * @param inputList Java list of Row
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") * @return Seq of Row
try { */
testBulkInsertWithSortMode(sortMode, path) def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] =
} finally { JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
})
List(true, false) /**
.foreach(populateMetaFields => { * Utility method for performing bulk insert tests.
test("test_bulk_insert_for_populate_meta_fields_" + populateMetaFields) { *
initSparkContext("test_bulk_insert_datasource_populate_meta_fields") * @param sortMode Bulk insert sort mode
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_populate_meta_fields") * @param populateMetaFields Flag for populating meta fields
try { */
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, path, populateMetaFields) def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true): Unit = {
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
})
def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, path: java.nio.file.Path, populateMetaFields : Boolean = true) : Unit = {
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table //create a new table
val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name()) val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields)) .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields))
@@ -168,15 +177,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// write to Hudi // write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
// collect all parition paths to issue read of parquet files // collect all partition paths to issue read of parquet files
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
// Check the entire dataset has all records still // Check the entire dataset has all records still
val fullPartitionPaths = new Array[String](3) val fullPartitionPaths = new Array[String](3)
for (i <- 0 until fullPartitionPaths.length) { for (i <- fullPartitionPaths.indices) {
fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i)) fullPartitionPaths(i) = String.format("%s/%s/*", tempBasePath, partitions(i))
} }
// fetch all records from parquet files generated from write to hudi // fetch all records from parquet files generated from write to hudi
val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2)) val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
if (!populateMetaFields) { if (!populateMetaFields) {
@@ -187,16 +195,103 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
assert(df.except(trimmedDf).count() == 0) assert(df.except(trimmedDf).count() == 0)
} }
test("test disable and enable meta fields") { /**
initSparkContext("test_disable_enable_meta_fields") * Test case for parameters with write defaults.
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") */
@Test
def testParametersWithWriteDefaults(): Unit = {
val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty)
val rhsKey = "hoodie.right.hand.side.key"
val rhsVal = "hoodie.right.hand.side.val"
val modifier = Map(OPERATION.key -> INSERT_OPERATION_OPT_VAL, TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, rhsKey -> rhsVal)
val modified = HoodieWriterUtils.parametersWithWriteDefaults(modifier)
val matcher = (k: String, v: String) => modified(k) should be(v)
originals foreach {
case ("hoodie.datasource.write.operation", _) => matcher("hoodie.datasource.write.operation", INSERT_OPERATION_OPT_VAL)
case ("hoodie.datasource.write.table.type", _) => matcher("hoodie.datasource.write.table.type", MOR_TABLE_TYPE_OPT_VAL)
case (`rhsKey`, _) => matcher(rhsKey, rhsVal)
case (k, v) => matcher(k, v)
}
}
/**
* Test case for invalid serializer provided.
*/
@Test
def testThrowExceptionInvalidSerializer(): Unit = {
spark.stop()
val session = SparkSession.builder().appName("hoodie_test").master("local").getOrCreate()
try { try {
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, path, false) val sqlContext = session.sqlContext
// enabling meta fields back should throw exception val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_tbl")
val hoodieFooTableName = "hoodie_foo_tbl" val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options,
session.emptyDataFrame))
assert(e.getMessage.contains("spark.serializer"))
} finally {
session.stop()
initSparkContext()
}
}
/**
* Test case verifying that a HoodieException is thrown in Append save mode
* when a table with a different name already exists at the same path.
*/
@Test
def testThrowExceptionAlreadyExistsWithAppendSaveMode(): Unit = {
//create a new table //create a new table
val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name()) val fooTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
.updated("hoodie.bulkinsert.shuffle.parallelism", "4") "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
//on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
val barTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl",
"hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier)
val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
//on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
val deleteTableParams = barTableParams ++ Map(OPERATION.key -> "delete")
val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
}
/**
* Test case for each bulk insert sort mode
*
* @param sortMode Bulk insert sort mode
*/
@ParameterizedTest
@EnumSource(value = classOf[BulkInsertSortMode])
def testBulkInsertForSortMode(sortMode: BulkInsertSortMode): Unit = {
testBulkInsertWithSortMode(sortMode, true)
}
/**
* Test case for Bulk insert with populating meta fields or
* without populating meta fields.
*
* @param populateMetaFields Flag for populating meta fields
*/
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testBulkInsertForPopulateMetaFields(populateMetaFields: Boolean): Unit = {
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields)
}
/**
* Test case for disable and enable meta fields.
*/
@Test
def testDisableAndEnableMetaFields(): Unit = {
try {
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, false)
//create a new table
val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name()) .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
@@ -210,24 +305,21 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
try { try {
// write to Hudi // write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
fail("Should have thrown exception") Assertions.fail("Should have thrown exception")
} catch { } catch {
case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back")) case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"))
} }
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
} }
} }
test("test drop duplicates row writing for bulk_insert") { /**
initSparkContext("test_append_mode") * Test case for drop duplicates row writing for bulk_insert.
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") */
@Test
def testDropDuplicatesRowForBulkInsert(): Unit = {
try { try {
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table //create a new table
val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name()) val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
.updated(INSERT_DROP_DUPS.key, "true") .updated(INSERT_DROP_DUPS.key, "true")
@@ -244,22 +336,17 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
fail("Drop duplicates with bulk insert in row writing should have thrown exception") fail("Drop duplicates with bulk insert in row writing should have thrown exception")
} catch { } catch {
case e: HoodieException => assertTrue(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet")) case e: HoodieException => assertTrue(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet"))
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
} }
} }
test("test insert dataset without precombine field") { /**
initSparkContext("test_bulk_insert_datasource") * Test case for insert dataset without precombine field.
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") */
try { @Test
val sqlContext = spark.sqlContext def testInsertDatasetWithoutPrecombineField(): Unit = {
val sc = spark.sparkContext
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table //create a new table
val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name()) val fooTableModifier = commonTableModifier.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "false") .updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "false")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
@@ -272,13 +359,13 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// write to Hudi // write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD.key, df) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD.key, df)
// collect all parition paths to issue read of parquet files // collect all partition paths to issue read of parquet files
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
// Check the entire dataset has all records still // Check the entire dataset has all records still
val fullPartitionPaths = new Array[String](3) val fullPartitionPaths = new Array[String](3)
for (i <- 0 until fullPartitionPaths.length) { for (i <- fullPartitionPaths.indices) {
fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i)) fullPartitionPaths(i) = String.format("%s/%s/*", tempBasePath, partitions(i))
} }
// fetch all records from parquet files generated from write to hudi // fetch all records from parquet files generated from write to hudi
@@ -286,34 +373,27 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// remove metadata columns so that expected and actual DFs can be compared as is // remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = dropMetaFields(actualDf) val trimmedDf = dropMetaFields(actualDf)
assert(df.except(trimmedDf).count() == 0) assert(df.except(trimmedDf).count() == 0)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
} }
test("test bulk insert dataset with datasource impl multiple rounds") { /**
initSparkContext("test_bulk_insert_datasource") * Test case for bulk insert dataset with datasource impl multiple rounds.
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") */
try { @Test
val hoodieFooTableName = "hoodie_foo_tbl" def testBulkInsertDatasetWithDatasourceImplMultipleRounds(): Unit = {
val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
.updated("hoodie.bulkinsert.shuffle.parallelism", "4") val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
val fullPartitionPaths = new Array[String](3) val fullPartitionPaths = new Array[String](3)
for (i <- 0 to 2) { for (i <- 0 to 2) {
fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i)) fullPartitionPaths(i) = String.format("%s/%s/*", tempBasePath, partitions(i))
} }
val schema = DataSourceTestUtils.getStructTypeExampleSchema val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
var totalExpectedDf = spark.createDataFrame(sc.emptyRDD[Row], structType) var totalExpectedDf = spark.createDataFrame(sc.emptyRDD[Row], structType)
for (_ <- 0 to 2) { for (_ <- 0 to 2) {
// generate the inserts // generate the inserts
val records = DataSourceTestUtils.generateRandomRows(200) val records = DataSourceTestUtils.generateRandomRows(200)
@@ -321,7 +401,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi // write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
// Fetch records from entire dataset // Fetch records from entire dataset
val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2)) val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
// remove metadata columns so that expected and actual DFs can be compared as is // remove metadata columns so that expected and actual DFs can be compared as is
@@ -331,26 +410,24 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// find mismatch between actual and expected df // find mismatch between actual and expected df
assert(totalExpectedDf.except(trimmedDf).count() == 0) assert(totalExpectedDf.except(trimmedDf).count() == 0)
} }
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
} }
List((DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), true), (DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.ORC.name(), true), /**
(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), true), (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.ORC.name(), true), * Test cases for basic HoodieSparkSqlWriter functionality with datasource insert
(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), false), (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), false)) * for different table types, base file formats and populate meta fields settings.
.foreach(t => { *
val tableType = t._1 * @param tableType Type of table
val baseFileFormat = t._2 * @param baseFileFormat File format
val populateMetaFields = t._3 * @param populateMetaFields Flag for populating meta fields
test("test basic HoodieSparkSqlWriter functionality with datasource insert for " + tableType + " with " + baseFileFormat + " as the base file format " */
+ " with populate meta fields " + populateMetaFields) { @ParameterizedTest
initSparkContext("test_insert_base_file_format_datasource") @CsvSource(
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") Array("COPY_ON_WRITE,parquet,true", "COPY_ON_WRITE,parquet,false", "MERGE_ON_READ,parquet,true", "MERGE_ON_READ,parquet,false",
try { "COPY_ON_WRITE,orc,true", "COPY_ON_WRITE,orc,false", "MERGE_ON_READ,orc,true", "MERGE_ON_READ,orc,false"
))
def testDatasourceInsertForTableTypeBaseFileMetaFields(tableType: String, baseFileFormat: String, populateMetaFields: Boolean): Unit = {
val hoodieFooTableName = "hoodie_foo_tbl" val hoodieFooTableName = "hoodie_foo_tbl"
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, val fooTableModifier = Map("path" -> tempBasePath,
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat, HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat,
DataSourceWriteOptions.TABLE_TYPE.key -> tableType, DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
@@ -361,7 +438,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
HoodieTableConfig.POPULATE_META_FIELDS.key() -> String.valueOf(populateMetaFields), HoodieTableConfig.POPULATE_META_FIELDS.key() -> String.valueOf(populateMetaFields),
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[SimpleKeyGenerator].getCanonicalName) DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[SimpleKeyGenerator].getCanonicalName)
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts // generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
@@ -369,16 +445,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val records = DataSourceTestUtils.generateRandomRows(100) val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records) val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
val client = spy(DataSourceUtils.createHoodieClient( val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc), new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
modifiedSchema.toString,
path.toAbsolutePath.toString,
hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]) mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty, HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty, Option(client))
Option(client))
// Verify that asynchronous compaction is not scheduled // Verify that asynchronous compaction is not scheduled
verify(client, times(0)).scheduleCompaction(any()) verify(client, times(0)).scheduleCompaction(any())
// Verify that HoodieWriteClient is closed correctly // Verify that HoodieWriteClient is closed correctly
@@ -390,9 +461,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// Check the entire dataset has all records still // Check the entire dataset has all records still
val fullPartitionPaths = new Array[String](3) val fullPartitionPaths = new Array[String](3)
for (i <- fullPartitionPaths.indices) { for (i <- fullPartitionPaths.indices) {
fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i)) fullPartitionPaths(i) = String.format("%s/%s/*", tempBasePath, partitions(i))
} }
// fetch all records from parquet files generated from write to hudi // fetch all records from parquet files generated from write to hudi
var actualDf: DataFrame = null var actualDf: DataFrame = null
if (baseFileFormat.equalsIgnoreCase(HoodieFileFormat.PARQUET.name())) { if (baseFileFormat.equalsIgnoreCase(HoodieFileFormat.PARQUET.name())) {
@@ -403,30 +473,25 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// remove metadata columns so that expected and actual DFs can be compared as is // remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = dropMetaFields(actualDf) val trimmedDf = dropMetaFields(actualDf)
assert(df.except(trimmedDf).count() == 0) assert(df.except(trimmedDf).count() == 0)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
} }
}
})
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) /**
.foreach(tableType => { * Test cases for HoodieSparkSqlWriter functionality with datasource bootstrap
test("test HoodieSparkSqlWriter functionality with datasource bootstrap for " + tableType) { * for different type of tables.
initSparkContext("test_bootstrap_datasource") *
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") * @param tableType Type of table
*/
@ParameterizedTest
@ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
def testWithDatasourceBootstrapForTableType(tableType: String): Unit = {
val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path") val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
try { try {
val hoodieFooTableName = "hoodie_foo_tbl"
val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc, val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc,
spark.sqlContext) spark.sqlContext)
// Write source data non-partitioned // Write source data non-partitioned
sourceDF.write sourceDF.write.format("parquet").mode(SaveMode.Overwrite).save(srcPath.toAbsolutePath.toString)
.format("parquet")
.mode(SaveMode.Overwrite)
.save(srcPath.toAbsolutePath.toString)
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, val fooTableModifier = Map("path" -> tempBasePath,
HoodieBootstrapConfig.BASE_PATH.key -> srcPath.toAbsolutePath.toString, HoodieBootstrapConfig.BASE_PATH.key -> srcPath.toAbsolutePath.toString,
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
DataSourceWriteOptions.TABLE_TYPE.key -> tableType, DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
@@ -440,7 +505,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val client = spy(DataSourceUtils.createHoodieClient( val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc), new JavaSparkContext(sc),
null, null,
path.toAbsolutePath.toString, tempBasePath,
hoodieFooTableName, hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]) mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -450,38 +515,36 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// Verify that HoodieWriteClient is closed correctly // Verify that HoodieWriteClient is closed correctly
verify(client, times(1)).close() verify(client, times(1)).close()
// fetch all records from parquet files generated from write to hudi // fetch all records from parquet files generated from write to hudi
val actualDf = sqlContext.read.parquet(path.toAbsolutePath.toString) val actualDf = sqlContext.read.parquet(tempBasePath)
assert(actualDf.count == 100) assert(actualDf.count == 100)
} finally { } finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
FileUtils.deleteDirectory(srcPath.toFile) FileUtils.deleteDirectory(srcPath.toFile)
} }
} }
})
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) /**
.foreach(tableType => { * Test cases for schema evolution in different types of tables.
test("test schema evolution for " + tableType) { *
initSparkContext("test_schema_evolution") * @param tableType Type of table
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_schema_evol") */
try { @ParameterizedTest
val hoodieFooTableName = "hoodie_foo_tbl_schema_evolution_" + tableType @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
def testSchemaEvolutionForTableType(tableType: String): Unit = {
//create a new table //create a new table
val fooTableModifier = getCommonParams(path, hoodieFooTableName, tableType) val fooTableModifier = getCommonParams(tempPath, hoodieFooTableName, tableType)
.updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "true") .updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "true")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts // generate the inserts
var schema = DataSourceTestUtils.getStructTypeExampleSchema val schema = DataSourceTestUtils.getStructTypeExampleSchema
var structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
var records = DataSourceTestUtils.generateRandomRows(10) var records = DataSourceTestUtils.generateRandomRows(10)
var recordsSeq = convertRowListToSeq(records) var recordsSeq = convertRowListToSeq(records)
var df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
val snapshotDF1 = spark.read.format("org.apache.hudi") val snapshotDF1 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*") .load(tempBasePath + "/*/*/*/*")
assertEquals(10, snapshotDF1.count()) assertEquals(10, snapshotDF1.count())
// remove metadata columns so that expected and actual DFs can be compared as is // remove metadata columns so that expected and actual DFs can be compared as is
@@ -494,7 +557,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
val snapshotDF2 = spark.read.format("org.apache.hudi") val snapshotDF2 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*") .load(tempBasePath + "/*/*/*/*")
assertEquals(10, snapshotDF2.count()) assertEquals(10, snapshotDF2.count())
// remove metadata columns so that expected and actual DFs can be compared as is // remove metadata columns so that expected and actual DFs can be compared as is
@@ -512,7 +575,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
val snapshotDF3 = spark.read.format("org.apache.hudi") val snapshotDF3 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*") .load(tempBasePath + "/*/*/*/*")
assertEquals(15, snapshotDF3.count()) assertEquals(15, snapshotDF3.count())
// remove metadata columns so that expected and actual DFs can be compared as is // remove metadata columns so that expected and actual DFs can be compared as is
@@ -527,28 +590,25 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df4) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df4)
val snapshotDF4 = spark.read.format("org.apache.hudi") val snapshotDF4 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*") .load(tempBasePath + "/*/*/*/*")
assertEquals(25, snapshotDF4.count()) assertEquals(25, snapshotDF4.count())
val tableMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration) val tableMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration)
.setBasePath(path.toAbsolutePath.toString).build() .setBasePath(tempBasePath).build()
val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields
assertTrue(actualSchema != null) assertTrue(actualSchema != null)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName) val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
val expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(evolStructType, structName, nameSpace) val expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(evolStructType, structName, nameSpace)
assertEquals(expectedSchema, actualSchema) assertEquals(expectedSchema, actualSchema)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
} }
}
})
test("Test build sync config for spark sql") { /**
initSparkContext("test build sync config") * Test case for build sync config for spark sql.
val basePath = "/tmp/hoodie_test" */
@Test
def testBuildSyncConfigForSparkSql(): Unit = {
val params = Map( val params = Map(
"path" -> basePath, "path" -> tempBasePath,
DataSourceWriteOptions.TABLE_NAME.key -> "test_hoodie", DataSourceWriteOptions.TABLE_NAME.key -> "test_hoodie",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key -> "partition", DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key -> "partition",
DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key -> "true", DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key -> "true",
@@ -563,18 +623,20 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
buildSyncConfigMethod.setAccessible(true) buildSyncConfigMethod.setAccessible(true)
val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig] new Path(tempBasePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig]
assertTrue(hiveSyncConfig.skipROSuffix) assertTrue(hiveSyncConfig.skipROSuffix)
assertTrue(hiveSyncConfig.createManagedTable) assertTrue(hiveSyncConfig.createManagedTable)
assertTrue(hiveSyncConfig.syncAsSparkDataSourceTable) assertTrue(hiveSyncConfig.syncAsSparkDataSourceTable)
assertResult(spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD))(hiveSyncConfig.sparkSchemaLengthThreshold) assertResult(spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD))(hiveSyncConfig.sparkSchemaLengthThreshold)
} }
test("Test build sync config for skip Ro Suffix vals") { /**
initSparkContext("test build sync config for skip Ro suffix vals") * Test case for build sync config for skip Ro Suffix values.
val basePath = "/tmp/hoodie_test" */
@Test
def testBuildSyncConfigForSkipRoSuffixValues(): Unit = {
val params = Map( val params = Map(
"path" -> basePath, "path" -> tempBasePath,
DataSourceWriteOptions.TABLE_NAME.key -> "test_hoodie", DataSourceWriteOptions.TABLE_NAME.key -> "test_hoodie",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key -> "partition" DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key -> "partition"
) )
@@ -584,38 +646,18 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
classOf[HoodieConfig], classOf[SQLConf]) classOf[HoodieConfig], classOf[SQLConf])
buildSyncConfigMethod.setAccessible(true) buildSyncConfigMethod.setAccessible(true)
val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
new Path(basePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig] new Path(tempBasePath), hoodieConfig, spark.sessionState.conf).asInstanceOf[HiveSyncConfig]
assertFalse(hiveSyncConfig.skipROSuffix) assertFalse(hiveSyncConfig.skipROSuffix)
} }
case class Test(uuid: String, ts: Long) /**
* Test case for incremental view with replacement.
import scala.collection.JavaConverters */
@Test
def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] = def testIncrementalViewWithReplacement(): Unit = {
JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
def initSparkContext(appName: String): Unit = {
spark = SparkSession.builder()
.appName(appName)
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sqlContext = spark.sqlContext
}
test("test Incremental View WithReplacement") {
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType => List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
initSparkContext("testNonPartitionTableWithMetaTable") val baseBootStrapPath = tempBootStrapPath.toAbsolutePath.toString
initSparkContext("test_schema_evolution")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
val bootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
val basePath = path.toAbsolutePath.toString
val baseBootStrapPath = bootStrapPath.toAbsolutePath.toString
val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> tableType, val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "col3", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "col3",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid", DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
@@ -632,20 +674,20 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
.options(options) .options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert") .option(DataSourceWriteOptions.OPERATION.key, "insert")
.option("hoodie.insert.shuffle.parallelism", "4") .option("hoodie.insert.shuffle.parallelism", "4")
.mode(SaveMode.Overwrite).save(basePath) .mode(SaveMode.Overwrite).save(tempBasePath)
df.write.format("hudi") df.write.format("hudi")
.options(options) .options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table") .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
.option("hoodie.insert.shuffle.parallelism", "4") .option("hoodie.insert.shuffle.parallelism", "4")
.mode(SaveMode.Append).save(basePath) .mode(SaveMode.Append).save(tempBasePath)
val currentCommits = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) val currentCommits = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val incrementalKeyIdNum = spark.read.format("hudi") val incrementalKeyIdNum = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000") .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
.option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0)) .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0))
.load(basePath).select("keyid").orderBy("keyid").count .load(tempBasePath).select("keyid").orderBy("keyid").count
assert(incrementalKeyIdNum == 1000) assert(incrementalKeyIdNum == 1000)
df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath) df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
@@ -655,38 +697,30 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
.option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getCanonicalName) .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getCanonicalName)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(HoodieBootstrapConfig.PARALLELISM_VALUE.key, "4") .option(HoodieBootstrapConfig.PARALLELISM_VALUE.key, "4")
.mode(SaveMode.Overwrite).save(basePath) .mode(SaveMode.Overwrite).save(tempBasePath)
df.write.format("hudi").options(options)
df.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table") .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
.option("hoodie.insert.shuffle.parallelism", "4") .option("hoodie.insert.shuffle.parallelism", "4").mode(SaveMode.Append).save(tempBasePath)
.mode(SaveMode.Append).save(basePath) val currentCommitsBootstrap = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val currentCommitsBootstrap = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val incrementalKeyIdNumBootstrap = spark.read.format("hudi") val incrementalKeyIdNumBootstrap = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000") .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
.option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0)) .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0))
.load(basePath).select("keyid").orderBy("keyid").count .load(tempBasePath).select("keyid").orderBy("keyid").count
assert(incrementalKeyIdNumBootstrap == 1000) assert(incrementalKeyIdNumBootstrap == 1000)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
FileUtils.deleteDirectory(bootStrapPath.toFile)
} }
} }
} }
List(true, false) /**
.foreach(usePartitionsToDeleteConfig => { * Test case for deletion of partitions.
test("test delete partitions for " + usePartitionsToDeleteConfig) { * @param usePartitionsToDeleteConfig Flag for if use partitions to delete config
initSparkContext("test_delete_partitions_" + usePartitionsToDeleteConfig) */
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions") @ParameterizedTest
try { @ValueSource(booleans = Array(true, false))
val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions" def testDeletePartitionsV2(usePartitionsToDeleteConfig: Boolean): Unit = {
val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name()) val fooTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
var fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
val schema = DataSourceTestUtils.getStructTypeExampleSchema val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val records = DataSourceTestUtils.generateRandomRows(10) val records = DataSourceTestUtils.generateRandomRows(10)
@@ -694,28 +728,24 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi // write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
val snapshotDF1 = spark.read.format("org.apache.hudi") val snapshotDF1 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*") .load(tempBasePath + "/*/*/*/*")
assertEquals(10, snapshotDF1.count()) assertEquals(10, snapshotDF1.count())
// remove metadata columns so that expected and actual DFs can be compared as is // remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf1 = dropMetaFields(snapshotDF1) val trimmedDf1 = dropMetaFields(snapshotDF1)
assert(df1.except(trimmedDf1).count() == 0) assert(df1.except(trimmedDf1).count() == 0)
// issue updates so that log files are created for MOR table // issue updates so that log files are created for MOR table
var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5)) val updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType) val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
// write updates to Hudi // write updates to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
val snapshotDF2 = spark.read.format("org.apache.hudi") val snapshotDF2 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*") .load(tempBasePath + "/*/*/*/*")
assertEquals(10, snapshotDF2.count()) assertEquals(10, snapshotDF2.count())
// remove metadata columns so that expected and actual DFs can be compared as is // remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf2 = dropMetaFields(snapshotDF2) val trimmedDf2 = dropMetaFields(snapshotDF2)
// ensure 2nd batch of updates matches. // ensure 2nd batch of updates matches.
assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0) assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
if (usePartitionsToDeleteConfig) { if (usePartitionsToDeleteConfig) {
fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
} }
@@ -727,80 +757,43 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}) })
val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name()) val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
val snapshotDF3 = spark.read.format("org.apache.hudi") val snapshotDF3 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*") .load(tempBasePath + "/*/*/*/*")
assertEquals(0, snapshotDF3.filter(entry => { assertEquals(0, snapshotDF3.filter(entry => {
val partitionPath = entry.getString(3) val partitionPath = entry.getString(3)
!partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
}).count()) }).count())
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
})
def dropMetaFields(df: Dataset[Row]) : Dataset[Row] = {
df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
} }
def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String) : Map[String, String] = { /**
Map("path" -> path.toAbsolutePath.toString, * Test case for non partition table with metatable support.
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, */
"hoodie.insert.shuffle.parallelism" -> "1", @Test
"hoodie.upsert.shuffle.parallelism" -> "1", def testNonPartitionTableWithMetatableSupport(): Unit = {
DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
}
test("test Non partition table with metatable support") {
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType => List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
initSparkContext("testNonPartitionTableWithMetaTable") val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
initSparkContext("test_schema_evolution") DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "col3",
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
val basePath = path.toAbsolutePath.toString DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
try { DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
"hoodie.insert.shuffle.parallelism" -> "1",
"hoodie.metadata.enable" -> "true")
val df = spark.range(0, 10).toDF("keyid") val df = spark.range(0, 10).toDF("keyid")
.withColumn("col3", expr("keyid")) .withColumn("col3", expr("keyid"))
.withColumn("age", expr("keyid + 1000")) .withColumn("age", expr("keyid + 1000"))
df.write.format("hudi") df.write.format("hudi")
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) .options(options.updated(DataSourceWriteOptions.OPERATION.key, "insert"))
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "col3") .mode(SaveMode.Overwrite).save(tempBasePath)
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "keyid")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option(DataSourceWriteOptions.OPERATION.key, "insert")
.option("hoodie.insert.shuffle.parallelism", "1")
.option("hoodie.metadata.enable", "true")
.option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
.mode(SaveMode.Overwrite).save(basePath)
// upsert same record again // upsert same record again
val df_update = spark.range(0, 10).toDF("keyid") val df_update = spark.range(0, 10).toDF("keyid")
.withColumn("col3", expr("keyid")) .withColumn("col3", expr("keyid"))
.withColumn("age", expr("keyid + 2000")) .withColumn("age", expr("keyid + 2000"))
df_update.write.format("hudi") df_update.write.format("hudi")
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) .options(options.updated(DataSourceWriteOptions.OPERATION.key, "upsert"))
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "col3") .mode(SaveMode.Append).save(tempBasePath)
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "keyid") assert(spark.read.format("hudi").load(tempBasePath).count() == 10)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "") assert(spark.read.format("hudi").load(tempBasePath).where("age >= 2000").count() == 10)
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option(DataSourceWriteOptions.OPERATION.key, "upsert")
.option("hoodie.upsert.shuffle.parallelism", "1")
.option("hoodie.metadata.enable", "true")
.option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
.mode(SaveMode.Append).save(basePath)
assert(spark.read.format("hudi").load(basePath).count() == 10)
assert(spark.read.format("hudi").load(basePath).where("age >= 2000").count() == 10)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
} }
} }
} }