[HUDI-3678] Fix record rewrite of create handle when 'preserveMetadata' is true (#5088)
This commit is contained in:
@@ -59,7 +59,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
protected long recordsDeleted = 0;
|
protected long recordsDeleted = 0;
|
||||||
private Map<String, HoodieRecord<T>> recordMap;
|
private Map<String, HoodieRecord<T>> recordMap;
|
||||||
private boolean useWriterSchema = false;
|
private boolean useWriterSchema = false;
|
||||||
private boolean preserveHoodieMetadata = false;
|
private final boolean preserveMetadata;
|
||||||
|
|
||||||
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
||||||
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
|
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
|
||||||
@@ -69,9 +69,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
|
|
||||||
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
||||||
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
|
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
|
||||||
boolean preserveHoodieMetadata) {
|
boolean preserveMetadata) {
|
||||||
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
|
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
|
||||||
taskContextSupplier, preserveHoodieMetadata);
|
taskContextSupplier, preserveMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
||||||
@@ -82,10 +82,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
|
|
||||||
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
||||||
String partitionPath, String fileId, Option<Schema> overriddenSchema,
|
String partitionPath, String fileId, Option<Schema> overriddenSchema,
|
||||||
TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) {
|
TaskContextSupplier taskContextSupplier, boolean preserveMetadata) {
|
||||||
super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
|
super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
|
||||||
taskContextSupplier);
|
taskContextSupplier);
|
||||||
this.preserveHoodieMetadata = preserveHoodieMetadata;
|
this.preserveMetadata = preserveMetadata;
|
||||||
writeStatus.setFileId(fileId);
|
writeStatus.setFileId(fileId);
|
||||||
writeStatus.setPartitionPath(partitionPath);
|
writeStatus.setPartitionPath(partitionPath);
|
||||||
writeStatus.setStat(new HoodieWriteStat());
|
writeStatus.setStat(new HoodieWriteStat());
|
||||||
@@ -111,7 +111,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
||||||
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
|
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
|
||||||
TaskContextSupplier taskContextSupplier) {
|
TaskContextSupplier taskContextSupplier) {
|
||||||
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier);
|
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction());
|
||||||
this.recordMap = recordMap;
|
this.recordMap = recordMap;
|
||||||
this.useWriterSchema = true;
|
this.useWriterSchema = true;
|
||||||
}
|
}
|
||||||
@@ -137,13 +137,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
|
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
|
||||||
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
|
if (preserveMetadata) {
|
||||||
if (preserveHoodieMetadata) {
|
fileWriter.writeAvro(record.getRecordKey(),
|
||||||
// do not preserve FILENAME_METADATA_FIELD
|
rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
|
||||||
recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD), path.getName());
|
|
||||||
fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
|
|
||||||
} else {
|
} else {
|
||||||
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
|
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record);
|
||||||
}
|
}
|
||||||
// update the new location of record, so we know where to find it next
|
// update the new location of record, so we know where to find it next
|
||||||
record.unseal();
|
record.unseal();
|
||||||
|
|||||||
@@ -61,8 +61,6 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS;
|
|
||||||
|
|
||||||
@SuppressWarnings("Duplicates")
|
@SuppressWarnings("Duplicates")
|
||||||
/**
|
/**
|
||||||
* Handle to merge incoming records to those in storage.
|
* Handle to merge incoming records to those in storage.
|
||||||
@@ -264,7 +262,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
|||||||
isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
|
isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord);
|
return writeRecord(hoodieRecord, indexedRecord, isDelete);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
|
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
|
||||||
@@ -274,16 +272,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
|||||||
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
|
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) {
|
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
|
||||||
insertRecordsWritten++;
|
insertRecordsWritten++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
|
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
|
||||||
return writeRecord(hoodieRecord, indexedRecord, false, null);
|
return writeRecord(hoodieRecord, indexedRecord, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete, GenericRecord oldRecord) {
|
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
|
||||||
Option recordMetadata = hoodieRecord.getData().getMetadata();
|
Option recordMetadata = hoodieRecord.getData().getMetadata();
|
||||||
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
|
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
|
||||||
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
|
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
|
||||||
@@ -294,13 +292,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
|||||||
try {
|
try {
|
||||||
if (indexedRecord.isPresent() && !isDelete) {
|
if (indexedRecord.isPresent() && !isDelete) {
|
||||||
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
|
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
|
||||||
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get(), preserveMetadata, oldRecord);
|
if (preserveMetadata && useWriterSchema) { // useWriterSchema will be true only in case of compaction.
|
||||||
if (preserveMetadata && useWriterSchema) { // useWriterSchema will be true only in case of compaction.
|
fileWriter.writeAvro(hoodieRecord.getRecordKey(),
|
||||||
// do not preserve FILENAME_METADATA_FIELD
|
rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
|
||||||
recordWithMetadataInSchema.put(FILENAME_METADATA_FIELD_POS, newFilePath.getName());
|
|
||||||
fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
|
|
||||||
} else {
|
} else {
|
||||||
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
|
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) indexedRecord.get()), hoodieRecord);
|
||||||
}
|
}
|
||||||
recordsWritten++;
|
recordsWritten++;
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -227,8 +227,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
|
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected GenericRecord rewriteRecord(GenericRecord record, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
|
protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
|
||||||
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields, copyOverMetaFields, fallbackRecord);
|
return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract List<WriteStatus> close();
|
public abstract List<WriteStatus> close();
|
||||||
|
|||||||
@@ -259,7 +259,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
.withInlineCompaction(false)
|
.withInlineCompaction(false)
|
||||||
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
|
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
|
||||||
// we will trigger archive manually, to ensure only regular writer invokes it
|
// we will trigger archive manually, to ensure only regular writer invokes it
|
||||||
.withAutoArchive(false).build())
|
.withAutoArchive(false)
|
||||||
|
// by default, the HFile does not keep the metadata fields, so set this to false
|
||||||
|
// to always use the metadata of the new record.
|
||||||
|
.withPreserveCommitMetadata(false)
|
||||||
|
.build())
|
||||||
.withParallelism(parallelism, parallelism)
|
.withParallelism(parallelism, parallelism)
|
||||||
.withDeleteParallelism(parallelism)
|
.withDeleteParallelism(parallelism)
|
||||||
.withRollbackParallelism(parallelism)
|
.withRollbackParallelism(parallelism)
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
|
|||||||
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
||||||
import org.apache.hudi.common.util.BaseFileUtils;
|
import org.apache.hudi.common.util.BaseFileUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieUpsertException;
|
import org.apache.hudi.exception.HoodieUpsertException;
|
||||||
import org.apache.hudi.io.HoodieCreateHandle;
|
import org.apache.hudi.io.HoodieCreateHandle;
|
||||||
@@ -77,6 +78,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
|
|||||||
private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
|
private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
|
||||||
// Create a bunch of records with an old version of schema
|
// Create a bunch of records with an old version of schema
|
||||||
final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.avsc");
|
final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.avsc");
|
||||||
|
config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false");
|
||||||
final HoodieSparkTable table = HoodieSparkTable.create(config, context);
|
final HoodieSparkTable table = HoodieSparkTable.create(config, context);
|
||||||
final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
|
final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
|
||||||
List<HoodieRecord> insertRecords = new ArrayList<>();
|
List<HoodieRecord> insertRecords = new ArrayList<>();
|
||||||
|
|||||||
@@ -382,23 +382,13 @@ public class HoodieAvroUtils {
|
|||||||
return newRecord;
|
return newRecord;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static GenericRecord rewriteRecord(GenericRecord genericRecord, Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
|
public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
|
||||||
GenericRecord newRecord = new GenericData.Record(newSchema);
|
GenericRecord newRecord = new GenericData.Record(newSchema);
|
||||||
boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase;
|
|
||||||
for (Schema.Field f : newSchema.getFields()) {
|
for (Schema.Field f : newSchema.getFields()) {
|
||||||
if (!(isSpecificRecord && isMetadataField(f.name()))) {
|
|
||||||
copyOldValueOrSetDefault(genericRecord, newRecord, f);
|
copyOldValueOrSetDefault(genericRecord, newRecord, f);
|
||||||
}
|
}
|
||||||
if (isMetadataField(f.name()) && copyOverMetaFields) {
|
// do not preserve FILENAME_METADATA_FIELD
|
||||||
// if meta field exists in primary generic record, copy over.
|
newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
|
||||||
if (genericRecord.getSchema().getField(f.name()) != null) {
|
|
||||||
copyOldValueOrSetDefault(genericRecord, newRecord, f);
|
|
||||||
} else if (fallbackRecord != null && fallbackRecord.getSchema().getField(f.name()) != null) {
|
|
||||||
// if not, try to copy from the fallback record.
|
|
||||||
copyOldValueOrSetDefault(fallbackRecord, newRecord, f);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!GenericData.get().validate(newSchema, newRecord)) {
|
if (!GenericData.get().validate(newSchema, newRecord)) {
|
||||||
throw new SchemaCompatibilityException(
|
throw new SchemaCompatibilityException(
|
||||||
"Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
|
"Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
|
||||||
|
|||||||
Reference in New Issue
Block a user