1
0

[HUDI-1774] Adding support for delete_partitions to spark data source (#3437)

This commit is contained in:
Sivabalan Narayanan
2021-08-11 01:03:01 -04:00
committed by GitHub
parent a5e496fe23
commit c9fa3cffaf
5 changed files with 225 additions and 190 deletions

View File

@@ -29,8 +29,9 @@ import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -154,97 +155,110 @@ object HoodieSparkSqlWriter {
}
// scalastyle:on
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =
if (operation != WriteOperationType.DELETE) {
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
if (reconcileSchema) {
schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
}
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
operation match {
case WriteOperationType.DELETE => {
val genericRecords = registerKryoClassesAndGetGenericRecords(tblName, sparkContext, df, reconcileSchema)
// Convert to RDD[HoodieKey]
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
org.apache.hudi.common.util.Option.of(schema))
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val hoodieRecord = if (shouldCombine) {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr),
hoodieConfig.getString(PAYLOAD_CLASS))
} else {
DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
if (!tableExists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
hoodieRecord
}).toJavaRDD()
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
if (isAsyncClusteringEnabled(client, parameters)) {
asyncClusteringTriggerFn.get.apply(client)
}
val hoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
} else {
hoodieAllIncomingRecords
}
client.startCommitWithTime(instantTime, commitActionType)
val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeResult, client)
} else {
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
// Convert to RDD[HoodieKey]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace,
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
if (!tableExists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path.get, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
if (isAsyncClusteringEnabled(client, parameters)) {
asyncClusteringTriggerFn.get.apply(client)
}
if (isAsyncClusteringEnabled(client, parameters)) {
asyncClusteringTriggerFn.get.apply(client)
// Issue deletes
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
(writeStatuses, client)
}
case WriteOperationType.DELETE_PARTITION => {
val genericRecords = registerKryoClassesAndGetGenericRecords(tblName, sparkContext, df, reconcileSchema)
if (!tableExists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
// Issue deletes
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
(writeStatuses, client)
// Get list of partitions to delete
val partitionsToDelete = genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path.get, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
// Issue delete partitions
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
(writeStatuses, client)
}
case _ => { // any other operation
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
if (reconcileSchema) {
schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
}
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
org.apache.hudi.common.util.Option.of(schema))
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val hoodieRecord = if (shouldCombine) {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr),
hoodieConfig.getString(PAYLOAD_CLASS))
} else {
DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
}
hoodieRecord
}).toJavaRDD()
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
if (isAsyncClusteringEnabled(client, parameters)) {
asyncClusteringTriggerFn.get.apply(client)
}
val hoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
} else {
hoodieAllIncomingRecords
}
client.startCommitWithTime(instantTime, commitActionType)
val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeResult, client)
}
}
// Check for errors and commit the write.
@@ -276,6 +290,16 @@ object HoodieSparkSqlWriter {
latestSchema
}
def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext : SparkContext, df: Dataset[Row],
reconcileSchema: Boolean) : RDD[GenericRecord] = {
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema)
}
def bootstrap(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],