1
0

[HUDI-377] Adding Delete() support to DeltaStreamer (#1073)

- Provides ability to perform hard deletes by writing delete marker records into the source data
- if the record contains a special field _hoodie_delete_marker set to true, deletes are performed
This commit is contained in:
Sivabalan Narayanan
2020-01-04 11:07:31 -08:00
committed by vinoth chandar
parent 726ae47ce2
commit 7031445eb3
11 changed files with 150 additions and 71 deletions

View File

@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* Class to be used in tests to keep generating test inserts and updates against a corpus. * Class to be used in tests to keep generating test inserts and updates against a corpus.
* <p> * <p>
@@ -73,14 +74,15 @@ public class HoodieTestDataGenerator {
public static final String[] DEFAULT_PARTITION_PATHS = public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static final int DEFAULT_PARTITION_DEPTH = 3; public static final int DEFAULT_PARTITION_DEPTH = 3;
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"}," + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"}," + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"}," + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}"; + "{\"name\":\"fare\",\"type\": \"double\"},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double"; public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double,boolean";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema); public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
@@ -117,6 +119,15 @@ public class HoodieTestDataGenerator {
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
} }
/**
* Generates a new avro record of the above schema format for a delete.
*/
public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
true);
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}
/** /**
* Generates a new avro record of the above schema format, retaining the key if optionally provided. * Generates a new avro record of the above schema format, retaining the key if optionally provided.
*/ */
@@ -126,7 +137,12 @@ public class HoodieTestDataGenerator {
} }
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) { double timestamp) {
return generateGenericRecord(rowKey, riderName, driverName, timestamp, false);
}
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp, boolean isDeleteRecord) {
GenericRecord rec = new GenericData.Record(avroSchema); GenericRecord rec = new GenericData.Record(avroSchema);
rec.put("_row_key", rowKey); rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp); rec.put("timestamp", timestamp);
@@ -137,12 +153,18 @@ public class HoodieTestDataGenerator {
rec.put("end_lat", rand.nextDouble()); rec.put("end_lat", rand.nextDouble());
rec.put("end_lon", rand.nextDouble()); rec.put("end_lon", rand.nextDouble());
rec.put("fare", rand.nextDouble() * 100); rec.put("fare", rand.nextDouble() * 100);
if (isDeleteRecord) {
rec.put("_hoodie_is_deleted", true);
} else {
rec.put("_hoodie_is_deleted", false);
}
return rec; return rec;
} }
public static void createCommitFile(String basePath, String commitTime, Configuration configuration) { public static void createCommitFile(String basePath, String commitTime, Configuration configuration) {
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime), Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
HoodieTimeline.makeRequestedCommitFileName(commitTime)).forEach(f -> { HoodieTimeline.makeRequestedCommitFileName(commitTime))
.forEach(f -> {
Path commitFile = new Path( Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
FSDataOutputStream os = null; FSDataOutputStream os = null;
@@ -176,7 +198,7 @@ public class HoodieTestDataGenerator {
} }
public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant, public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant,
Configuration configuration) throws IOException { Configuration configuration) throws IOException {
Path commitFile = Path commitFile =
new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName()); new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
FileSystem fs = FSUtils.getFs(basePath, configuration); FileSystem fs = FSUtils.getFs(basePath, configuration);
@@ -332,7 +354,7 @@ public class HoodieTestDataGenerator {
* list * list
* *
* @param commitTime Commit Timestamp * @param commitTime Commit Timestamp
* @param n Number of updates (including dups) * @param n Number of updates (including dups)
* @return list of hoodie record updates * @return list of hoodie record updates
*/ */
public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException { public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException {
@@ -349,7 +371,7 @@ public class HoodieTestDataGenerator {
* Generates deduped updates of keys previously inserted, randomly distributed across the keys above. * Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
* *
* @param commitTime Commit Timestamp * @param commitTime Commit Timestamp
* @param n Number of unique records * @param n Number of unique records
* @return list of hoodie record updates * @return list of hoodie record updates
*/ */
public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) { public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) {
@@ -370,7 +392,7 @@ public class HoodieTestDataGenerator {
* Generates deduped updates of keys previously inserted, randomly distributed across the keys above. * Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
* *
* @param commitTime Commit Timestamp * @param commitTime Commit Timestamp
* @param n Number of unique records * @param n Number of unique records
* @return stream of hoodie record updates * @return stream of hoodie record updates
*/ */
public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) { public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) {
@@ -418,11 +440,46 @@ public class HoodieTestDataGenerator {
index = (index + 1) % numExistingKeys; index = (index + 1) % numExistingKeys;
kp = existingKeys.get(index); kp = existingKeys.get(index);
} }
existingKeys.remove(kp);
numExistingKeys--;
used.add(kp); used.add(kp);
return kp.key; return kp.key;
}); });
} }
/**
* Generates deduped delete records previously inserted, randomly distributed across the keys above.
*
* @param commitTime Commit Timestamp
* @param n Number of unique records
* @return stream of hoodie records for delete
*/
public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) {
final Set<KeyPartition> used = new HashSet<>();
if (n > numExistingKeys) {
throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
}
return IntStream.range(0, n).boxed().map(i -> {
int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
KeyPartition kp = existingKeys.get(index);
// Find the available keyPartition starting from randomly chosen one.
while (used.contains(kp)) {
index = (index + 1) % numExistingKeys;
kp = existingKeys.get(index);
}
existingKeys.remove(kp);
numExistingKeys--;
used.add(kp);
try {
return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
});
}
public String[] getPartitionPaths() { public String[] getPartitionPaths() {
return partitionPaths; return partitionPaths;
} }

View File

@@ -31,7 +31,6 @@ import java.io.Serializable;
* Base class for all AVRO record based payloads, that can be ordered based on a field. * Base class for all AVRO record based payloads, that can be ordered based on a field.
*/ */
public abstract class BaseAvroPayload implements Serializable { public abstract class BaseAvroPayload implements Serializable {
/** /**
* Avro data extracted from the source converted to bytes. * Avro data extracted from the source converted to bytes.
*/ */
@@ -43,8 +42,10 @@ public abstract class BaseAvroPayload implements Serializable {
protected final Comparable orderingVal; protected final Comparable orderingVal;
/** /**
* @param record * Instantiate {@link BaseAvroPayload}.
* @param orderingVal *
* @param record Generic record for the payload.
* @param orderingVal {@link Comparable} to be used in pre combine.
*/ */
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
try { try {

View File

@@ -103,7 +103,7 @@ public class DataSourceUtils {
/** /**
* Create a key generator class via reflection, passing in any configs needed. * Create a key generator class via reflection, passing in any configs needed.
* * <p>
* If the class name of key generator is configured through the properties file, i.e., {@code props}, use the * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the
* corresponding key generator class; otherwise, use the default key generator class specified in {@code * corresponding key generator class; otherwise, use the default key generator class specified in {@code
* DataSourceWriteOptions}. * DataSourceWriteOptions}.
@@ -125,7 +125,7 @@ public class DataSourceUtils {
throws IOException { throws IOException {
try { try {
return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal); new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
} catch (Throwable e) { } catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e); throw new IOException("Could not create payload for class: " + payloadClass, e);
} }
@@ -140,7 +140,7 @@ public class DataSourceUtils {
} }
public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath, public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
String tblName, Map<String, String> parameters) { String tblName, Map<String, String> parameters) {
// inline compaction is on by default for MOR // inline compaction is on by default for MOR
boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY()) boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
@@ -162,7 +162,7 @@ public class DataSourceUtils {
} }
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords, public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
String commitTime, String operation) { String commitTime, String operation) {
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) { if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
return client.bulkInsert(hoodieRecords, commitTime); return client.bulkInsert(hoodieRecords, commitTime);
} else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) { } else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
@@ -174,19 +174,19 @@ public class DataSourceUtils {
} }
public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys, public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
String commitTime) { String commitTime) {
return client.delete(hoodieKeys, commitTime); return client.delete(hoodieKeys, commitTime);
} }
public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey, public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
String payloadClass) throws IOException { String payloadClass) throws IOException {
HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal); HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
return new HoodieRecord<>(hKey, payload); return new HoodieRecord<>(hKey, payload);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords, public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) { HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
HoodieReadClient client = null; HoodieReadClient client = null;
try { try {
client = new HoodieReadClient<>(jssc, writeConfig, timelineService); client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
@@ -205,7 +205,7 @@ public class DataSourceUtils {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords, public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) { Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
HoodieWriteConfig writeConfig = HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build(); HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService); return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);

View File

@@ -38,8 +38,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> { implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
/** /**
* @param record *
* @param orderingVal
*/ */
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) { public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal); super(record, orderingVal);
@@ -61,8 +60,15 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
@Override @Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
GenericRecord genericRecord = (GenericRecord) getInsertValue(schema).get();
// combining strategy here trivially ignores currentValue on disk and writes this record // combining strategy here trivially ignores currentValue on disk and writes this record
return getInsertValue(schema); Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
return Option.empty();
} else {
return Option.of(genericRecord);
}
} }
@Override @Override

View File

@@ -158,8 +158,8 @@ public class DeltaSync implements Serializable {
private final HoodieTableType tableType; private final HoodieTableType tableType;
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException { Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
this.cfg = cfg; this.cfg = cfg;
this.jssc = jssc; this.jssc = jssc;
@@ -288,7 +288,7 @@ public class DeltaSync implements Serializable {
// default to RowBasedSchemaProvider // default to RowBasedSchemaProvider
schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse( ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse(
dataAndCheckpoint.getSchemaProvider()) dataAndCheckpoint.getSchemaProvider())
: this.schemaProvider; : this.schemaProvider;
} else { } else {
// Pull the data from the source & prepare the write // Pull the data from the source & prepare the write
@@ -316,22 +316,22 @@ public class DeltaSync implements Serializable {
(Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField)); (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
return new HoodieRecord<>(keyGenerator.getKey(gr), payload); return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
}); });
return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
} }
/** /**
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed. * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
* *
* @param records Input Records * @param records Input Records
* @param checkpointStr Checkpoint String * @param checkpointStr Checkpoint String
* @param metrics Metrics * @param metrics Metrics
* @return Option Compaction instant if one is scheduled * @return Option Compaction instant if one is scheduled
*/ */
private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr, private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception { HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
Option<String> scheduledCompactionInstant = Option.empty(); Option<String> scheduledCompactionInstant = Option.empty();
// filter dupes if needed // filter dupes if needed
if (cfg.filterDupes) { if (cfg.filterDupes) {
// turn upserts to insert // turn upserts to insert

View File

@@ -50,10 +50,6 @@ public final class SourceFormatAdapter {
/** /**
* Fetch new data in avro format. If the source provides data in different format, they are translated to Avro format * Fetch new data in avro format. If the source provides data in different format, they are translated to Avro format
*
* @param lastCkptStr
* @param sourceLimit
* @return
*/ */
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) { public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) { switch (source.getSourceType()) {
@@ -78,10 +74,6 @@ public final class SourceFormatAdapter {
/** /**
* Fetch new data in row format. If the source provides data in different format, they are translated to Row format * Fetch new data in row format. If the source provides data in different format, they are translated to Row format
*
* @param lastCkptStr
* @param sourceLimit
* @return
*/ */
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) { public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) { switch (source.getSourceType()) {
@@ -95,7 +87,8 @@ public final class SourceFormatAdapter {
.ofNullable( .ofNullable(
r.getBatch() r.getBatch()
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(), .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
source.getSparkSession())) source.getSparkSession())
)
.orElse(null)), .orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider()); r.getCheckpointForNextBatch(), r.getSchemaProvider());
} }

View File

@@ -179,7 +179,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} }
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName, static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) { String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath; cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips"; cfg.targetTableName = "hoodie_trips";
@@ -198,7 +198,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} }
static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op, static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) { boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath; cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips_copy"; cfg.targetTableName = "hoodie_trips_copy";
@@ -352,11 +352,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.sourceLimit = 2000; cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT; cfg.operation = Operation.UPSERT;
new HoodieDeltaStreamer(cfg, jsc).sync(); new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2); TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext); List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
assertEquals(2000, counts.get(0).getLong(1)); assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
} }
@Test @Test
@@ -396,8 +396,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} else { } else {
TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs); TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs);
} }
TestHelpers.assertRecordCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertRecordCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
return true; return true;
}, 180); }, 180);
ds.shutdownGracefully(); ds.shutdownGracefully();
@@ -457,12 +457,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.sourceLimit = 2000; cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT; cfg.operation = Operation.UPSERT;
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(2000, datasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCountWithExactValue(1950, datasetBasePath + "/*/*.parquet", sqlContext);
lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2); lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext); List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
assertEquals(2000, counts.get(0).getLong(1)); assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
// Incrementally pull changes in upstream hudi table and apply to downstream table // Incrementally pull changes in upstream hudi table and apply to downstream table
downstreamCfg = downstreamCfg =
@@ -476,7 +476,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String finalInstant = String finalInstant =
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 2); TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 2);
counts = TestHelpers.countsPerCommit(downstreamDatasetBasePath + "/*/*.parquet", sqlContext); counts = TestHelpers.countsPerCommit(downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
assertEquals(2000, counts.get(0).getLong(1)); assertEquals(2000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
// Test Hive integration // Test Hive integration
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs); HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
@@ -566,12 +566,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
/** /**
* Returns some random number as distance between the points. * Returns some random number as distance between the points.
* *
* @param lat1 Latitiude of source * @param lat1 Latitiude of source
* @param lat2 Latitude of destination * @param lat2 Latitude of destination
* @param lon1 Longitude of source * @param lon1 Longitude of source
* @param lon2 Longitude of destination * @param lon2 Longitude of destination
* @return
*/ */
@Override @Override
public Double call(Double lat1, Double lat2, Double lon1, Double lon2) { public Double call(Double lat1, Double lat2, Double lon1, Double lon2) {
@@ -586,7 +585,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Override @Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) { TypedProperties properties) {
rowDataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType); rowDataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
return rowDataset.withColumn("haversine_distance", functions.callUDF("distance_udf", functions.col("begin_lat"), return rowDataset.withColumn("haversine_distance", functions.callUDF("distance_udf", functions.col("begin_lat"),
functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat"))); functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
@@ -607,7 +606,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Override @Override
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) { TypedProperties properties) {
System.out.println("DropAllTransformer called !!"); System.out.println("DropAllTransformer called !!");
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema()); return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
} }

View File

@@ -76,12 +76,12 @@ public abstract class AbstractBaseTestSource extends AvroSource {
} }
protected AbstractBaseTestSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, protected AbstractBaseTestSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) { SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider); super(props, sparkContext, sparkSession, schemaProvider);
} }
protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime, protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
int partition) { int partition) {
int maxUniqueKeys = int maxUniqueKeys =
props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS); props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
@@ -94,10 +94,12 @@ public abstract class AbstractBaseTestSource extends AvroSource {
int numUpdates = Math.min(numExistingKeys, sourceLimit / 2); int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
int numInserts = sourceLimit - numUpdates; int numInserts = sourceLimit - numUpdates;
LOG.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates); LOG.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);
boolean reachedMax = false;
if (numInserts + numExistingKeys > maxUniqueKeys) { if (numInserts + numExistingKeys > maxUniqueKeys) {
// Limit inserts so that maxUniqueRecords is maintained // Limit inserts so that maxUniqueRecords is maintained
numInserts = Math.max(0, maxUniqueKeys - numExistingKeys); numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
reachedMax = true;
} }
if ((numInserts + numUpdates) < sourceLimit) { if ((numInserts + numUpdates) < sourceLimit) {
@@ -105,16 +107,25 @@ public abstract class AbstractBaseTestSource extends AvroSource {
numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts); numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
} }
LOG.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys); Stream<GenericRecord> deleteStream = Stream.empty();
Stream<GenericRecord> updateStream;
long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory() LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
+ ", Free Memory=" + Runtime.getRuntime().freeMemory()); + ", Free Memory=" + Runtime.getRuntime().freeMemory());
if (!reachedMax && numUpdates >= 50) {
Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates) LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); + maxUniqueKeys);
// if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates
deleteStream = dataGenerator.generateUniqueDeleteRecordStream(commitTime, 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
} else {
LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
}
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts) Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
return Stream.concat(updateStream, insertStream); return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
} }
private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) { private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {

View File

@@ -43,9 +43,15 @@
}, { }, {
"name" : "end_lon", "name" : "end_lon",
"type" : "double" "type" : "double"
}, { },
{
"name" : "fare", "name" : "fare",
"type" : "double" "type" : "double"
},
{
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
} ] } ]
} }

View File

@@ -16,4 +16,4 @@
# limitations under the License. # limitations under the License.
### ###
include=base.properties include=base.properties
hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a

View File

@@ -46,9 +46,15 @@
}, { }, {
"name" : "fare", "name" : "fare",
"type" : "double" "type" : "double"
}, { },
"name" : "haversine_distance", {
"type" : "double" "name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
},
{
"name" : "haversine_distance",
"type" : "double"
}] }]
} }