[HUDI-3835] Add UT for delete in java client (#5270)
@@ -465,6 +465,90 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
    verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords));
  }

  @Test
  public void testDeleteRecords() throws Exception {
    // Prepare the write config, write client, and base file utils
    HoodieWriteConfig config = makeHoodieClientConfig();
    int startInstant = 1;
    String firstCommitTime = makeNewCommitTime(startInstant++, "%09d");
    HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
    writeClient.startCommitWithTime(firstCommitTime);
    metaClient = HoodieTableMetaClient.reload(metaClient);
    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);

    String partitionPath = "2022/04/09";

    // Get some records belonging to the same partition (2022/04/09)
    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
        + "\"time\":\"2022-04-09T03:16:41.415Z\",\"number\":1}";
    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
        + "\"time\":\"2022-04-09T03:20:41.415Z\",\"number\":2}";
    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
        + "\"time\":\"2022-04-09T03:16:41.415Z\",\"number\":3}";

    List<HoodieRecord> records = new ArrayList<>();
    RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
    records.add(new HoodieAvroRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
    RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
    records.add(new HoodieAvroRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
    RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
    records.add(new HoodieAvroRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));

    // Insert new records
    writeClient.insert(records, firstCommitTime);

    FileStatus[] allFiles = getIncrementalFiles(partitionPath, "0", -1);
    assertEquals(1, allFiles.length);

    // Read out the bloom filter and make sure it can answer whether a record exists
    Path filePath = allFiles[0].getPath();
    BloomFilter filter = fileUtils.readBloomFilterFromMetadata(hadoopConf, filePath);
    for (HoodieRecord record : records) {
      assertTrue(filter.mightContain(record.getRecordKey()));
    }
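    // Note: a bloom filter proves only possible membership; mightContain can
    // return true for keys that were never written (false positives), which is
    // why the deletes below are verified by re-reading the base file rather
    // than by asserting !filter.mightContain(...) on the deleted keys.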

    // Read the base file, check the record content
    List<GenericRecord> fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
    int index = 0;
    for (GenericRecord record : fileRecords) {
      assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
      index++;
    }

    String newCommitTime = makeNewCommitTime(startInstant++, "%09d");
    writeClient.startCommitWithTime(newCommitTime);

    // Delete two of the three records
    List<HoodieKey> keysForDelete = new ArrayList<>(Arrays.asList(records.get(0).getKey(), records.get(2).getKey()));
    writeClient.delete(keysForDelete, newCommitTime);

    allFiles = getIncrementalFiles(partitionPath, "0", -1);
    assertEquals(1, allFiles.length);

    filePath = allFiles[0].getPath();
    // Read the base file, check the record content
    fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
    // Check that the two records were deleted successfully
    assertEquals(1, fileRecords.size());
    assertEquals(records.get(1).getRecordKey(), fileRecords.get(0).get("_row_key").toString());

    newCommitTime = makeNewCommitTime(startInstant++, "%09d");
    writeClient.startCommitWithTime(newCommitTime);

    // Delete the last remaining record
    keysForDelete = new ArrayList<>(Arrays.asList(records.get(1).getKey()));
    writeClient.delete(keysForDelete, newCommitTime);

    allFiles = getIncrementalFiles(partitionPath, "0", -1);
    assertEquals(1, allFiles.length);

    filePath = allFiles[0].getPath();
    // Read the base file, check the record content
    fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
    // Check that all records have been deleted
    assertEquals(0, fileRecords.size());
  }

  public static Map<String, Long> generateExpectedPartitionNumRecords(List<HoodieRecord> records) {
    return records.stream().map(record -> Pair.of(record.getPartitionPath(), 1))
        .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
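
For reference, the counting collector in generateExpectedPartitionNumRecords groups the mapped pairs by partition path and tallies occurrences per partition. A minimal standalone sketch of the same idiom, where plain strings stand in for HoodieRecord partition paths and the class name is illustrative:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionCountSketch {
  public static void main(String[] args) {
    // Stand-ins for record.getPartitionPath() values; p -> p plays the
    // role of Pair::getLeft in the test helper above.
    List<String> partitionPaths = Arrays.asList("2022/04/09", "2022/04/09", "2022/04/10");
    Map<String, Long> counts = partitionPaths.stream()
        .collect(Collectors.groupingBy(p -> p, Collectors.counting()));
    System.out.println(counts); // e.g. {2022/04/09=2, 2022/04/10=1} (map order not guaranteed)
  }
}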
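A note on the hard-coded partition path: the test expects all three records, whose time fields fall on 2022-04-09, to land under partitionPath = "2022/04/09". Assuming the RawTripTestPayload fixture derives the partition path from the date portion of the time field (an assumption about the fixture, which is not shown in this diff), the mapping would look like:

public class PartitionPathSketch {
  public static void main(String[] args) {
    // Hypothetical derivation, assumed to mirror the RawTripTestPayload fixture:
    // take the date portion of the "time" field and use '/' as the separator.
    String time = "2022-04-09T03:16:41.415Z";
    String partitionPath = time.substring(0, 10).replace("-", "/");
    System.out.println(partitionPath); // 2022/04/09
  }
}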