[HUDI-803] Replaced used of NullNode with JsonProperties.NULL_VALUE in HoodieAvroUtils (#1538)
- added more test cases in TestHoodieAvroUtils.class Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
@@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.avro;
|
package org.apache.hudi.avro;
|
||||||
|
|
||||||
import org.apache.avro.JsonProperties.Null;
|
import org.apache.avro.JsonProperties;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.exception.SchemaCompatabilityException;
|
import org.apache.hudi.exception.SchemaCompatabilityException;
|
||||||
@@ -64,7 +64,7 @@ public class HoodieAvroUtils {
|
|||||||
private static ThreadLocal<BinaryDecoder> reuseDecoder = ThreadLocal.withInitial(() -> null);
|
private static ThreadLocal<BinaryDecoder> reuseDecoder = ThreadLocal.withInitial(() -> null);
|
||||||
|
|
||||||
// All metadata fields are optional strings.
|
// All metadata fields are optional strings.
|
||||||
private static final Schema METADATA_FIELD_SCHEMA =
|
static final Schema METADATA_FIELD_SCHEMA =
|
||||||
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
|
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
|
||||||
|
|
||||||
private static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema();
|
private static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema();
|
||||||
@@ -96,7 +96,6 @@ public class HoodieAvroUtils {
|
|||||||
writer.write(record, jsonEncoder);
|
writer.write(record, jsonEncoder);
|
||||||
jsonEncoder.flush();
|
jsonEncoder.flush();
|
||||||
return out.toByteArray();
|
return out.toByteArray();
|
||||||
//metadata.toJsonString().getBytes(StandardCharsets.UTF_8));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -142,15 +141,15 @@ public class HoodieAvroUtils {
|
|||||||
List<Schema.Field> parentFields = new ArrayList<>();
|
List<Schema.Field> parentFields = new ArrayList<>();
|
||||||
|
|
||||||
Schema.Field commitTimeField =
|
Schema.Field commitTimeField =
|
||||||
new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
Schema.Field commitSeqnoField =
|
Schema.Field commitSeqnoField =
|
||||||
new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
Schema.Field recordKeyField =
|
Schema.Field recordKeyField =
|
||||||
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
Schema.Field partitionPathField =
|
Schema.Field partitionPathField =
|
||||||
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
Schema.Field fileNameField =
|
Schema.Field fileNameField =
|
||||||
new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
|
|
||||||
parentFields.add(commitTimeField);
|
parentFields.add(commitTimeField);
|
||||||
parentFields.add(commitSeqnoField);
|
parentFields.add(commitSeqnoField);
|
||||||
@@ -272,7 +271,7 @@ public class HoodieAvroUtils {
|
|||||||
GenericRecord newRecord = new GenericData.Record(newSchema);
|
GenericRecord newRecord = new GenericData.Record(newSchema);
|
||||||
for (Schema.Field f : fieldsToWrite) {
|
for (Schema.Field f : fieldsToWrite) {
|
||||||
if (record.get(f.name()) == null) {
|
if (record.get(f.name()) == null) {
|
||||||
if (f.defaultVal() instanceof Null) {
|
if (f.defaultVal() instanceof JsonProperties.Null) {
|
||||||
newRecord.put(f.name(), null);
|
newRecord.put(f.name(), null);
|
||||||
} else {
|
} else {
|
||||||
newRecord.put(f.name(), f.defaultVal());
|
newRecord.put(f.name(), f.defaultVal());
|
||||||
|
|||||||
@@ -18,16 +18,24 @@
|
|||||||
|
|
||||||
package org.apache.hudi.avro;
|
package org.apache.hudi.avro;
|
||||||
|
|
||||||
|
import org.apache.avro.JsonProperties;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.exception.SchemaCompatabilityException;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.codehaus.jackson.node.NullNode;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -47,13 +55,28 @@ public class TestHoodieAvroUtils {
|
|||||||
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}";
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}";
|
||||||
|
|
||||||
private static String SCHEMA_WITH_METADATA_FIELD =
|
private static String SCHEMA_WITH_METADATA_FIELD = "{\"type\": \"record\",\"name\": \"testrec2\",\"fields\": [ "
|
||||||
"{\"type\": \"record\",\"name\": \"testrec2\",\"fields\": [ "
|
|
||||||
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
|
||||||
+ "{\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},"
|
+ "{\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},"
|
||||||
+ "{\"name\": \"nullable_field\",\"type\": [\"null\" ,\"string\"],\"default\": null}]}";
|
+ "{\"name\": \"nullable_field\",\"type\": [\"null\" ,\"string\"],\"default\": null},"
|
||||||
|
+ "{\"name\": \"nullable_field_wo_default\",\"type\": [\"null\" ,\"string\"]}]}";
|
||||||
|
|
||||||
|
private static String SCHEMA_WITH_NON_NULLABLE_FIELD = "{\"type\": \"record\",\"name\": \"testrec3\",\"fields\": [ "
|
||||||
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
|
||||||
|
+ "{\"name\": \"nullable_field\",\"type\": [\"null\" ,\"string\"],\"default\": null},"
|
||||||
|
+ "{\"name\": \"non_nullable_field_wo_default\",\"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\", \"default\": \"dummy\"}]}";
|
||||||
|
|
||||||
|
private static String SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT = "{\"type\": \"record\",\"name\": \"testrec4\",\"fields\": [ "
|
||||||
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
|
||||||
|
+ "{\"name\": \"nullable_field\",\"type\": [\"null\" ,\"string\"],\"default\": null},"
|
||||||
|
+ "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\", \"default\": \"dummy\"}]}";
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPropsPresent() {
|
public void testPropsPresent() {
|
||||||
@@ -85,9 +108,11 @@ public class TestHoodieAvroUtils {
|
|||||||
rec.put("non_pii_col", "val1");
|
rec.put("non_pii_col", "val1");
|
||||||
rec.put("pii_col", "val2");
|
rec.put("pii_col", "val2");
|
||||||
rec.put("timestamp", 3.5);
|
rec.put("timestamp", 3.5);
|
||||||
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
|
Schema schemaWithMetadata = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EVOLVED_SCHEMA));
|
||||||
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, schemaWithMetadata);
|
||||||
assertEquals(rec1.get("new_col1"), "dummy_val");
|
assertEquals(rec1.get("new_col1"), "dummy_val");
|
||||||
assertNull(rec1.get("new_col2"));
|
assertNull(rec1.get("new_col2"));
|
||||||
|
assertNull(rec1.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -111,5 +136,65 @@ public class TestHoodieAvroUtils {
|
|||||||
rec.put("timestamp", 3.5);
|
rec.put("timestamp", 3.5);
|
||||||
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_METADATA_FIELD));
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_METADATA_FIELD));
|
||||||
assertNull(rec1.get("_hoodie_commit_time"));
|
assertNull(rec1.get("_hoodie_commit_time"));
|
||||||
|
assertNull(rec1.get("nullable_field"));
|
||||||
|
assertNull(rec1.get("nullable_field_wo_default"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNonNullableFieldWithoutDefault() {
|
||||||
|
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
|
rec.put("_row_key", "key1");
|
||||||
|
rec.put("non_pii_col", "val1");
|
||||||
|
rec.put("pii_col", "val2");
|
||||||
|
rec.put("timestamp", 3.5);
|
||||||
|
assertThrows(SchemaCompatabilityException.class, () -> HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_NON_NULLABLE_FIELD)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNonNullableFieldWithDefault() {
|
||||||
|
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
|
rec.put("_row_key", "key1");
|
||||||
|
rec.put("non_pii_col", "val1");
|
||||||
|
rec.put("pii_col", "val2");
|
||||||
|
rec.put("timestamp", 3.5);
|
||||||
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT));
|
||||||
|
assertEquals(rec1.get("non_nullable_field_with_default"), "dummy");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJsonNodeNullWithDefaultValues() {
|
||||||
|
List<Schema.Field> fields = new ArrayList<>();
|
||||||
|
Schema initialSchema = Schema.createRecord("test_record", "test record", "org.test.namespace", false);
|
||||||
|
Schema.Field field1 = new Schema.Field("key", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
|
Schema.Field field2 = new Schema.Field("key1", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
|
Schema.Field field3 = new Schema.Field("key2", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
|
fields.add(field1);
|
||||||
|
fields.add(field2);
|
||||||
|
fields.add(field3);
|
||||||
|
initialSchema.setFields(fields);
|
||||||
|
GenericRecord rec = new GenericData.Record(initialSchema);
|
||||||
|
rec.put("key", "val");
|
||||||
|
rec.put("key1", "val1");
|
||||||
|
rec.put("key2", "val2");
|
||||||
|
|
||||||
|
List<Schema.Field> evolvedFields = new ArrayList<>();
|
||||||
|
Schema evolvedSchema = Schema.createRecord("evolved_record", "evolved record", "org.evolved.namespace", false);
|
||||||
|
Schema.Field evolvedField1 = new Schema.Field("key", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
|
Schema.Field evolvedField2 = new Schema.Field("key1", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
|
Schema.Field evolvedField3 = new Schema.Field("key2", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
|
Schema.Field evolvedField4 = new Schema.Field("evolved_field", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
||||||
|
Schema.Field evolvedField5 = new Schema.Field("evolved_field1", HoodieAvroUtils.METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
|
||||||
|
evolvedFields.add(evolvedField1);
|
||||||
|
evolvedFields.add(evolvedField2);
|
||||||
|
evolvedFields.add(evolvedField3);
|
||||||
|
evolvedFields.add(evolvedField4);
|
||||||
|
evolvedFields.add(evolvedField5);
|
||||||
|
evolvedSchema.setFields(evolvedFields);
|
||||||
|
|
||||||
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, evolvedSchema);
|
||||||
|
//evolvedField4.defaultVal() returns a JsonProperties.Null instance.
|
||||||
|
assertNull(rec1.get("evolved_field"));
|
||||||
|
//evolvedField5.defaultVal() returns null.
|
||||||
|
assertNull(rec1.get("evolved_field1"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user