diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c992ae537..4aeee1cbe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -112,6 +113,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Controls the layout of the timeline. Version 0 relied on renames, Version 1 (default) models "
           + "the timeline as an immutable log relying only on atomic writes for object storage.");
 
+  public static final ConfigProperty BASE_FILE_FORMAT = ConfigProperty
+      .key("hoodie.table.base.file.format")
+      .defaultValue(HoodieFileFormat.PARQUET)
+      .withAlternatives("hoodie.table.ro.file.format")
+      .withDocumentation("");
+
   public static final ConfigProperty BASE_PATH_PROP = ConfigProperty
       .key("hoodie.base.path")
       .noDefaultValue()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 858d0284e..427209b05 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -116,12 +116,14 @@ object HoodieSparkSqlWriter {
       handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
       // Create the table if not present
       if (!tableExists) {
+        val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
         val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
 
         val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
           .setTableName(tblName)
+          .setBaseFileFormat(baseFileFormat)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
           .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 6f5f6e699..9734862a4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
@@ -38,7 +38,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode, SparkSession}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
@@ -327,9 +327,12 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
-  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-    .foreach(tableType => {
-      test("test basic HoodieSparkSqlWriter functionality with datasource insert for " + tableType) {
+  List((DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name()), (DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.ORC.name()),
+    (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name()), (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.ORC.name()))
+    .foreach(t => {
+      val tableType = t._1
+      val baseFileFormat = t._2
+      test("test basic HoodieSparkSqlWriter functionality with datasource insert for " + tableType + " with " + baseFileFormat + " as the base file format") {
         initSparkContext("test_insert_datasource")
         val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
         try {
@@ -339,6 +342,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
           //create a new table
           val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
             HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
+            HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat,
             DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> tableType,
             HoodieWriteConfig.INSERT_PARALLELISM.key -> "4",
             DataSourceWriteOptions.OPERATION_OPT_KEY.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
@@ -380,7 +384,12 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
           }
 
           // fetch all records from parquet files generated from write to hudi
-          val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+          var actualDf: DataFrame = null
+          if (baseFileFormat.equalsIgnoreCase(HoodieFileFormat.PARQUET.name())) {
+            actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+          } else if (baseFileFormat.equalsIgnoreCase(HoodieFileFormat.ORC.name())) {
+            actualDf = sqlContext.read.orc(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+          }
 
           // remove metadata columns so that expected and actual DFs can be compared as is
           val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
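
For context, below is a minimal sketch of how the new `hoodie.table.base.file.format` option could be exercised through the Spark datasource once this change is in. Only the config key, the `PARQUET`/`ORC` values, and the option constants visible in the diff above come from this patch; the spark-shell session, input DataFrame, table name, paths, and record key / precombine fields are hypothetical.

```scala
// Hypothetical end-to-end usage: write a Hudi table whose base files are ORC
// instead of the default Parquet. Field names, table name and paths are illustrative only.
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

// any DataFrame with a record key ("uuid") and an ordering field ("ts")
val inputDf = spark.read.json("/tmp/source_data")

inputDf.write.format("hudi")
  .option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_orc_table")
  // new in this patch: choose the base file format at table-creation time (PARQUET remains the default)
  .option(HoodieWriteConfig.BASE_FILE_FORMAT.key, HoodieFileFormat.ORC.name())
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .mode(SaveMode.Overwrite)
  .save("/tmp/hoodie_orc_table")
```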