1
0

[HUDI-2592] Fix write empty array when write.precombine.field is decimal type (#3837)

This commit is contained in:
Matrix42
2021-10-22 19:42:13 +08:00
committed by GitHub
parent 84ca981cd0
commit 499af7c039
2 changed files with 39 additions and 12 deletions

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.avro;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
@@ -49,12 +47,14 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
@@ -546,8 +546,11 @@ public class HoodieAvroUtils {
return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema,
LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
} else if (fieldSchema.getType() == Schema.Type.BYTES) {
return decimalConversion.fromBytes((ByteBuffer) fieldValue, fieldSchema,
LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
BigDecimal convertedValue = decimalConversion.fromBytes(byteBuffer, fieldSchema,
LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
byteBuffer.rewind();
return convertedValue;
}
}
return fieldValue;

View File

@@ -18,15 +18,17 @@
package org.apache.hudi.avro;
import org.apache.avro.JsonProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -81,6 +83,11 @@ public class TestHoodieAvroUtils {
+ "{\"name\": \"nullable_field\",\"type\": [\"null\" ,\"string\"],\"default\": null},"
+ "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\", \"default\": \"dummy\"}]}";
private static String SCHEMA_WITH_DECIMAL_FIELD = "{\"type\":\"record\",\"name\":\"record\",\"fields\":["
+ "{\"name\":\"key_col\",\"type\":[\"null\",\"int\"],\"default\":null},"
+ "{\"name\":\"decimal_col\",\"type\":[\"null\","
+ "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}],\"default\":null}]}";
@Test
public void testPropsPresent() {
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
@@ -113,10 +120,10 @@ public class TestHoodieAvroUtils {
rec.put("timestamp", 3.5);
Schema schemaWithMetadata = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EVOLVED_SCHEMA));
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, schemaWithMetadata);
assertEquals(rec1.get("new_col_not_nullable_default_dummy_val"), "dummy_val");
assertEquals("dummy_val", rec1.get("new_col_not_nullable_default_dummy_val"));
assertNull(rec1.get("new_col_nullable_wo_default"));
assertNull(rec1.get("new_col_nullable_default_null"));
assertEquals(rec1.get("new_col_nullable_default_dummy_val"), "dummy_val");
assertEquals("dummy_val", rec1.get("new_col_nullable_default_dummy_val"));
assertNull(rec1.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
@@ -128,7 +135,7 @@ public class TestHoodieAvroUtils {
rec.put("pii_col", "val2");
rec.put("timestamp", 3.5);
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
assertEquals(rec1.get("new_col_not_nullable_default_dummy_val"), "dummy_val");
assertEquals("dummy_val", rec1.get("new_col_not_nullable_default_dummy_val"));
assertNull(rec1.get("new_col_nullable_wo_default"));
}
@@ -163,7 +170,7 @@ public class TestHoodieAvroUtils {
rec.put("pii_col", "val2");
rec.put("timestamp", 3.5);
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_NON_NULLABLE_FIELD_WITH_DEFAULT));
assertEquals(rec1.get("non_nullable_field_with_default"), "dummy");
assertEquals("dummy", rec1.get("non_nullable_field_with_default"));
}
@Test
@@ -206,9 +213,9 @@ public class TestHoodieAvroUtils {
@Test
public void testAddingAndRemovingMetadataFields() {
Schema schemaWithMetaCols = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
assertEquals(schemaWithMetaCols.getFields().size(), NUM_FIELDS_IN_EXAMPLE_SCHEMA + HoodieRecord.HOODIE_META_COLUMNS.size());
assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA + HoodieRecord.HOODIE_META_COLUMNS.size(), schemaWithMetaCols.getFields().size());
Schema schemaWithoutMetaCols = HoodieAvroUtils.removeMetadataFields(schemaWithMetaCols);
assertEquals(schemaWithoutMetaCols.getFields().size(), NUM_FIELDS_IN_EXAMPLE_SCHEMA);
assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA, schemaWithoutMetaCols.getFields().size());
}
@Test
@@ -219,7 +226,7 @@ public class TestHoodieAvroUtils {
rec.put("pii_col", "val2");
Object rowKey = HoodieAvroUtils.getNestedFieldVal(rec, "_row_key", true);
assertEquals(rowKey, "key1");
assertEquals("key1", rowKey);
Object rowKeyNotExist = HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", true);
assertNull(rowKeyNotExist);
@@ -240,4 +247,21 @@ public class TestHoodieAvroUtils {
}
}
@Test
public void testGetNestedFieldValWithDecimalFiled() {
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD));
rec.put("key_col", "key");
BigDecimal bigDecimal = new BigDecimal("1234.5678");
ByteBuffer byteBuffer = ByteBuffer.wrap(bigDecimal.unscaledValue().toByteArray());
rec.put("decimal_col", byteBuffer);
Object decimalCol = HoodieAvroUtils.getNestedFieldVal(rec, "decimal_col", true);
assertEquals(bigDecimal, decimalCol);
Object obj = rec.get(1);
assertTrue(obj instanceof ByteBuffer);
ByteBuffer buffer = (ByteBuffer) obj;
assertEquals(0, buffer.position());
}
}