1
0

[REVERT] "[HUDI-1058] Make delete marker configurable (#1819)" (#1914)

This reverts commit 433d7d2c98.
This commit is contained in:
Sivabalan Narayanan
2020-08-04 18:20:38 -04:00
committed by GitHub
parent 539621bd33
commit ab11ba43e1
14 changed files with 43 additions and 266 deletions

View File

@@ -94,9 +94,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
.toString(); .toString();
public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field";
public static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted";
public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server"; public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true"; public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
@@ -277,10 +274,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return BulkInsertSortMode.valueOf(sortMode.toUpperCase()); return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
} }
public String getDeleteMarkerField() {
return props.getProperty(DELETE_MARKER_FIELD_PROP);
}
/** /**
* compaction properties. * compaction properties.
*/ */
@@ -964,8 +957,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE); setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE), setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE); BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
setDefaultOnCondition(props, !props.containsKey(DELETE_MARKER_FIELD_PROP),
DELETE_MARKER_FIELD_PROP, DEFAULT_DELETE_MARKER_FIELD);
// Make sure the props is propagated // Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());

View File

@@ -36,8 +36,6 @@ import java.io.IOException;
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> { implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
private String deleteMarkerField = "_hoodie_is_deleted";
/** /**
* *
*/ */
@@ -49,12 +47,6 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
} }
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal,
String deleteMarkerField) {
this(record, orderingVal);
this.deleteMarkerField = deleteMarkerField;
}
@Override @Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) { public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) {
// pick the payload with greatest ordering value // pick the payload with greatest ordering value
@@ -88,7 +80,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
* @returns {@code true} if record represents a delete record. {@code false} otherwise. * @returns {@code true} if record represents a delete record. {@code false} otherwise.
*/ */
private boolean isDeleteRecord(GenericRecord genericRecord) { private boolean isDeleteRecord(GenericRecord genericRecord) {
Object deleteMarker = genericRecord.get(deleteMarkerField); Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
return (deleteMarker instanceof Boolean && (boolean) deleteMarker); return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
} }
} }

View File

