1
0

[HUDI-328] Adding delete api to HoodieWriteClient (#1004)

[HUDI-328]  Adding delete api to HoodieWriteClient and Spark DataSource
This commit is contained in:
Sivabalan Narayanan
2019-11-22 15:05:25 -08:00
committed by Balaji Varadarajan
parent 7bc08cbfdc
commit c3355109b1
18 changed files with 818 additions and 172 deletions

View File

@@ -92,9 +92,9 @@ public class DataSourceUtils {
/**
* Create a key generator class via reflection, passing in any configs needed.
*
* If the class name of key generator is configured through the properties file, i.e., {@code
* props}, use the corresponding key generator class; otherwise, use the default key generator class specified in
* {@code DataSourceWriteOptions}.
* If the class name of key generator is configured through the properties file, i.e., {@code props}, use the
* corresponding key generator class; otherwise, use the default key generator class specified in {@code
* DataSourceWriteOptions}.
*/
public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
String keyGeneratorClass = props.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
@@ -124,7 +124,7 @@ public class DataSourceUtils {
throws IOException {
try {
return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e);
}
@@ -172,6 +172,11 @@ public class DataSourceUtils {
}
}
public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
String commitTime) {
return client.delete(hoodieKeys, commitTime);
}
public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
String payloadClass) throws IOException {
HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);

View File

