1
0

[HUDI-1058] Make delete marker configurable (#1819)

This commit is contained in:
Shen Hong
2020-08-03 23:06:31 +08:00
committed by GitHub
parent 8aa9142de8
commit 433d7d2c98
13 changed files with 264 additions and 42 deletions

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -215,11 +216,20 @@ public class DataSourceUtils {
/**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
throws IOException {
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
Comparable orderingVal,
String deleteMarkerField) throws IOException {
try {
return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
HoodieRecordPayload payload = null;
if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())) {
payload = (OverwriteWithLatestAvroPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[]{GenericRecord.class, Comparable.class, String.class},
record, orderingVal, deleteMarkerField);
} else {
payload = (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
}
return payload;
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e);
}
@@ -275,8 +285,9 @@ public class DataSourceUtils {
}
public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
String payloadClass) throws IOException {
HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
String payloadClass,
String deleteMarkerField) throws IOException {
HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal, deleteMarkerField);
return new HoodieRecord<>(hKey, payload);
}

View File

@@ -184,6 +184,13 @@ object DataSourceWriteOptions {
val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName
/**
* Field used in OverwriteWithLatestAvroPayload#combineAndGetUpdateValue. When two records have the same
* key value, we check whether the incoming record is marked as deleted via this field.
*/
val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field"
val DEFAULT_DELETE_FIELD_OPT_VAL = "_hoodie_is_deleted"
/**
* Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
* will be obtained by invoking .toString() on the field value. Nested fields can be specified using

View File

@@ -110,7 +110,9 @@ private[hudi] object HoodieSparkSqlWriter {
val orderingVal = DataSourceUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
orderingVal, keyGenerator.getKey(gr),
parameters(PAYLOAD_CLASS_OPT_KEY),
parameters(DELETE_FIELD_OPT_KEY))
}).toJavaRDD()
// Handle various save modes
@@ -202,6 +204,7 @@ private[hudi] object HoodieSparkSqlWriter {
TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
DELETE_FIELD_OPT_KEY -> DEFAULT_DELETE_FIELD_OPT_VAL,
RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,

View File

@@ -100,6 +100,52 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
}
// Verifies that OverwriteWithLatestAvroPayload honors a user-configured delete marker
// column (DELETE_FIELD_OPT_KEY) instead of the default "_hoodie_is_deleted": a second
// upsert of the same key with the marker set to true must remove the first record.
test("test OverwriteWithLatestAvroPayload with user defined delete field") {
val session = SparkSession.builder()
.appName("test_append_mode")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
// Temp directory backing the Hudi table; deleted in the finally block below.
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
try {
val sqlContext = session.sqlContext
val hoodieFooTableName = "hoodie_foo_tbl"
val keyField = "id"
val deleteMarkerField = "delete_field"
// Create a new table whose delete marker is the custom "delete_field" column.
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
"hoodie.insert.shuffle.parallelism" -> "2",
"hoodie.upsert.shuffle.parallelism" -> "2",
DELETE_FIELD_OPT_KEY -> deleteMarkerField,
RECORDKEY_FIELD_OPT_KEY -> keyField)
val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
val id1 = UUID.randomUUID().toString
// First write: delete marker is false, so the row must be persisted.
val dataFrame = session.createDataFrame(Seq(
(id1, 1, false)
)) toDF(keyField, "ts", deleteMarkerField)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
val recordCount1 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count
assert(recordCount1 == 1, "result should be 1, but get " + recordCount1)
// Second write: same record key with the marker set to true — the upsert
// must treat it as a delete, leaving the table empty.
val dataFrame2 = session.createDataFrame(Seq(
(id1, 2, true)
)) toDF(keyField, "ts", deleteMarkerField)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame2)
val recordCount2 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count()
assert(recordCount2 == 0, "result should be 0, but get " + recordCount2)
} finally {
session.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
case class Test(uuid: String, ts: Long)
}