@@ -37,8 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
public class TestOverwriteWithLatestAvroPayload { public class TestOverwriteWithLatestAvroPayload {
private Schema schema; private Schema schema;
String defaultDeleteMarkerField = "_hoodie_is_deleted";
String deleteMarkerField = "delete_marker_field";
@BeforeEach @BeforeEach
public void setUp() throws Exception { public void setUp() throws Exception {
@@ -46,56 +44,26 @@ public class TestOverwriteWithLatestAvroPayload {
new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null), new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", null), new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", null),
new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null), new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
new Schema.Field(defaultDeleteMarkerField, Schema.create(Type.BOOLEAN), "", false), new Schema.Field("_hoodie_is_deleted", Schema.create(Type.BOOLEAN), "", false)
new Schema.Field(deleteMarkerField, Schema.create(Type.BOOLEAN), "", false)
)); ));
} }
@Test @Test
public void testOverwriteWithLatestAvroPayload() throws IOException { public void testActiveRecords() throws IOException {
GenericRecord record1 = new GenericData.Record(schema); GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1"); record1.put("id", "1");
record1.put("partition", "partition0"); record1.put("partition", "partition0");
record1.put("ts", 0L); record1.put("ts", 0L);
record1.put(defaultDeleteMarkerField, false); record1.put("_hoodie_is_deleted", false);
record1.put(deleteMarkerField, false);
// test1: set default marker field value to true and user defined to false
GenericRecord record2 = new GenericData.Record(schema); GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "2"); record2.put("id", "2");
record2.put("partition", "partition1"); record2.put("partition", "partition1");
record2.put("ts", 1L); record2.put("ts", 1L);
record2.put(defaultDeleteMarkerField, true); record2.put("_hoodie_is_deleted", false);
record2.put(deleteMarkerField, false);
// set to user defined marker field with false, the record should be considered active.
assertActiveRecord(record1, record2, deleteMarkerField);
// set to default marker field with true, the record should be considered delete.
assertDeletedRecord(record1, record2, defaultDeleteMarkerField);
// test2: set default marker field value to false and user defined to true
GenericRecord record3 = new GenericData.Record(schema);
record3.put("id", "2");
record3.put("partition", "partition1");
record3.put("ts", 1L);
record3.put(defaultDeleteMarkerField, false);
record3.put(deleteMarkerField, true);
// set to user defined marker field with true, the record should be considered delete.
assertDeletedRecord(record1, record3, deleteMarkerField);
// set to default marker field with false, the record should be considered active.
assertActiveRecord(record1, record3, defaultDeleteMarkerField);
}
private void assertActiveRecord(GenericRecord record1,
GenericRecord record2, String field) throws IOException {
OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(
record1, 1, field);
OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(
record2, 2, field);
OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1);
OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(record2, 2);
assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2); assertEquals(payload2.preCombine(payload1), payload2);
@@ -106,12 +74,22 @@ public class TestOverwriteWithLatestAvroPayload {
assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record2); assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record2);
} }
private void assertDeletedRecord(GenericRecord record1, @Test
GenericRecord delRecord1, String field) throws IOException { public void testDeletedRecord() throws IOException {
OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload( GenericRecord record1 = new GenericData.Record(schema);
record1, 1, field); record1.put("id", "1");
OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload( record1.put("partition", "partition0");
delRecord1, 2, field); record1.put("ts", 0L);
record1.put("_hoodie_is_deleted", false);
GenericRecord delRecord1 = new GenericData.Record(schema);
delRecord1.put("id", "2");
delRecord1.put("partition", "partition1");
delRecord1.put("ts", 1L);
delRecord1.put("_hoodie_is_deleted", true);
OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1);
OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(delRecord1, 2);
assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2); assertEquals(payload2.preCombine(payload1), payload2);
@@ -121,4 +99,5 @@ public class TestOverwriteWithLatestAvroPayload {
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record1); assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record1);
assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent()); assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent());
} }
} }

View File

