[HUDI-3800] Fixed preserve commit metadata for compaction for untouched records (#5232)
commit 8683fb1d49 (parent e96f08f355), committed via GitHub
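In short: when compaction runs with config.isPreserveHoodieCommitMetadataForCompaction() returning true, records that are merely copied from the old base file into the new one kept a stale _hoodie_file_name meta field pointing at the old file. The fix below rewrites just that one field while leaving the rest of the preserved commit metadata untouched, and renames useWriterSchema to useWriterSchemaForCompaction to reflect its actual use. A minimal Java sketch of the idea, using the real HoodieRecord.FILENAME_METADATA_FIELD_POS constant from the diff but an otherwise hypothetical helper (copyUntouchedRecord is not a Hudi method):

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieRecord;

final class PreserveFileNameSketch {
  /**
   * Illustrative only: prepare an unchanged record for copying into the new
   * base file, fixing up the file-name meta field when commit metadata is
   * preserved during compaction.
   */
  static GenericRecord copyUntouchedRecord(GenericRecord oldRecord,
                                           boolean preserveMetadata,
                                           boolean useWriterSchemaForCompaction,
                                           String newBaseFileName) {
    if (preserveMetadata && useWriterSchemaForCompaction) {
      // Commit time, sequence number, record key, and partition path stay
      // as-is; only _hoodie_file_name must track the newly written base file.
      oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, newBaseFileName);
    }
    return oldRecord; // the caller then writes it, e.g. fileWriter.writeAvro(key, oldRecord)
  }
}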
@@ -105,7 +105,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   protected long recordsDeleted = 0;
   protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
-  protected boolean useWriterSchema;
+  protected boolean useWriterSchemaForCompaction;
   protected Option<BaseKeyGenerator> keyGeneratorOpt;
   private HoodieBaseFile baseFileToMerge;
 
@@ -133,7 +133,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
                            HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
-    this.useWriterSchema = true;
+    this.useWriterSchemaForCompaction = true;
     this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
@@ -267,7 +267,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
-    Schema schema = useWriterSchema ? tableSchemaWithMetaFields : tableSchema;
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     Option<IndexedRecord> insertRecord = hoodieRecord.getData().getInsertValue(schema, config.getProps());
     // just skip the ignored record
     if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
@@ -293,7 +293,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     try {
       if (indexedRecord.isPresent() && !isDelete) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only in case of compaction.
+        if (preserveMetadata && useWriterSchemaForCompaction) { // useWriterSchemaForCompaction will be true only in case of compaction.
           fileWriter.writeAvro(hoodieRecord.getRecordKey(),
               rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
         } else {
@@ -329,7 +329,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     try {
       Option<IndexedRecord> combinedAvroRecord =
           hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
-              useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
+              useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema,
               config.getPayloadConfig().getProps());
 
       if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
@@ -355,6 +355,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     if (copyOldRecord) {
       // this should work as it is, since this is an existing record
       try {
+        // rewrite file names
+        // do not preserve FILENAME_METADATA_FIELD
+        if (preserveMetadata && useWriterSchemaForCompaction) {
+          oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, newFilePath.getName());
+        }
         fileWriter.writeAvro(key, oldRecord);
       } catch (IOException | RuntimeException e) {
         String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
@@ -90,7 +90,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
         throw new HoodieUpsertException("Insert/Update not in sorted order");
       }
       try {
-        if (useWriterSchema) {
+        if (useWriterSchemaForCompaction) {
           writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
         } else {
           writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
@@ -113,7 +113,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
       String key = newRecordKeysSorted.poll();
       HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
       if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
-        if (useWriterSchema) {
+        if (useWriterSchemaForCompaction) {
           writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
         } else {
           writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));