
[HUDI-1806] Honoring skipROSuffix in spark ds (#2882)

* Honoring skipROSuffix in spark ds

* Adding tests

* fixing scala checkstyle issue
Sivabalan Narayanan
2021-05-18 19:11:39 -04:00
committed by GitHub
parent 7d2971d4e2
commit 5d1f592395
2 changed files with 37 additions and 3 deletions
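
Context: for merge-on-read (MOR) tables, Hive sync registers the read-optimized view as a separate table, by default with an "_ro" suffix. The skip-RO-suffix option already existed on HiveSyncConfig but was not being propagated by the Spark datasource writer; this commit wires it through. A minimal usage sketch (illustrative values; assumes the other required write and Hive sync options, such as record key and precombine field, are configured alongside):

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.spark.sql.{DataFrame, SaveMode}

    // df: any DataFrame matching the table's schema; path and table name are illustrative.
    def writeSkippingRoSuffix(df: DataFrame): Unit =
      df.write.format("hudi")
        .option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, "test_hoodie")
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
        .option(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX, "true")
        .mode(SaveMode.Append)
        .save("/tmp/hoodie_test")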

File: org/apache/hudi/HoodieSparkSqlWriter.scala

@@ -412,6 +412,8 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY)
     hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY)
     hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY)
+    hiveSyncConfig.skipROSuffix = parameters.getOrElse(HIVE_SKIP_RO_SUFFIX,
+      DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL).toBoolean
     hiveSyncConfig.partitionFields =
       ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
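
Note: downstream in Hive sync, skipROSuffix governs the name under which a MOR table's read-optimized view is registered. Roughly, as a self-contained sketch (identifier names assumed, not taken from this diff):

    // With skipROSuffix = true the read-optimized view is registered under the
    // base table name; with the default (false) it gets the "_ro" suffix.
    def roTableName(tableName: String, skipROSuffix: Boolean): String =
      if (skipROSuffix) tableName else tableName + "_ro"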

File: org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala

@@ -20,7 +20,6 @@ package org.apache.hudi.functional
 import java.time.Instant
 import java.util
 import java.util.{Collections, Date, UUID}
-
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
@@ -43,7 +42,7 @@ import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
 
 import scala.collection.JavaConversions._
 
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
@@ -497,6 +496,48 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         classOf[SQLConf], classOf[StructType], classOf[Map[_,_]])
     addSqlTablePropertiesMethod.setAccessible(true)
 
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val basePath = "/tmp/hoodie_test"
+    val params = Map(
+      "path" -> basePath,
+      DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
+      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition",
+      DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX -> "true"
+    )
+    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
+    val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
+      spark.sessionState.conf, structType, parameters)
+      .asInstanceOf[Map[String, String]]
+    val buildSyncConfigMethod =
+      HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
+        classOf[Map[_,_]])
+    buildSyncConfigMethod.setAccessible(true)
+    val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
+      new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
+    assertTrue(hiveSyncConfig.skipROSuffix)
+    assertResult("spark.sql.sources.provider=hudi\n" +
+      "spark.sql.sources.schema.partCol.0=partition\n" +
+      "spark.sql.sources.schema.numParts=1\n" +
+      "spark.sql.sources.schema.numPartCols=1\n" +
+      "spark.sql.sources.schema.part.0=" +
+      "{\"type\":\"struct\",\"fields\":[{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," +
+      "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," +
+      "{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
+    assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties)
+  }
+
+  test("Test build sync config for skip Ro Suffix vals") {
+    initSparkContext("test build sync config for skip Ro suffix vals")
+    val addSqlTablePropertiesMethod =
+      HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
+        classOf[SQLConf], classOf[StructType], classOf[Map[_, _]])
+    addSqlTablePropertiesMethod.setAccessible(true)
+
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
     val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
     val basePath = "/tmp/hoodie_test"
@@ -518,16 +559,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
       new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
-    assertResult("spark.sql.sources.provider=hudi\n" +
-      "spark.sql.sources.schema.partCol.0=partition\n" +
-      "spark.sql.sources.schema.numParts=1\n" +
-      "spark.sql.sources.schema.numPartCols=1\n" +
-      "spark.sql.sources.schema.part.0=" +
-      "{\"type\":\"struct\",\"fields\":[{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," +
-      "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," +
-      "{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
-    assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties)
+    assertFalse(hiveSyncConfig.skipROSuffix)
   }
 
   case class Test(uuid: String, ts: Long)
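
Note on the second test: its params map (in the unchanged lines between the two hunks) never sets HIVE_SKIP_RO_SUFFIX, so parametersWithWriteDefaults and the getOrElse fallback in buildSyncConfig supply the default, and assertFalse passes. A self-contained sketch of that fallback path (the literal key and default value are assumptions mirroring HIVE_SKIP_RO_SUFFIX and DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL):

    object SkipROSuffixDefaultSketch extends App {
      // No skip-RO key present, so the assumed default "false" is used.
      val params: Map[String, String] = Map("path" -> "/tmp/hoodie_test")
      val skipROSuffix =
        params.getOrElse("hoodie.datasource.hive_sync.skip_ro_suffix", "false").toBoolean
      assert(!skipROSuffix) // mirrors assertFalse(hiveSyncConfig.skipROSuffix)
    }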