1
0

[HUDI-1363] Provide option to drop partition columns (#3465)

- Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
Sagar Sumit
2021-08-13 22:31:26 +05:30
committed by GitHub
parent d4c2974eae
commit 9689278014
7 changed files with 87 additions and 20 deletions

View File

@@ -118,11 +118,11 @@ object HoodieSparkSqlWriter {
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
// Create the table if not present
if (!tableExists) {
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val populateMetaFields = parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean
@@ -143,14 +143,14 @@ object HoodieSparkSqlWriter {
}
val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
// short-circuit if bulk_insert via row is enabled.
// scalastyle:off
if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) &&
operation == WriteOperationType.BULK_INSERT) {
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime, parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean)
basePath, path, instantTime, partitionColumns)
return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
}
// scalastyle:on
@@ -224,20 +224,22 @@ object HoodieSparkSqlWriter {
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
val hoodieRecord = if (shouldCombine) {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
DataSourceUtils.createHoodieRecord(processedRecord,
orderingVal, keyGenerator.getKey(gr),
hoodieConfig.getString(PAYLOAD_CLASS))
} else {
DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
}
hoodieRecord
}).toJavaRDD()
val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path.get,
tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -271,6 +273,23 @@ object HoodieSparkSqlWriter {
}
}
def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
val fieldsToRemove = new util.ArrayList[String]()
partitionParam.split(",").map(partitionField => partitionField.trim)
.filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
HoodieAvroUtils.removeFields(schema, fieldsToRemove)
}
def getProcessedRecord(partitionParam: String, record: GenericRecord,
dropPartitionColumns: Boolean): GenericRecord = {
var processedRecord = record
if (dropPartitionColumns) {
val writeSchema = generateSchemaWithoutPartitionColumns(partitionParam, record.getSchema)
processedRecord = HoodieAvroUtils.rewriteRecord(record, writeSchema)
}
processedRecord
}
/**
* Checks if schema needs upgrade (if incoming record's write schema is old while table schema got evolved).
*
@@ -379,14 +398,21 @@ object HoodieSparkSqlWriter {
basePath: Path,
path: Option[String],
instantTime: String,
populateMetaFields: Boolean): (Boolean, common.util.Option[String]) = {
partitionColumns: String): (Boolean, common.util.Option[String]) = {
val sparkContext = sqlContext.sparkContext
val populateMetaFields = parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean
val dropPartitionColumns =
parameters.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue()).toBoolean
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
if (dropPartitionColumns) {
schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
}
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
@@ -415,7 +441,7 @@ object HoodieSparkSqlWriter {
}
val hoodieDF = if (populateMetaFields) {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
bulkInsertPartitionerRows, isGlobalIndex)
bulkInsertPartitionerRows, isGlobalIndex, dropPartitionColumns)
} else {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
}

View File

@@ -76,7 +76,8 @@ object HoodieWriterUtils {
INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue,
ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue,
RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString
RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString,
DROP_PARTITION_COLUMNS.key -> DROP_PARTITION_COLUMNS.defaultValue
) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}