@@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
/**
* Empty payload used for deletions
*/
public class EmptyHoodieRecordPayload implements HoodieRecordPayload<EmptyHoodieRecordPayload> {
public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {}
@Override
public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload another) {
return another;
}
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
return Option.empty();
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) {
return Option.empty();
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi
import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.hudi.common.model.HoodieKey
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
@@ -41,6 +42,10 @@ object AvroConversionUtils {
}
}
def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = {
df.rdd.map(row => (new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField))))
}
def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = {
if (rdd.isEmpty()) {
ss.emptyDataFrame

View File

@@ -66,8 +66,8 @@ object DataSourceReadOptions {
/**
* For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions,
* filters appearing late in the sequence of transformations cannot be automatically pushed down.
* This option allows setting filters directly on Hoodie Source
* filters appearing late in the sequence of transformations cannot be automatically pushed down.
* This option allows setting filters directly on Hoodie Source
*/
val PUSH_DOWN_INCR_FILTERS_OPT_KEY = "hoodie.datasource.read.incr.filters"
}
@@ -85,6 +85,7 @@ object DataSourceWriteOptions {
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val INSERT_OPERATION_OPT_VAL = "insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val DELETE_OPERATION_OPT_VAL = "delete"
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
/**
@@ -152,31 +153,31 @@ object DataSourceWriteOptions {
val DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL = "_"
/**
* Flag to indicate whether to drop duplicates upon insert.
* By default insert will accept duplicates, to gain extra performance.
*/
* Flag to indicate whether to drop duplicates upon insert.
* By default insert will accept duplicates, to gain extra performance.
*/
val INSERT_DROP_DUPS_OPT_KEY = "hoodie.datasource.write.insert.drop.duplicates"
val DEFAULT_INSERT_DROP_DUPS_OPT_VAL = "false"
/**
* Flag to indicate how many times streaming job should retry for a failed microbatch
* By default 3
*/
* Flag to indicate how many times streaming job should retry for a failed microbatch
* By default 3
*/
val STREAMING_RETRY_CNT_OPT_KEY = "hoodie.datasource.write.streaming.retry.count"
val DEFAULT_STREAMING_RETRY_CNT_OPT_VAL = "3"
/**
* Flag to indicate how long (by millisecond) before a retry should issued for failed microbatch
* By default 2000 and it will be doubled by every retry
*/
* Flag to indicate how long (by millisecond) before a retry should issued for failed microbatch
* By default 2000 and it will be doubled by every retry
*/
val STREAMING_RETRY_INTERVAL_MS_OPT_KEY = "hoodie.datasource.write.streaming.retry.interval.ms"
val DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL = "2000"
/**
* Flag to indicate whether to ignore any non exception error (e.g. writestatus error)
* within a streaming microbatch
* By default true (in favor of streaming progressing over data integrity)
*/
* Flag to indicate whether to ignore any non exception error (e.g. writestatus error)
* within a streaming microbatch
* By default true (in favor of streaming progressing over data integrity)
*/
val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch"
val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true"

View File

@@ -19,6 +19,7 @@ package org.apache.hudi
import java.util
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
@@ -29,7 +30,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -72,131 +73,215 @@ private[hudi] object HoodieSparkSqlWriter {
parameters(OPERATION_OPT_KEY)
}
// register classes & schemas
val structName = s"${tblName.get}_record"
val nameSpace = s"hoodie.${tblName.get}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val orderingVal = DataSourceUtils.getNestedFieldValAsString(
gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
}).toJavaRDD()
var writeSuccessful: Boolean = false
var commitTime: String = null
var writeStatuses: JavaRDD[WriteStatus] = null
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(parameters("path"))
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
// Handle various save modes
if (mode == SaveMode.ErrorIfExists && exists) {
throw new HoodieException(s"hoodie dataset at $basePath already exists.")
}
if (mode == SaveMode.Ignore && exists) {
log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not performing actual writes.")
return (true, common.util.Option.empty())
}
if (mode == SaveMode.Overwrite && exists) {
log.warn(s"hoodie dataset at $basePath already exists. Deleting existing data & overwriting with new data.")
fs.delete(basePath, true)
exists = false
}
// Running into issues wrt generic type conversion from Java to Scala. Couldn't make common code paths for
// write and deletes. Specifically, instantiating client of type HoodieWriteClient<T extends HoodieRecordPayload>
// is having issues. Hence some codes blocks are same in both if and else blocks.
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas
val structName = s"${tblName.get}_record"
val nameSpace = s"hoodie.${tblName.get}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Create the dataset if not present
if (!exists) {
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
tblName.get, "archived")
}
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val orderingVal = DataSourceUtils.getNestedFieldValAsString(
gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
}).toJavaRDD()
// Create a HoodieWriteClient & issue the write.
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
mapAsJavaMap(parameters)
)
val hoodieRecords =
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
DataSourceUtils.dropDuplicates(
jsc,
hoodieAllIncomingRecords,
mapAsJavaMap(parameters), client.getTimelineServer)
} else {
hoodieAllIncomingRecords
// Handle various save modes
if (mode == SaveMode.ErrorIfExists && exists) {
throw new HoodieException(s"hoodie dataset at $basePath already exists.")
}
if (mode == SaveMode.Ignore && exists) {
log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not performing actual writes.")
return (true, common.util.Option.empty())
}
if (mode == SaveMode.Overwrite && exists) {
log.warn(s"hoodie dataset at $basePath already exists. Deleting existing data & overwriting with new data.")
fs.delete(basePath, true)
exists = false
}
if (hoodieRecords.isEmpty()) {
log.info("new batch has no new records, skipping...")
return (true, common.util.Option.empty())
}
val commitTime = client.startCommit()
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
// Check for errors and commit the write.
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
val writeSuccessful =
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
val commitSuccess = if (metaMap.isEmpty) {
client.commit(commitTime, writeStatuses)
} else {
client.commit(commitTime, writeStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
// Create the dataset if not present
if (!exists) {
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
tblName.get, "archived")
}
if (commitSuccess) {
log.info("Commit " + commitTime + " successful!")
}
else {
log.info("Commit " + commitTime + " failed!")
}
// Create a HoodieWriteClient & issue the write.
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
mapAsJavaMap(parameters)
)
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
syncHive(basePath, fs, parameters)
} else {
true
val hoodieRecords =
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
DataSourceUtils.dropDuplicates(
jsc,
hoodieAllIncomingRecords,
mapAsJavaMap(parameters), client.getTimelineServer)
} else {
hoodieAllIncomingRecords
}
if (hoodieRecords.isEmpty()) {
log.info("new batch has no new records, skipping...")
return (true, common.util.Option.empty())
}
client.close()
commitSuccess && syncHiveSucess
commitTime = client.startCommit()
writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
// Check for errors and commit the write.
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
writeSuccessful =
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
val commitSuccess = if (metaMap.isEmpty) {
client.commit(commitTime, writeStatuses)
} else {
client.commit(commitTime, writeStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
}
if (commitSuccess) {
log.info("Commit " + commitTime + " successful!")
}
else {
log.info("Commit " + commitTime + " failed!")
}
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
syncHive(basePath, fs, parameters)
} else {
true
}
client.close()
commitSuccess && syncHiveSucess
} else {
log.error(s"$operation failed with ${errorCount} errors :");
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)
if (ws.getErrors.size() > 0) {
ws.getErrors.foreach(kt =>
log.trace(s"Error for key: ${kt._1}", kt._2))
}
})
}
false
}
} else {
log.error(s"$operation failed with ${errorCount} errors :");
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)
if (ws.getErrors.size() > 0) {
ws.getErrors.foreach(kt =>
log.trace(s"Error for key: ${kt._1}", kt._2))
}
})
// Handle save modes
if (mode != SaveMode.Append) {
throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
}
false
val structName = s"${tblName.get}_record"
val nameSpace = s"hoodie.${tblName.get}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
// Convert to RDD[HoodieKey]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
if (!exists) {
throw new HoodieException(s"hoodie dataset at $basePath does not exist")
}
// Create a HoodieWriteClient & issue the delete.
val client = DataSourceUtils.createHoodieClient(jsc,
Schema.create(Schema.Type.NULL).toString, path.get, tblName.get,
mapAsJavaMap(parameters)
)
// Issue deletes
commitTime = client.startCommit()
writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
writeSuccessful =
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
val commitSuccess = if (metaMap.isEmpty) {
client.commit(commitTime, writeStatuses)
} else {
client.commit(commitTime, writeStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
}
if (commitSuccess) {
log.info("Commit " + commitTime + " successful!")
}
else {
log.info("Commit " + commitTime + " failed!")
}
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
syncHive(basePath, fs, parameters)
} else {
true
}
client.close()
commitSuccess && syncHiveSucess
} else {
log.error(s"$operation failed with ${errorCount} errors :");
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)
if (ws.getErrors.size() > 0) {
ws.getErrors.foreach(kt =>
log.trace(s"Error for key: ${kt._1}", kt._2))
}
})
}
false
}
}
(writeSuccessful, common.util.Option.ofNullable(commitTime))
}
/**
* Add default options for unspecified write options keys.
*
* @param parameters
* @return
*/
* Add default options for unspecified write options keys.
*
* @param parameters
* @return
*/
def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
STORAGE_TYPE_OPT_KEY -> DEFAULT_STORAGE_TYPE_OPT_VAL,