[MINOR] Allow users to choose ORC as base file format in Spark SQL (#3279)
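For context, a minimal end-to-end sketch of what this change enables from the Spark datasource path. The table name, path, record key, and input data below are made-up placeholders, and the extra write options are assumptions about a typical insert, not part of this commit:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    // Hypothetical usage: create a Hudi table whose base files are ORC rather
    // than the Parquet default, via the config key introduced in this commit.
    val spark = SparkSession.builder().appName("orc-base-file-demo").getOrCreate()
    val inputDf = spark.range(10).selectExpr("id", "current_timestamp() as ts") // placeholder data
    inputDf.write.format("hudi")
      .option("hoodie.table.name", "demo_table")               // placeholder name
      .option("hoodie.table.base.file.format", "ORC")          // new option from this commit
      .option("hoodie.datasource.write.recordkey.field", "id") // assumed for the demo
      .option("hoodie.datasource.write.precombine.field", "ts") // assumed for the demo
      .option("hoodie.datasource.write.operation", "insert")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudi/demo_table")                            // placeholder path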
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -112,6 +113,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Controls the layout of the timeline. Version 0 relied on renames, Version 1 (default) models "
           + "the timeline as an immutable log relying only on atomic writes for object storage.");

+  public static final ConfigProperty<HoodieFileFormat> BASE_FILE_FORMAT = ConfigProperty
+      .key("hoodie.table.base.file.format")
+      .defaultValue(HoodieFileFormat.PARQUET)
+      .withAlternatives("hoodie.table.ro.file.format")
+      .withDocumentation("");
+
   public static final ConfigProperty<String> BASE_PATH_PROP = ConfigProperty
       .key("hoodie.base.path")
       .noDefaultValue()
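The new ConfigProperty carries a primary key, a PARQUET default, and the legacy hoodie.table.ro.file.format key as an alternative. A rough sketch of the intended resolution order, with assumed semantics (this helper is illustrative and not part of the commit):

    // Assumed lookup order for the new property:
    //   1. hoodie.table.base.file.format, if set
    //   2. hoodie.table.ro.file.format (legacy alternative), if set
    //   3. PARQUET, the declared default
    def resolveBaseFileFormat(conf: Map[String, String]): String =
      conf.get("hoodie.table.base.file.format")
        .orElse(conf.get("hoodie.table.ro.file.format"))
        .getOrElse("PARQUET")

    // e.g. resolveBaseFileFormat(Map("hoodie.table.ro.file.format" -> "ORC")) == "ORC"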
@@ -116,12 +116,14 @@ object HoodieSparkSqlWriter {
       handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
       // Create the table if not present
       if (!tableExists) {
+        val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
         val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)

         val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
           .setTableName(tblName)
+          .setBaseFileFormat(baseFileFormat)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
           .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
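With this wiring, the format chosen on the first write is baked into the table's metadata at creation time. A hedged sketch of reading it back later; the builder and accessor names reflect my understanding of the Hudi API of this era and should be treated as assumptions:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.table.HoodieTableMetaClient

    // Load an existing table's metadata and inspect the persisted base file
    // format (stored under .hoodie/hoodie.properties at table creation), so
    // later writers and readers need not re-pass it.
    val metaClient = HoodieTableMetaClient.builder()
      .setConf(new Configuration())
      .setBasePath("/tmp/hudi/demo_table") // placeholder path
      .build()
    println(metaClient.getTableConfig.getBaseFileFormat) // e.g. ORC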
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
@@ -38,7 +38,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode, SparkSession}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
@@ -327,9 +327,12 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }

-  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-    .foreach(tableType => {
-    test("test basic HoodieSparkSqlWriter functionality with datasource insert for " + tableType) {
+  List((DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name()), (DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.ORC.name()),
+    (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name()), (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.ORC.name()))
+    .foreach(t => {
+      val tableType = t._1
+      val baseFileFormat = t._2
+      test("test basic HoodieSparkSqlWriter functionality with datasource insert for " + tableType + " with " + baseFileFormat + " as the base file format") {
       initSparkContext("test_insert_datasource")
       val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
       try {
@@ -339,6 +342,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       //create a new table
       val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
         HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
+        HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat,
         DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> tableType,
         HoodieWriteConfig.INSERT_PARALLELISM.key -> "4",
         DataSourceWriteOptions.OPERATION_OPT_KEY.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
@@ -380,7 +384,12 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }

       // fetch all records from parquet files generated from write to hudi
-      val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      var actualDf: DataFrame = null
+      if (baseFileFormat.equalsIgnoreCase(HoodieFileFormat.PARQUET.name())) {
+        actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      } else if (baseFileFormat.equalsIgnoreCase(HoodieFileFormat.ORC.name())) {
+        actualDf = sqlContext.read.orc(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      }

       // remove metadata columns so that expected and actual DFs can be compared as is
       val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
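One stylistic note on the read-back above: the if/else chain could also be written as a match over the enum, which fails loudly on an unsupported format. A sketch only, not part of the commit, and it assumes fullPartitionPaths is a Seq[String] (in the test it is indexed element by element):

    // Illustrative alternative to the if/else in the test.
    val actualDf = HoodieFileFormat.valueOf(baseFileFormat.toUpperCase) match {
      case HoodieFileFormat.PARQUET => sqlContext.read.parquet(fullPartitionPaths: _*)
      case HoodieFileFormat.ORC => sqlContext.read.orc(fullPartitionPaths: _*)
      case other => throw new IllegalArgumentException("Unsupported base file format: " + other)
    }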