@@ -106,7 +106,6 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"}," + "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},"
+ "{\"name\": \"weight\", \"type\": \"float\"}," + "{\"name\": \"weight\", \"type\": \"float\"},"
+ "{\"name\": \"nation\", \"type\": \"bytes\"}," + "{\"name\": \"nation\", \"type\": \"bytes\"},"
+ "{\"name\": \"user_defined_delete_marker_field\", \"type\": \"boolean\", \"default\": false},"
+ "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}}," + "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
+ "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}}," + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},"
+ "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},"; + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
@@ -124,7 +123,7 @@ public class HoodieTestDataGenerator {
+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}"; + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,boolean,int,bigint,decimal(10,6)," public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
+ "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean"; + "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
@@ -180,18 +179,6 @@ public class HoodieTestDataGenerator {
return null; return null;
} }
public static List<GenericRecord> generateGenericRecords(int n, boolean isDeleteRecord, int instantTime) {
return IntStream.range(0, n).boxed().map(i -> {
String partitionPath = DEFAULT_FIRST_PARTITION_PATH;
HoodieKey key = new HoodieKey("id_" + i, partitionPath);
HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition();
kp.key = key;
kp.partitionPath = partitionPath;
return HoodieTestDataGenerator.generateGenericRecord(
key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, instantTime, isDeleteRecord, false);
}).collect(Collectors.toList());
}
/** /**
* Generates a new avro record of the above nested schema format, * Generates a new avro record of the above nested schema format,
* retaining the key if optionally provided. * retaining the key if optionally provided.
@@ -278,11 +265,11 @@ public class HoodieTestDataGenerator {
rec.put("weight", RAND.nextFloat()); rec.put("weight", RAND.nextFloat());
byte[] bytes = "Canada".getBytes(); byte[] bytes = "Canada".getBytes();
rec.put("nation", ByteBuffer.wrap(bytes)); rec.put("nation", ByteBuffer.wrap(bytes));
rec.put("user_defined_delete_marker_field", isDeleteRecord);
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
Date date = new Date(currentTimeMillis); Date date = new Date(currentTimeMillis);
rec.put("current_date", (int) date.toLocalDate().toEpochDay()); rec.put("current_date", (int) date.toLocalDate().toEpochDay());
rec.put("current_ts", currentTimeMillis); rec.put("current_ts", currentTimeMillis);
BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat())); BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat()));
Schema decimalSchema = AVRO_SCHEMA.getField("height").schema(); Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion(); Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
@@ -305,7 +292,11 @@ public class HoodieTestDataGenerator {
rec.put("tip_history", tipHistoryArray); rec.put("tip_history", tipHistoryArray);
} }
rec.put("_hoodie_is_deleted", isDeleteRecord); if (isDeleteRecord) {
rec.put("_hoodie_is_deleted", true);
} else {
rec.put("_hoodie_is_deleted", false);
}
return rec; return rec;
} }
@@ -769,8 +760,8 @@ public class HoodieTestDataGenerator {
public static class KeyPartition implements Serializable { public static class KeyPartition implements Serializable {
public HoodieKey key; HoodieKey key;
public String partitionPath; String partitionPath;
} }
public void close() { public void close() {

View File

@@ -27,7 +27,6 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
@@ -208,20 +207,11 @@ public class DataSourceUtils {
/** /**
* Create a payload class via reflection, passing in an ordering/precombine value. * Create a payload class via reflection, passing in an ordering/precombine value.
*/ */
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
Comparable orderingVal, throws IOException {
String deleteMarkerField) throws IOException {
try { try {
HoodieRecordPayload payload = null; return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())) { new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
payload = (OverwriteWithLatestAvroPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[]{GenericRecord.class, Comparable.class, String.class},
record, orderingVal, deleteMarkerField);
} else {
payload = (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
}
return payload;
} catch (Throwable e) { } catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e); throw new IOException("Could not create payload for class: " + payloadClass, e);
} }
@@ -277,9 +267,8 @@ public class DataSourceUtils {
} }
public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey, public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
String payloadClass, String payloadClass) throws IOException {
String deleteMarkerField) throws IOException { HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal, deleteMarkerField);
return new HoodieRecord<>(hKey, payload); return new HoodieRecord<>(hKey, payload);
} }

View File

@@ -70,7 +70,7 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr
gr, props.getString("hoodie.datasource.write.precombine.field"), false); gr, props.getString("hoodie.datasource.write.precombine.field"), false);
try { try {
return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
props.getString("hoodie.datasource.write.payload.class"), "_hoodie_is_deleted"); props.getString("hoodie.datasource.write.payload.class"));
} catch (IOException ioe) { } catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe); throw new HoodieIOException(ioe.getMessage(), ioe);
} }

View File

