From 499af7c039da12e397dca0819d77723bce92c145 Mon Sep 17 00:00:00 2001 From: Matrix42 Date: Fri, 22 Oct 2021 19:42:13 +0800 Subject: [PATCH] [HUDI-2592] Fix write empty array when write.precombine.field is decimal type (#3837) --- .../org/apache/hudi/avro/HoodieAvroUtils.java | 11 +++-- .../apache/hudi/avro/TestHoodieAvroUtils.java | 40 +++++++++++++++---- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 96b1a1e83..91c214713 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -18,8 +18,6 @@ package org.apache.hudi.avro; -import org.apache.avro.specific.SpecificRecordBase; - import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; @@ -49,12 +47,14 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.io.JsonDecoder; import org.apache.avro.io.JsonEncoder; +import org.apache.avro.specific.SpecificRecordBase; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.LocalDate; @@ -546,8 +546,11 @@ public class HoodieAvroUtils { return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema, LogicalTypes.decimal(dc.getPrecision(), dc.getScale())); } else if (fieldSchema.getType() == Schema.Type.BYTES) { - return decimalConversion.fromBytes((ByteBuffer) fieldValue, fieldSchema, - LogicalTypes.decimal(dc.getPrecision(), dc.getScale())); + ByteBuffer byteBuffer = (ByteBuffer) fieldValue; + BigDecimal convertedValue = 
decimalConversion.fromBytes(byteBuffer, fieldSchema, + LogicalTypes.decimal(dc.getPrecision(), dc.getScale())); + byteBuffer.rewind(); + return convertedValue; } } return fieldValue; diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index 6f5fe9215..b4304a4d5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -18,15 +18,17 @@ package org.apache.hudi.avro; -import org.apache.avro.JsonProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.Test; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -81,6 +83,11 @@ public class TestHoodieAvroUtils { + "{\"name\": \"nullable_field\",\"type\": [\"null\" ,\"string\"],\"default\": null}," + "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\", \"default\": \"dummy\"}]}"; + private static String SCHEMA_WITH_DECIMAL_FIELD = "{\"type\":\"record\",\"name\":\"record\",\"fields\":[" + + "{\"name\":\"key_col\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"decimal_col\",\"type\":[\"null\"," + + "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}],\"default\":null}]}"; + @Test public void testPropsPresent() { Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA)); @@ -113,10 +120,10 @@ public class TestHoodieAvroUtils { rec.put("timestamp", 3.5); Schema schemaWithMetadata = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EVOLVED_SCHEMA)); GenericRecord rec1 = 
HoodieAvroUtils.rewriteRecord(rec, schemaWithMetadata); - assertEquals(rec1.get("new_col_not_nullable_default_dummy_val"), "dummy_val"); + assertEquals("dummy_val", rec1.get("new_col_not_nullable_default_dummy_val")); assertNull(rec1.get("new_col_nullable_wo_default")); assertNull(rec1.get("new_col_nullable_default_null")); - assertEquals(rec1.get("new_col_nullable_default_dummy_val"), "dummy_val"); + assertEquals("dummy_val", rec1.get("new_col_nullable_default_dummy_val")); assertNull(rec1.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); } @@ -128,7 +135,7 @@ public class TestHoodieAvroUtils { rec.put("pii_col", "val2"); rec.put("timestamp", 3.5); GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA)); - assertEquals(rec1.get("new_col_not_nullable_default_dummy_val"), "dummy_val"); + assertEquals("dummy_val", rec1.get("new_col_not_nullable_default_dummy_val")); assertNull(rec1.get("new_col_nullable_wo_default")); } @@ -163,7 +170,7 @@ public class TestHoodieAvroUtils { rec.put("pii_col", "val2"); rec.put("timestamp", 3.5); GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT)); - assertEquals(rec1.get("non_nullable_field_with_default"), "dummy"); + assertEquals("dummy", rec1.get("non_nullable_field_with_default")); } @Test @@ -206,9 +213,9 @@ public class TestHoodieAvroUtils { @Test public void testAddingAndRemovingMetadataFields() { Schema schemaWithMetaCols = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA)); - assertEquals(schemaWithMetaCols.getFields().size(), NUM_FIELDS_IN_EXAMPLE_SCHEMA + HoodieRecord.HOODIE_META_COLUMNS.size()); + assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA + HoodieRecord.HOODIE_META_COLUMNS.size(), schemaWithMetaCols.getFields().size()); Schema schemaWithoutMetaCols = HoodieAvroUtils.removeMetadataFields(schemaWithMetaCols); - assertEquals(schemaWithoutMetaCols.getFields().size(), 
NUM_FIELDS_IN_EXAMPLE_SCHEMA); + assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA, schemaWithoutMetaCols.getFields().size()); } @Test @@ -219,7 +226,7 @@ public class TestHoodieAvroUtils { rec.put("pii_col", "val2"); Object rowKey = HoodieAvroUtils.getNestedFieldVal(rec, "_row_key", true); - assertEquals(rowKey, "key1"); + assertEquals("key1", rowKey); Object rowKeyNotExist = HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", true); assertNull(rowKeyNotExist); @@ -240,4 +247,21 @@ } } + @Test + public void testGetNestedFieldValWithDecimalField() { + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)); + rec.put("key_col", "key"); + BigDecimal bigDecimal = new BigDecimal("1234.5678"); + ByteBuffer byteBuffer = ByteBuffer.wrap(bigDecimal.unscaledValue().toByteArray()); + rec.put("decimal_col", byteBuffer); + + Object decimalCol = HoodieAvroUtils.getNestedFieldVal(rec, "decimal_col", true); + assertEquals(bigDecimal, decimalCol); + + Object obj = rec.get(1); + assertTrue(obj instanceof ByteBuffer); + ByteBuffer buffer = (ByteBuffer) obj; + assertEquals(0, buffer.position()); + } + }