1
0

[HUDI-2752] The MOR DELETE block breaks the event time sequence of CDC (#4880)

This commit is contained in:
Danny Chan
2022-04-01 20:46:51 +08:00
committed by GitHub
parent 98b4e9796e
commit 6df14f15a3
18 changed files with 356 additions and 71 deletions

View File

@@ -20,9 +20,9 @@ package org.apache.hudi.common.functional;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -1016,13 +1016,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.collect(Collectors.toList());
// Delete 50 keys
List<HoodieKey> deletedKeys = copyOfRecords1.stream()
.map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
List<DeleteRecord> deletedRecords = copyOfRecords1.stream()
.map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList()).subList(0, 50);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedRecords.toArray(new DeleteRecord[50]), header);
writer.appendBlock(deleteBlock);
List<String> allLogFiles =
@@ -1063,7 +1063,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
});
assertEquals(200, readKeys.size(), "Stream collect should return all 200 records");
assertEquals(50, emptyPayloads.size(), "Stream collect should return all 50 records with empty payloads");
originalKeys.removeAll(deletedKeys);
originalKeys.removeAll(deletedRecords);
Collections.sort(originalKeys);
Collections.sort(readKeys);
assertEquals(originalKeys, readKeys, "CompositeAvroLogReader should return 150 records from 2 versions");
@@ -1097,6 +1097,123 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
}
/**
 * Verifies that the merged log record scanner honors the ordering value carried by
 * {@code DeleteRecord}s when DELETE blocks arrive out of event-time order.
 *
 * <p>Scenario written to the log (base commit "100"):
 * <ul>
 *   <li>instants 100/101: two data blocks of 100 records each (200 total);</li>
 *   <li>instant 102: DELETE of keys [0,10) with the default orderingVal (0) — takes effect;</li>
 *   <li>instant 103: DELETE of keys [10,20) with orderingVal -1 — must NOT take effect,
 *       since the data records win on event time;</li>
 *   <li>instant 104: DELETE of keys [20,30) with orderingVal +1 — takes effect.</li>
 * </ul>
 * The scanner should therefore surface 200 merged records, exactly 20 of which carry
 * empty payloads (the effective deletes from instants 102 and 104).
 *
 * @param diskMapType          spillable-map disk backend under test
 * @param isCompressionEnabled whether BitCask disk-map compression is on
 * @param readBlocksLazily     whether log blocks are read lazily
 */
@ParameterizedTest
@MethodSource("testArguments")
public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
// NOTE(review): no size threshold is actually configured on the builder below — the
// comment above appears copied from a sibling test; confirm whether it still applies.
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1: first data block of 100 records at instant "100".
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
// Rewrite against the metadata-enriched schema so record-key/partition-path fields exist.
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
writer.appendBlock(dataBlock);
// Write 2: second data block of 100 records at instant "101" (same header map, mutated in place).
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
writer.appendBlock(dataBlock);
// Pool all 200 rewritten records; their keys form the expected baseline.
copyOfRecords1.addAll(copyOfRecords2);
List<String> originalKeys =
copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(Collectors.toList());
// Delete 10 keys
// Default orderingVal is 0, which means natural order, so the DELETE records
// should overwrite the data records.
List<DeleteRecord> deleteRecords1 = copyOfRecords1.subList(0, 10).stream()
.map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header);
writer.appendBlock(deleteBlock1);
// Delete another 10 keys with -1 as orderingVal.
// The deletion should not take effect: the data records carry a greater event-time
// ordering, even though this DELETE block has a later instant time ("103").
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
.map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1))).toArray(DeleteRecord[]::new), header);
writer.appendBlock(deleteBlock2);
// Delete another 10 keys with +1 as orderingVal.
// The deletion should work because these keys have a greater ordering value.
List<DeleteRecord> deletedRecords3 = copyOfRecords1.subList(20, 30).stream()
.map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1)))
.collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header);
writer.appendBlock(deleteBlock3);
// Collect every delta log file written above for the scanner.
List<String> allLogFiles =
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
// Mark all five instants as completed delta commits so the scanner treats the blocks as valid.
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
FileCreateUtils.createDeltaCommit(basePath, "101", fs);
FileCreateUtils.createDeltaCommit(basePath, "102", fs);
FileCreateUtils.createDeltaCommit(basePath, "103", fs);
FileCreateUtils.createDeltaCommit(basePath, "104", fs);
// Merge-scan everything up to the latest instant "104" with the parameterized
// spillable-map configuration.
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(basePath)
.withLogFilePaths(allLogFiles)
.withReaderSchema(schema)
.withLatestInstantTime("104")
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build();
// Deletes merge into empty payloads rather than dropping records, so the count stays 200.
assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
final List<String> readKeys = new ArrayList<>(200);
final List<String> emptyPayloadKeys = new ArrayList<>();
scanner.forEach(s -> readKeys.add(s.getRecordKey()));
// Second pass: record which keys merged to an empty (deleted) payload.
scanner.forEach(s -> {
try {
if (!s.getData().getInsertValue(schema).isPresent()) {
emptyPayloadKeys.add(s.getRecordKey());
}
} catch (IOException io) {
throw new UncheckedIOException(io);
}
});
assertEquals(200, readKeys.size(), "Stream collect should return all 200 records");
// Only deleteBlock1 (instant 102) and deleteBlock3 (instant 104) take effect: 10 + 10 keys.
assertEquals(20, emptyPayloadKeys.size(), "Stream collect should return all 20 records with empty payloads");
// Expected surviving keys = all keys minus the two effective delete batches.
originalKeys.removeAll(deleteRecords1.stream().map(DeleteRecord::getRecordKey).collect(Collectors.toSet()));
originalKeys.removeAll(deletedRecords3.stream().map(DeleteRecord::getRecordKey).collect(Collectors.toSet()));
readKeys.removeAll(emptyPayloadKeys);
Collections.sort(originalKeys);
Collections.sort(readKeys);
assertEquals(originalKeys, readKeys, "HoodieMergedLogRecordScanner should return 180 records from 4 versions");
}
@ParameterizedTest
@MethodSource("testArguments")
public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType,
@@ -1131,12 +1248,12 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
// Delete 50 keys
// Delete 50 keys
List<HoodieKey> deletedKeys = copyOfRecords1.stream()
.map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
List<DeleteRecord> deleteRecords = copyOfRecords1.stream()
.map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList()).subList(0, 50);
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
writer.appendBlock(deleteBlock);
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
@@ -1208,11 +1325,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
writer.appendBlock(dataBlock);
// Delete 50 keys
List<HoodieKey> deletedKeys = copyOfRecords1.stream()
.map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
List<DeleteRecord> deleteRecords = copyOfRecords1.stream()
.map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList()).subList(0, 50);
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
writer.appendBlock(deleteBlock);
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
@@ -1328,11 +1445,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
// Delete 50 keys
// Delete 50 keys
List<HoodieKey> deletedKeys = copyOfRecords1.stream()
.map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
List<DeleteRecord> deleteRecords = copyOfRecords1.stream()
.map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList()).subList(0, 50);
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
writer.appendBlock(deleteBlock);
FileCreateUtils.createDeltaCommit(basePath, "100", fs);