@@ -184,13 +184,6 @@ object DataSourceWriteOptions {
val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class" val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName
/**
* Field used in OverwriteWithLatestAvroPayload combineAndGetUpdateValue, When two records have the same
* key value, we will check if the new record is deleted by the delete field.
*/
val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field"
val DEFAULT_DELETE_FIELD_OPT_VAL = "_hoodie_is_deleted"
/** /**
* Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
* will be obtained by invoking .toString() on the field value. Nested fields can be specified using * will be obtained by invoking .toString() on the field value. Nested fields can be specified using

View File

@@ -111,9 +111,7 @@ private[hudi] object HoodieSparkSqlWriter {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false) val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
.asInstanceOf[Comparable[_]] .asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr, DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr), orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
parameters(PAYLOAD_CLASS_OPT_KEY),
parameters(DELETE_FIELD_OPT_KEY))
}).toJavaRDD() }).toJavaRDD()
// Handle various save modes // Handle various save modes
@@ -206,7 +204,6 @@ private[hudi] object HoodieSparkSqlWriter {
TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL, TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL, PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL, PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
DELETE_FIELD_OPT_KEY -> DEFAULT_DELETE_FIELD_OPT_VAL,
RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL, RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL, PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL, KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,

View File

@@ -100,52 +100,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
} }
} }
test("test OverwriteWithLatestAvroPayload with user defined delete field") {
val session = SparkSession.builder()
.appName("test_append_mode")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
try {
val sqlContext = session.sqlContext
val hoodieFooTableName = "hoodie_foo_tbl"
val keyField = "id"
val deleteMarkerField = "delete_field"
//create a new table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
"hoodie.insert.shuffle.parallelism" -> "2",
"hoodie.upsert.shuffle.parallelism" -> "2",
DELETE_FIELD_OPT_KEY -> deleteMarkerField,
RECORDKEY_FIELD_OPT_KEY -> keyField)
val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
val id1 = UUID.randomUUID().toString
val dataFrame = session.createDataFrame(Seq(
(id1, 1, false)
)) toDF(keyField, "ts", deleteMarkerField)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
val recordCount1 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count
assert(recordCount1 == 1, "result should be 1, but get " + recordCount1)
val dataFrame2 = session.createDataFrame(Seq(
(id1, 2, true)
)) toDF(keyField, "ts", deleteMarkerField)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame2)
val recordCount2 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count()
assert(recordCount2 == 0, "result should be 0, but get " + recordCount2)
} finally {
session.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
case class Test(uuid: String, ts: Long) case class Test(uuid: String, ts: Long)
} }

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils; import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
@@ -340,12 +339,9 @@ public class DeltaSync implements Serializable {
} }
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
String deleteMarkerField = props.getString(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP,
HoodieWriteConfig.DEFAULT_DELETE_MARKER_FIELD);
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> { JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
(Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false), (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
deleteMarkerField);
return new HoodieRecord<>(keyGenerator.getKey(gr), payload); return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
}); });

View File

@@ -1,97 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.functional;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
private static String PARQUET_SOURCE_ROOT;
private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
}
@Test
public void testOverwriteLatestAvroPayload() throws Exception {
// test defaultDeleteMarkerField
this.testOverwriteLatestAvroPayload(null);
// test userDefinedDeleteMarkerField
this.testOverwriteLatestAvroPayload("user_defined_delete_marker_field");
}
private void testOverwriteLatestAvroPayload(String deleteMarkerField) throws Exception {
String path = PARQUET_SOURCE_ROOT + "/1.parquet";
List<GenericRecord> records = HoodieTestDataGenerator.generateGenericRecords(5, false, 0);
Helpers.saveParquetToDFS(records, new Path(path));
TypedProperties parquetProps = new TypedProperties();
parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
if (deleteMarkerField != null) {
parquetProps.setProperty(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP, deleteMarkerField);
}
Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
String tableBasePath = dfsBasePath + "/test_overwrite_lastest_avro_payload_table";
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath + "/*/*.parquet", sqlContext);
String path2 = PARQUET_SOURCE_ROOT + "/2.parquet";
List<GenericRecord> records2 = HoodieTestDataGenerator.generateGenericRecords(4, true, 1);
Helpers.saveParquetToDFS(records2, new Path(path2));
deltaStreamer.sync();
List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").collectAsList();
assertEquals(1, rows.size());
assertEquals(records.get(4).get("_row_key"), rows.get(0).getString(2));
}
}

View File

@@ -55,10 +55,6 @@
},{ },{
"name" : "nation", "name" : "nation",
"type" : "bytes" "type" : "bytes"
},{
"name" : "user_defined_delete_marker_field",
"type" : "boolean",
"default" : false
},{ },{
"name" : "current_date", "name" : "current_date",
"type" : { "type" : {

View File

@@ -16,4 +16,4 @@
# limitations under the License. # limitations under the License.
### ###
include=base.properties include=base.properties
hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.user_defined_delete_marker_field, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a

View File

@@ -55,10 +55,6 @@
}, { }, {
"name" : "nation", "name" : "nation",
"type" : "bytes" "type" : "bytes"
},{
"name" : "user_defined_delete_marker_field",
"type" : "boolean",
"default" : false
},{ },{
"name" : "current_date", "name" : "current_date",
"type" : { "type" : {