[HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema (#1427)
This commit is contained in:
@@ -32,7 +32,6 @@ import org.apache.avro.io.BinaryDecoder;
|
|||||||
import org.apache.avro.io.BinaryEncoder;
|
import org.apache.avro.io.BinaryEncoder;
|
||||||
import org.apache.avro.io.DecoderFactory;
|
import org.apache.avro.io.DecoderFactory;
|
||||||
import org.apache.avro.io.EncoderFactory;
|
import org.apache.avro.io.EncoderFactory;
|
||||||
import org.codehaus.jackson.JsonNode;
|
|
||||||
import org.codehaus.jackson.node.NullNode;
|
import org.codehaus.jackson.node.NullNode;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
@@ -44,6 +43,7 @@ import java.nio.charset.StandardCharsets;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@@ -104,15 +104,15 @@ public class HoodieAvroUtils {
|
|||||||
List<Schema.Field> parentFields = new ArrayList<>();
|
List<Schema.Field> parentFields = new ArrayList<>();
|
||||||
|
|
||||||
Schema.Field commitTimeField =
|
Schema.Field commitTimeField =
|
||||||
new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
||||||
Schema.Field commitSeqnoField =
|
Schema.Field commitSeqnoField =
|
||||||
new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
||||||
Schema.Field recordKeyField =
|
Schema.Field recordKeyField =
|
||||||
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
||||||
Schema.Field partitionPathField =
|
Schema.Field partitionPathField =
|
||||||
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
||||||
Schema.Field fileNameField =
|
Schema.Field fileNameField =
|
||||||
new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
||||||
|
|
||||||
parentFields.add(commitTimeField);
|
parentFields.add(commitTimeField);
|
||||||
parentFields.add(commitSeqnoField);
|
parentFields.add(commitSeqnoField);
|
||||||
@@ -121,8 +121,8 @@ public class HoodieAvroUtils {
|
|||||||
parentFields.add(fileNameField);
|
parentFields.add(fileNameField);
|
||||||
for (Schema.Field field : schema.getFields()) {
|
for (Schema.Field field : schema.getFields()) {
|
||||||
if (!isMetadataField(field.name())) {
|
if (!isMetadataField(field.name())) {
|
||||||
Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue());
|
Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
|
||||||
for (Map.Entry<String, JsonNode> prop : field.getJsonProps().entrySet()) {
|
for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
|
||||||
newField.addProp(prop.getKey(), prop.getValue());
|
newField.addProp(prop.getKey(), prop.getValue());
|
||||||
}
|
}
|
||||||
parentFields.add(newField);
|
parentFields.add(newField);
|
||||||
@@ -191,7 +191,7 @@ public class HoodieAvroUtils {
|
|||||||
* schema.
|
* schema.
|
||||||
*/
|
*/
|
||||||
public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
|
public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
|
||||||
return rewrite(record, record.getSchema(), newSchema);
|
return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -199,13 +199,17 @@ public class HoodieAvroUtils {
|
|||||||
* schema.
|
* schema.
|
||||||
*/
|
*/
|
||||||
public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
|
public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
|
||||||
return rewrite(record, newSchema, newSchema);
|
return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static GenericRecord rewrite(GenericRecord record, Schema schemaWithFields, Schema newSchema) {
|
private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
|
||||||
GenericRecord newRecord = new GenericData.Record(newSchema);
|
GenericRecord newRecord = new GenericData.Record(newSchema);
|
||||||
for (Schema.Field f : schemaWithFields.getFields()) {
|
for (Schema.Field f : fieldsToWrite) {
|
||||||
newRecord.put(f.name(), record.get(f.name()));
|
if (record.get(f.name()) == null) {
|
||||||
|
newRecord.put(f.name(), f.defaultVal());
|
||||||
|
} else {
|
||||||
|
newRecord.put(f.name(), record.get(f.name()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (!GenericData.get().validate(newSchema, newRecord)) {
|
if (!GenericData.get().validate(newSchema, newRecord)) {
|
||||||
throw new SchemaCompatabilityException(
|
throw new SchemaCompatabilityException(
|
||||||
@@ -214,6 +218,19 @@ public class HoodieAvroUtils {
|
|||||||
return newRecord;
|
return newRecord;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a super set of fields from both old and new schema.
|
||||||
|
*/
|
||||||
|
private static LinkedHashSet<Field> getCombinedFieldsToWrite(Schema oldSchema, Schema newSchema) {
|
||||||
|
LinkedHashSet<Field> allFields = new LinkedHashSet<>(oldSchema.getFields());
|
||||||
|
for (Schema.Field f : newSchema.getFields()) {
|
||||||
|
if (!allFields.contains(f) && !isMetadataField(f.name())) {
|
||||||
|
allFields.add(f);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return allFields;
|
||||||
|
}
|
||||||
|
|
||||||
public static byte[] compress(String text) {
|
public static byte[] compress(String text) {
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -19,7 +19,8 @@
|
|||||||
package org.apache.hudi.avro;
|
package org.apache.hudi.avro;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.codehaus.jackson.JsonNode;
|
import org.apache.avro.generic.GenericData;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@@ -30,11 +31,25 @@ import java.util.Map;
|
|||||||
*/
|
*/
|
||||||
public class TestHoodieAvroUtils {
|
public class TestHoodieAvroUtils {
|
||||||
|
|
||||||
|
private static String EVOLVED_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec1\",\"fields\": [ "
|
||||||
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
|
||||||
|
+ "{\"name\": \"new_col1\", \"type\": \"string\", \"default\": \"dummy_val\"},"
|
||||||
|
+ "{\"name\": \"new_col2\", \"type\": [\"int\", \"null\"]}]}";
|
||||||
|
|
||||||
private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
|
private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
|
||||||
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}";
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}";
|
||||||
|
|
||||||
|
|
||||||
|
private static String SCHEMA_WITH_METADATA_FIELD = "{\"type\": \"record\",\"name\": \"testrec2\",\"fields\": [ "
|
||||||
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
|
||||||
|
+ "{\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]}]}";
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPropsPresent() {
|
public void testPropsPresent() {
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
@@ -45,7 +60,7 @@ public class TestHoodieAvroUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertNotNull("field name is null", field.name());
|
Assert.assertNotNull("field name is null", field.name());
|
||||||
Map<String, JsonNode> props = field.getJsonProps();
|
Map<String, Object> props = field.getObjectProps();
|
||||||
Assert.assertNotNull("The property is null", props);
|
Assert.assertNotNull("The property is null", props);
|
||||||
|
|
||||||
if (field.name().equals("pii_col")) {
|
if (field.name().equals("pii_col")) {
|
||||||
@@ -57,4 +72,39 @@ public class TestHoodieAvroUtils {
|
|||||||
}
|
}
|
||||||
Assert.assertTrue("column pii_col doesn't show up", piiPresent);
|
Assert.assertTrue("column pii_col doesn't show up", piiPresent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultValue() {
|
||||||
|
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EVOLVED_SCHEMA));
|
||||||
|
rec.put("_row_key", "key1");
|
||||||
|
rec.put("non_pii_col", "val1");
|
||||||
|
rec.put("pii_col", "val2");
|
||||||
|
rec.put("timestamp", 3.5);
|
||||||
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
|
||||||
|
Assert.assertEquals(rec1.get("new_col1"), "dummy_val");
|
||||||
|
Assert.assertNull(rec1.get("new_col2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultValueWithSchemaEvolution() {
|
||||||
|
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
|
rec.put("_row_key", "key1");
|
||||||
|
rec.put("non_pii_col", "val1");
|
||||||
|
rec.put("pii_col", "val2");
|
||||||
|
rec.put("timestamp", 3.5);
|
||||||
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
|
||||||
|
Assert.assertEquals(rec1.get("new_col1"), "dummy_val");
|
||||||
|
Assert.assertNull(rec1.get("new_col2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetadataField() {
|
||||||
|
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
|
rec.put("_row_key", "key1");
|
||||||
|
rec.put("non_pii_col", "val1");
|
||||||
|
rec.put("pii_col", "val2");
|
||||||
|
rec.put("timestamp", 3.5);
|
||||||
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_METADATA_FIELD));
|
||||||
|
Assert.assertNull(rec1.get("_hoodie_commit_time"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -212,7 +212,7 @@ public abstract class AbstractRealtimeRecordReader {
|
|||||||
throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
|
throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
|
||||||
+ "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
|
+ "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
|
||||||
} else {
|
} else {
|
||||||
projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
|
projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -367,7 +367,7 @@ public abstract class AbstractRealtimeRecordReader {
|
|||||||
Field field = schemaFieldsMap.get(columnName.toLowerCase());
|
Field field = schemaFieldsMap.get(columnName.toLowerCase());
|
||||||
|
|
||||||
if (field != null) {
|
if (field != null) {
|
||||||
hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
|
hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
|
||||||
} else {
|
} else {
|
||||||
// Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
|
// Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
|
||||||
// They will get skipped as they won't be found in the original schema.
|
// They will get skipped as they won't be found in the original schema.
|
||||||
|
|||||||
Reference in New Issue
Block a user