[HUDI-3921] Fixed schema evolution cannot work with HUDI-3855 (#5376)

- when column names are renamed (with schema evolution enabled), the renamed columns were not handled correctly while records were copied from the old data file by HoodieMergeHandle.
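To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative only, not Hudi's implementation; the class and helper names are invented): rewriting an Avro record into an evolved schema by field name alone silently drops a renamed column, whereas consulting a rename map of new name to old name, the role `renameCols` plays in this commit, recovers the value.

// Hypothetical demonstration of the bug class this commit fixes (plain Avro, no Hudi).
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Map;

public class RenameRewriteSketch {

  // Rewrite 'old' into 'newSchema'; renameCols maps a field's new name to its old name.
  static GenericRecord rewrite(GenericRecord old, Schema newSchema, Map<String, String> renameCols) {
    GenericRecord out = new GenericData.Record(newSchema);
    for (Schema.Field f : newSchema.getFields()) {
      Object value;
      if (old.getSchema().getField(f.name()) != null) {
        value = old.get(f.name());                  // field kept its name
      } else if (renameCols.containsKey(f.name())) {
        value = old.get(renameCols.get(f.name()));  // field was renamed
      } else {
        value = null;                               // genuinely new column
      }
      out.put(f.name(), value);
    }
    return out;
  }

  public static void main(String[] args) {
    // Old base file wrote column "a"; the table schema later renamed it to "b".
    Schema writerSchema = SchemaBuilder.record("rec").fields().optionalString("a").endRecord();
    Schema readerSchema = SchemaBuilder.record("rec").fields().optionalString("b").endRecord();

    GenericRecord old = new GenericData.Record(writerSchema);
    old.put("a", "value");

    System.out.println(rewrite(old, readerSchema, Map.of()));         // {"b": null}   -- value lost
    System.out.println(rewrite(old, readerSchema, Map.of("b", "a"))); // {"b": "value"} -- value kept
  }
}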
xiarixiaoyao
2022-04-22 06:27:54 +08:00
committed by GitHub
parent de5fa1fe03
commit 037f89ee7c
8 changed files with 137 additions and 39 deletions

HoodieWriteHandle.java

@@ -46,6 +46,9 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
 /**
  * Base class for all write operations logically performed at the file group level.
@@ -98,6 +101,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
   protected final String fileId;
   protected final String writeToken;
   protected final TaskContextSupplier taskContextSupplier;
+  // For full schema evolution
+  protected final boolean schemaOnReadEnabled;
 
   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
                            String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -120,6 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
+    schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
   }
 
   /**
@@ -224,11 +230,13 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
    * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
    */
   protected GenericRecord rewriteRecord(GenericRecord record) {
-    return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
+        : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
   }
 
   protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
-    return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
+        : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
   }
 
   public abstract List<WriteStatus> close();
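A side note on the write-handle change above: `rewriteRecord` passes an empty rename map (`new HashMap<>()`) because incoming records are already expressed in the current table schema; renames only matter when old base-file records are copied forward, which the HoodieMergeHelper change below handles. A short hypothetical demo, reusing the `rewrite(...)` sketch from earlier, shows that an empty rename map degenerates to a plain projection:

// Hypothetical demo (builds on RenameRewriteSketch above): with no renames,
// every field resolves under its own name and values simply pass through.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Map;

public class EmptyRenameDemo {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("rec").fields()
        .optionalString("a").optionalString("b").endRecord();
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("a", "x");
    rec.put("b", "y");
    // Empty rename map: identical to projecting the record into its own schema.
    System.out.println(RenameRewriteSketch.rewrite(rec, schema, Map.of())); // {"a": "x", "b": "y"}
  }
}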

HoodieMergeHelper.java

@@ -36,6 +36,7 @@ import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -52,6 +53,8 @@ import org.apache.hadoop.conf.Configuration;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
@@ -93,6 +96,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
     boolean needToReWriteRecord = false;
+    Map<String, String> renameCols = new HashMap<>();
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may change cols order)
@@ -109,10 +113,14 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
               && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
               && writeInternalSchema.findIdByName(f) != -1
               && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      readSchema = AvroInternalSchemaConverter
+          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
       Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
       needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-          || SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+          || SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+      if (needToReWriteRecord) {
+        renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
+      }
     }
 
     try {
@@ -121,7 +129,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
         readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
       } else {
         if (needToReWriteRecord) {
-          readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema);
+          readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
         } else {
           readerIterator = reader.getRecordIterator(readSchema);
         }