[HUDI-1181] Fix decimal type display issue for record key field (#1953)
* [HUDI-1181] Fix decimal type display issue for record key field * Remove getNestedFieldVal method from DataSourceUtils * resolve comments Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
@@ -18,10 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.avro;
|
package org.apache.hudi.avro;
|
||||||
|
|
||||||
import org.apache.avro.JsonProperties;
|
|
||||||
import java.time.LocalDate;
|
|
||||||
import org.apache.avro.LogicalTypes;
|
|
||||||
import org.apache.avro.generic.GenericData.Record;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
@@ -29,11 +25,17 @@ import org.apache.hudi.exception.HoodieException;
|
|||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.exception.SchemaCompatabilityException;
|
import org.apache.hudi.exception.SchemaCompatabilityException;
|
||||||
|
|
||||||
|
import org.apache.avro.Conversions.DecimalConversion;
|
||||||
|
import org.apache.avro.JsonProperties;
|
||||||
|
import org.apache.avro.LogicalTypes;
|
||||||
|
import org.apache.avro.LogicalTypes.Decimal;
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.Schema.Field;
|
import org.apache.avro.Schema.Field;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
|
import org.apache.avro.generic.GenericData.Record;
|
||||||
import org.apache.avro.generic.GenericDatumReader;
|
import org.apache.avro.generic.GenericDatumReader;
|
||||||
import org.apache.avro.generic.GenericDatumWriter;
|
import org.apache.avro.generic.GenericDatumWriter;
|
||||||
|
import org.apache.avro.generic.GenericFixed;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
import org.apache.avro.io.BinaryDecoder;
|
import org.apache.avro.io.BinaryDecoder;
|
||||||
@@ -50,7 +52,9 @@ import java.io.ByteArrayOutputStream;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.LocalDate;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -433,9 +437,6 @@ public class HoodieAvroUtils {
|
|||||||
/**
|
/**
|
||||||
* This method converts values for fields with certain Avro/Parquet data types that require special handling.
|
* This method converts values for fields with certain Avro/Parquet data types that require special handling.
|
||||||
*
|
*
|
||||||
* Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is
|
|
||||||
* represented/stored in parquet.
|
|
||||||
*
|
|
||||||
* @param fieldSchema avro field schema
|
* @param fieldSchema avro field schema
|
||||||
* @param fieldValue avro field value
|
* @param fieldValue avro field value
|
||||||
* @return field value either converted (for certain data types) or as it is.
|
* @return field value either converted (for certain data types) or as it is.
|
||||||
@@ -445,23 +446,44 @@ public class HoodieAvroUtils {
|
|||||||
return fieldValue;
|
return fieldValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isLogicalTypeDate(fieldSchema)) {
|
if (fieldSchema.getType() == Schema.Type.UNION) {
|
||||||
return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
|
for (Schema schema : fieldSchema.getTypes()) {
|
||||||
|
if (schema.getType() != Schema.Type.NULL) {
|
||||||
|
return convertValueForAvroLogicalTypes(schema, fieldValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return fieldValue;
|
return convertValueForAvroLogicalTypes(fieldSchema, fieldValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given an Avro field schema checks whether the field is of Logical Date Type or not.
|
* This method converts values for fields with certain Avro Logical data types that require special handling.
|
||||||
|
*
|
||||||
|
* Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is
|
||||||
|
* represented/stored in parquet.
|
||||||
|
*
|
||||||
|
* Decimal Data Type is converted to actual decimal value instead of bytes/fixed which is how it is
|
||||||
|
* represented/stored in parquet.
|
||||||
*
|
*
|
||||||
* @param fieldSchema avro field schema
|
* @param fieldSchema avro field schema
|
||||||
* @return boolean indicating whether fieldSchema is of Avro's Date Logical Type
|
* @param fieldValue avro field value
|
||||||
|
* @return field value either converted (for certain data types) or as it is.
|
||||||
*/
|
*/
|
||||||
private static boolean isLogicalTypeDate(Schema fieldSchema) {
|
private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) {
|
||||||
if (fieldSchema.getType() == Schema.Type.UNION) {
|
if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
|
||||||
return fieldSchema.getTypes().stream().anyMatch(schema -> schema.getLogicalType() == LogicalTypes.date());
|
return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
|
||||||
|
} else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
|
||||||
|
Decimal dc = (Decimal) fieldSchema.getLogicalType();
|
||||||
|
DecimalConversion decimalConversion = new DecimalConversion();
|
||||||
|
if (fieldSchema.getType() == Schema.Type.FIXED) {
|
||||||
|
return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema,
|
||||||
|
LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
|
||||||
|
} else if (fieldSchema.getType() == Schema.Type.BYTES) {
|
||||||
|
return decimalConversion.fromBytes((ByteBuffer) fieldValue, fieldSchema,
|
||||||
|
LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return fieldSchema.getLogicalType() == LogicalTypes.date();
|
return fieldValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Schema getNullSchema() {
|
public static Schema getNullSchema() {
|
||||||
|
|||||||
@@ -43,9 +43,6 @@ import org.apache.hudi.keygen.KeyGenerator;
|
|||||||
import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
|
import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
|
||||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||||
|
|
||||||
import org.apache.avro.LogicalTypes;
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.Schema.Field;
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@@ -55,12 +52,10 @@ import org.apache.spark.api.java.JavaRDD;
|
|||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.LocalDate;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utilities used throughout the data source.
|
* Utilities used throughout the data source.
|
||||||
@@ -69,42 +64,6 @@ public class DataSourceUtils {
|
|||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(DataSourceUtils.class);
|
private static final Logger LOG = LogManager.getLogger(DataSourceUtils.class);
|
||||||
|
|
||||||
/**
|
|
||||||
* Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
|
|
||||||
*/
|
|
||||||
public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound) {
|
|
||||||
String[] parts = fieldName.split("\\.");
|
|
||||||
GenericRecord valueNode = record;
|
|
||||||
int i = 0;
|
|
||||||
for (; i < parts.length; i++) {
|
|
||||||
String part = parts[i];
|
|
||||||
Object val = valueNode.get(part);
|
|
||||||
if (val == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// return, if last part of name
|
|
||||||
if (i == parts.length - 1) {
|
|
||||||
Schema fieldSchema = valueNode.getSchema().getField(part).schema();
|
|
||||||
return convertValueForSpecificDataTypes(fieldSchema, val);
|
|
||||||
} else {
|
|
||||||
// VC: Need a test here
|
|
||||||
if (!(val instanceof GenericRecord)) {
|
|
||||||
throw new HoodieException("Cannot find a record at part value :" + part);
|
|
||||||
}
|
|
||||||
valueNode = (GenericRecord) val;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (returnNullIfNotFound) {
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
throw new HoodieException(
|
|
||||||
fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
|
|
||||||
+ valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throws IOException {
|
public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throws IOException {
|
||||||
LOG.info("Getting table path..");
|
LOG.info("Getting table path..");
|
||||||
for (Path path : userProvidedPaths) {
|
for (Path path : userProvidedPaths) {
|
||||||
@@ -121,39 +80,6 @@ public class DataSourceUtils {
|
|||||||
throw new TableNotFoundException("Unable to find a hudi table for the user provided paths.");
|
throw new TableNotFoundException("Unable to find a hudi table for the user provided paths.");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This method converts values for fields with certain Avro/Parquet data types that require special handling.
|
|
||||||
*
|
|
||||||
* Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is represented/stored in parquet.
|
|
||||||
*
|
|
||||||
* @param fieldSchema avro field schema
|
|
||||||
* @param fieldValue avro field value
|
|
||||||
* @return field value either converted (for certain data types) or as it is.
|
|
||||||
*/
|
|
||||||
private static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) {
|
|
||||||
if (fieldSchema == null) {
|
|
||||||
return fieldValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isLogicalTypeDate(fieldSchema)) {
|
|
||||||
return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
|
|
||||||
}
|
|
||||||
return fieldValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Given an Avro field schema checks whether the field is of Logical Date Type or not.
|
|
||||||
*
|
|
||||||
* @param fieldSchema avro field schema
|
|
||||||
* @return boolean indicating whether fieldSchema is of Avro's Date Logical Type
|
|
||||||
*/
|
|
||||||
private static boolean isLogicalTypeDate(Schema fieldSchema) {
|
|
||||||
if (fieldSchema.getType() == Schema.Type.UNION) {
|
|
||||||
return fieldSchema.getTypes().stream().anyMatch(schema -> schema.getLogicalType() == LogicalTypes.date());
|
|
||||||
}
|
|
||||||
return fieldSchema.getLogicalType() == LogicalTypes.date();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a key generator class via reflection, passing in any configs needed.
|
* Create a key generator class via reflection, passing in any configs needed.
|
||||||
* <p>
|
* <p>
|
||||||
|
|||||||
@@ -28,8 +28,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||||
|
|
||||||
|
import org.apache.avro.Conversions;
|
||||||
|
import org.apache.avro.LogicalTypes;
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
|
import org.apache.avro.generic.GenericFixed;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
@@ -40,6 +43,7 @@ import org.mockito.Captor;
|
|||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
@@ -77,13 +81,20 @@ public class TestDataSourceUtils {
|
|||||||
public void testAvroRecordsFieldConversion() {
|
public void testAvroRecordsFieldConversion() {
|
||||||
// There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
|
// There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
|
||||||
// of UNION schema type, which is a union of null and date type in different orders. event_date2 is non-union
|
// of UNION schema type, which is a union of null and date type in different orders. event_date2 is non-union
|
||||||
// date type
|
// date type. event_cost1, event_cost2, event3 are decimal logical types with UNION schema, which is similar to
|
||||||
|
// the event_date.
|
||||||
String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ "
|
String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ "
|
||||||
+ "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", \"logicalType\" : \"date\"}, \"null\"]},"
|
+ "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", \"logicalType\" : \"date\"}, \"null\"]},"
|
||||||
+ "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", \"logicalType\" : \"date\"}},"
|
+ "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", \"logicalType\" : \"date\"}},"
|
||||||
+ "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : \"int\", \"logicalType\" : \"date\"}]},"
|
+ "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : \"int\", \"logicalType\" : \"date\"}]},"
|
||||||
+ "{\"name\": \"event_name\", \"type\": \"string\"},"
|
+ "{\"name\": \"event_name\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"event_organizer\", \"type\": \"string\"}"
|
+ "{\"name\": \"event_organizer\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"event_cost1\", \"type\": "
|
||||||
|
+ "[{\"type\": \"fixed\", \"name\": \"dc\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}, \"null\"]},"
|
||||||
|
+ "{\"name\": \"event_cost2\", \"type\": "
|
||||||
|
+ "{\"type\": \"fixed\", \"name\": \"ef\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}},"
|
||||||
|
+ "{\"name\": \"event_cost3\", \"type\": "
|
||||||
|
+ "[\"null\", {\"type\": \"fixed\", \"name\": \"fg\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}]}"
|
||||||
+ "]}";
|
+ "]}";
|
||||||
|
|
||||||
Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
|
Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
|
||||||
@@ -94,6 +105,14 @@ public class TestDataSourceUtils {
|
|||||||
record.put("event_name", "Hudi Meetup");
|
record.put("event_name", "Hudi Meetup");
|
||||||
record.put("event_organizer", "Hudi PMC");
|
record.put("event_organizer", "Hudi PMC");
|
||||||
|
|
||||||
|
BigDecimal bigDecimal = new BigDecimal("123.184331");
|
||||||
|
Schema decimalSchema = avroSchema.getField("event_cost1").schema().getTypes().get(0);
|
||||||
|
Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
|
||||||
|
GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6));
|
||||||
|
record.put("event_cost1", genericFixed);
|
||||||
|
record.put("event_cost2", genericFixed);
|
||||||
|
record.put("event_cost3", genericFixed);
|
||||||
|
|
||||||
assertEquals(LocalDate.ofEpochDay(18000).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date1",
|
assertEquals(LocalDate.ofEpochDay(18000).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date1",
|
||||||
true));
|
true));
|
||||||
assertEquals(LocalDate.ofEpochDay(18001).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date2",
|
assertEquals(LocalDate.ofEpochDay(18001).toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_date2",
|
||||||
@@ -102,6 +121,9 @@ public class TestDataSourceUtils {
|
|||||||
true));
|
true));
|
||||||
assertEquals("Hudi Meetup", HoodieAvroUtils.getNestedFieldValAsString(record, "event_name", true));
|
assertEquals("Hudi Meetup", HoodieAvroUtils.getNestedFieldValAsString(record, "event_name", true));
|
||||||
assertEquals("Hudi PMC", HoodieAvroUtils.getNestedFieldValAsString(record, "event_organizer", true));
|
assertEquals("Hudi PMC", HoodieAvroUtils.getNestedFieldValAsString(record, "event_organizer", true));
|
||||||
|
assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost1", true));
|
||||||
|
assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost2", true));
|
||||||
|
assertEquals(bigDecimal.toString(), HoodieAvroUtils.getNestedFieldValAsString(record, "event_cost3", true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer;
|
|||||||
|
|
||||||
import org.apache.hudi.AvroConversionUtils;
|
import org.apache.hudi.AvroConversionUtils;
|
||||||
import org.apache.hudi.DataSourceUtils;
|
import org.apache.hudi.DataSourceUtils;
|
||||||
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.client.HoodieWriteClient;
|
import org.apache.hudi.client.HoodieWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
@@ -351,7 +352,7 @@ public class DeltaSync implements Serializable {
|
|||||||
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
|
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
|
||||||
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
|
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
|
||||||
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
|
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
|
||||||
(Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
|
(Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
|
||||||
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
|
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user