From 8683fb1d49b408a5012185104b13ae59c376a709 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 6 Apr 2022 00:56:53 -0700 Subject: [PATCH] [HUDI-3800] Fixed preserve commit metadata for compaction for untouched records (#5232) --- .../org/apache/hudi/io/HoodieMergeHandle.java | 15 ++++++++++----- .../apache/hudi/io/HoodieSortedMergeHandle.java | 4 ++-- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 567ae63e1..d09087d88 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -105,7 +105,7 @@ public class HoodieMergeHandle extends H protected long recordsDeleted = 0; protected long updatedRecordsWritten = 0; protected long insertRecordsWritten = 0; - protected boolean useWriterSchema; + protected boolean useWriterSchemaForCompaction; protected Option keyGeneratorOpt; private HoodieBaseFile baseFileToMerge; @@ -133,7 +133,7 @@ public class HoodieMergeHandle extends H HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); this.keyToNewRecords = keyToNewRecords; - this.useWriterSchema = true; + this.useWriterSchemaForCompaction = true; this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction(); init(fileId, this.partitionPath, dataFileToBeMerged); validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields()); @@ -267,7 +267,7 @@ public class HoodieMergeHandle extends H } protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOException { - Schema schema = useWriterSchema ? tableSchemaWithMetaFields : tableSchema; + Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema; Option insertRecord = hoodieRecord.getData().getInsertValue(schema, config.getProps()); // just skip the ignored record if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) { @@ -293,7 +293,7 @@ public class HoodieMergeHandle extends H try { if (indexedRecord.isPresent() && !isDelete) { // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema - if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only in case of compaction. + if (preserveMetadata && useWriterSchemaForCompaction) { // useWriteSchema will be true only in case of compaction. fileWriter.writeAvro(hoodieRecord.getRecordKey(), rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName())); } else { @@ -329,7 +329,7 @@ public class HoodieMergeHandle extends H try { Option combinedAvroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, - useWriterSchema ? tableSchemaWithMetaFields : tableSchema, + useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema, config.getPayloadConfig().getProps()); if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) { @@ -355,6 +355,11 @@ public class HoodieMergeHandle extends H if (copyOldRecord) { // this should work as it is, since this is an existing record try { + // rewrite file names + // do not preserve FILENAME_METADATA_FIELD + if (preserveMetadata && useWriterSchemaForCompaction) { + oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, newFilePath.getName()); + } fileWriter.writeAvro(key, oldRecord); } catch (IOException | RuntimeException e) { String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s", diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 897491b90..931b08c2f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -90,7 +90,7 @@ public class HoodieSortedMergeHandle ext throw new HoodieUpsertException("Insert/Update not in sorted order"); } try { - if (useWriterSchema) { + if (useWriterSchemaForCompaction) { writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); } else { writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps())); @@ -113,7 +113,7 @@ public class HoodieSortedMergeHandle ext String key = newRecordKeysSorted.poll(); HoodieRecord hoodieRecord = keyToNewRecords.get(key); if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) { - if (useWriterSchema) { + if (useWriterSchemaForCompaction) { writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); } else { writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));