[HUDI-793] Adding proper default to hudi metadata fields and proper handling to rewrite routine (#1513)
* Adding proper default to hudi metadata fields and proper handling to rewrite routine * Handle fields declared with a null default Co-authored-by: Alex Filipchik <alex.filipchik@csscompany.com>
This commit is contained in:
committed by
GitHub
parent
0d4848b68b
commit
83796b3189
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.avro;
|
package org.apache.hudi.avro;
|
||||||
|
|
||||||
|
import org.apache.avro.JsonProperties.Null;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.exception.SchemaCompatabilityException;
|
import org.apache.hudi.exception.SchemaCompatabilityException;
|
||||||
@@ -141,15 +142,15 @@ public class HoodieAvroUtils {
|
|||||||
List<Schema.Field> parentFields = new ArrayList<>();
|
List<Schema.Field> parentFields = new ArrayList<>();
|
||||||
|
|
||||||
Schema.Field commitTimeField =
|
Schema.Field commitTimeField =
|
||||||
new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
||||||
Schema.Field commitSeqnoField =
|
Schema.Field commitSeqnoField =
|
||||||
new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
||||||
Schema.Field recordKeyField =
|
Schema.Field recordKeyField =
|
||||||
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
||||||
Schema.Field partitionPathField =
|
Schema.Field partitionPathField =
|
||||||
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
||||||
Schema.Field fileNameField =
|
Schema.Field fileNameField =
|
||||||
new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
|
new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
||||||
|
|
||||||
parentFields.add(commitTimeField);
|
parentFields.add(commitTimeField);
|
||||||
parentFields.add(commitSeqnoField);
|
parentFields.add(commitSeqnoField);
|
||||||
@@ -253,7 +254,11 @@ public class HoodieAvroUtils {
|
|||||||
GenericRecord newRecord = new GenericData.Record(newSchema);
|
GenericRecord newRecord = new GenericData.Record(newSchema);
|
||||||
for (Schema.Field f : fieldsToWrite) {
|
for (Schema.Field f : fieldsToWrite) {
|
||||||
if (record.get(f.name()) == null) {
|
if (record.get(f.name()) == null) {
|
||||||
newRecord.put(f.name(), f.defaultVal());
|
if (f.defaultVal() instanceof Null) {
|
||||||
|
newRecord.put(f.name(), null);
|
||||||
|
} else {
|
||||||
|
newRecord.put(f.name(), f.defaultVal());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
newRecord.put(f.name(), record.get(f.name()));
|
newRecord.put(f.name(), record.get(f.name()));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,12 +47,13 @@ public class TestHoodieAvroUtils {
|
|||||||
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}";
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}";
|
||||||
|
|
||||||
|
private static String SCHEMA_WITH_METADATA_FIELD =
|
||||||
private static String SCHEMA_WITH_METADATA_FIELD = "{\"type\": \"record\",\"name\": \"testrec2\",\"fields\": [ "
|
"{\"type\": \"record\",\"name\": \"testrec2\",\"fields\": [ "
|
||||||
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
|
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
|
||||||
+ "{\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]}]}";
|
+ "{\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},"
|
||||||
|
+ "{\"name\": \"nullable_field\",\"type\": [\"null\" ,\"string\"],\"default\": null}]}";
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPropsPresent() {
|
public void testPropsPresent() {
|
||||||
|
|||||||
Reference in New Issue
Block a user