1
0

[HUDI-802] Fixing deletes for inserts in same batch in write path (#1792)

* Fixing deletes for inserts in same batch in write path
* Fixing delta streamer tests
* Adding tests for OverwriteWithLatestAvroPayload
This commit is contained in:
Sivabalan Narayanan
2020-07-22 22:39:57 -04:00
committed by GitHub
parent 12ef8c9249
commit 5b6026ba43
6 changed files with 166 additions and 25 deletions

View File

@@ -410,7 +410,37 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
/**
* Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
* When records getting inserted are deleted in the same write batch, hudi should have deleted those records and
* not be available in read path.
* @throws Exception
*/
@Test
public void testDeletesForInsertsInSameBatch() throws Exception {
HoodieWriteClient client = getHoodieWriteClient(getConfig(), false);
/**
* Write 200 inserts and issue deletes to a subset(50) of inserts.
*/
String initCommitTime = "000";
String newCommitTime = "001";
final List<HoodieRecord> recordsInFirstBatch = new ArrayList<>();
Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
(String instantTime, Integer numRecordsInThisCommit) -> {
List<HoodieRecord> fewRecordsForInsert = dataGen.generateInserts(instantTime, 200);
List<HoodieRecord> fewRecordsForDelete = fewRecordsForInsert.subList(40, 90);
recordsInFirstBatch.addAll(fewRecordsForInsert);
recordsInFirstBatch.addAll(dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete));
return recordsInFirstBatch;
};
writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime,
-1, recordGenFunction, HoodieWriteClient::upsert, true, 150, 150, 1);
}
/**
* Test update of a record to different partition with Global Index.
*/
@ParameterizedTest
@EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})

View File

@@ -226,7 +226,7 @@ public class HoodieTestDataGenerator {
public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, 0.0,
true, false);
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
return new TestRawTripPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true);
}
/**

View File

@@ -101,6 +101,11 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayloa
}
}
public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
return jsonConverter.convert(getJsonData(), schema);
}
@Override
public Option<Map<String, String>> getMetadata() {
// Let's assume we want to count the number of input row change events