[HUDI-4435] Fix Avro field not found issue introduced by Avro 1.10 (#6155)
Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.hadoop.utils;
|
||||
|
||||
import org.apache.avro.AvroRuntimeException;
|
||||
import org.apache.avro.JsonProperties;
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
@@ -189,7 +190,14 @@ public class HoodieRealtimeRecordReaderUtils {
|
||||
Writable[] recordValues = new Writable[schema.getFields().size()];
|
||||
int recordValueIndex = 0;
|
||||
for (Schema.Field field : schema.getFields()) {
|
||||
recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
|
||||
// TODO Revisit Avro exception handling in future
|
||||
Object fieldValue = null;
|
||||
try {
|
||||
fieldValue = record.get(field.name());
|
||||
} catch (AvroRuntimeException e) {
|
||||
LOG.debug("Field:" + field.name() + "not found in Schema:" + schema.toString());
|
||||
}
|
||||
recordValues[recordValueIndex++] = avroToArrayWritable(fieldValue, field.schema());
|
||||
}
|
||||
return new ArrayWritable(Writable.class, recordValues);
|
||||
case ENUM:
|
||||
|
||||
@@ -44,8 +44,10 @@ import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.hadoop.RealtimeFileStatus;
|
||||
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
|
||||
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
|
||||
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Field;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@@ -69,6 +71,7 @@ import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.RecordReader;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
@@ -897,6 +900,20 @@ public class TestHoodieRealtimeRecordReader {
|
||||
assertTrue(splits.length == 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAvroToArrayWritable() throws IOException {
|
||||
Schema schema = SchemaTestUtil.getEvolvedSchema();
|
||||
GenericRecord record = SchemaTestUtil.generateAvroRecordFromJson(schema, 1, "100", "100", false);
|
||||
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema);
|
||||
assertEquals(schema.getFields().size(), aWritable.get().length);
|
||||
|
||||
// In some queries, generic records that Hudi gets are just part of the full records.
|
||||
// Here test the case that some fields are missing in the record.
|
||||
Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(schema);
|
||||
ArrayWritable aWritable2 = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schemaWithMetaFields);
|
||||
assertEquals(schemaWithMetaFields.getFields().size(), aWritable2.get().length);
|
||||
}
|
||||
|
||||
private File createCompactionFile(java.nio.file.Path basePath, String commitTime)
|
||||
throws IOException {
|
||||
File file = basePath.resolve(".hoodie")
|
||||
|
||||
Reference in New Issue
Block a user