diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 3a5b51e2f..9653e6f19 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -412,6 +412,8 @@ private[hudi] object HoodieSparkSqlWriter { hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY) hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY) hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY) + hiveSyncConfig.skipROSuffix = parameters.getOrElse(HIVE_SKIP_RO_SUFFIX, + DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL).toBoolean hiveSyncConfig.partitionFields = ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*) hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index 269c4acdf..d53b59e68 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -20,7 +20,6 @@ package org.apache.hudi.functional import java.time.Instant import java.util import java.util.{Collections, Date, UUID} - import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions._ @@ -43,7 +42,7 @@ import org.mockito.Mockito.{spy, times, verify} import org.scalatest.{FunSuite, Matchers} import scala.collection.JavaConversions._ -import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { @@ -503,7 +502,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { val params = Map( "path" -> basePath, DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie", - DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition" + DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition", + DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX -> "true" ) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, @@ -518,6 +518,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, new Path(basePath), newParams).asInstanceOf[HiveSyncConfig] + assertTrue(hiveSyncConfig.skipROSuffix) assertResult("spark.sql.sources.provider=hudi\n" + "spark.sql.sources.schema.partCol.0=partition\n" + "spark.sql.sources.schema.numParts=1\n" + @@ -530,6 +531,37 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties) } + test("Test build sync config for skip Ro Suffix vals") { + initSparkContext("test build sync config for skip Ro suffix vals") + val addSqlTablePropertiesMethod = + HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", + classOf[SQLConf], classOf[StructType], classOf[Map[_, _]]) + addSqlTablePropertiesMethod.setAccessible(true) + + val schema = DataSourceTestUtils.getStructTypeExampleSchema + val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) + val basePath = "/tmp/hoodie_test" + val params = Map( + "path" -> basePath, + DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie", + DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition" + ) + val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) + val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, + spark.sessionState.conf, structType, parameters) + .asInstanceOf[Map[String, String]] + + val buildSyncConfigMethod = + HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], + classOf[Map[_, _]]) + buildSyncConfigMethod.setAccessible(true) + + val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, + new Path(basePath), newParams).asInstanceOf[HiveSyncConfig] + + assertFalse(hiveSyncConfig.skipROSuffix) + } + case class Test(uuid: String, ts: Long) import scala.collection.JavaConverters