[HUDI-607] Fix to allow creation/syncing of Hive tables partitioned by Date type columns (#1330)
This commit is contained in:
@@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hudi;
|
||||
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hudi.client.HoodieReadClient;
|
||||
import org.apache.hudi.client.HoodieWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
@@ -45,6 +47,7 @@ import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDate;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -80,7 +83,8 @@ public class DataSourceUtils {
|
||||
|
||||
// return, if last part of name
|
||||
if (i == parts.length - 1) {
|
||||
return val;
|
||||
Schema fieldSchema = valueNode.getSchema().getField(part).schema();
|
||||
return convertValueForSpecificDataTypes(fieldSchema, val);
|
||||
} else {
|
||||
// VC: Need a test here
|
||||
if (!(val instanceof GenericRecord)) {
|
||||
@@ -99,6 +103,40 @@ public class DataSourceUtils {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method converts values for fields with certain Avro/Parquet data types that require special handling.
|
||||
*
|
||||
* Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is
|
||||
* represented/stored in parquet.
|
||||
*
|
||||
* @param fieldSchema avro field schema
|
||||
* @param fieldValue avro field value
|
||||
* @return field value either converted (for certain data types) or as it is.
|
||||
*/
|
||||
private static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue) {
|
||||
if (fieldSchema == null) {
|
||||
return fieldValue;
|
||||
}
|
||||
|
||||
if (isLogicalTypeDate(fieldSchema)) {
|
||||
return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
|
||||
}
|
||||
return fieldValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given an Avro field schema checks whether the field is of Logical Date Type or not.
|
||||
*
|
||||
* @param fieldSchema avro field schema
|
||||
* @return boolean indicating whether fieldSchema is of Avro's Date Logical Type
|
||||
*/
|
||||
private static boolean isLogicalTypeDate(Schema fieldSchema) {
|
||||
if (fieldSchema.getType() == Schema.Type.UNION) {
|
||||
return fieldSchema.getTypes().stream().anyMatch(schema -> schema.getLogicalType() == LogicalTypes.date());
|
||||
}
|
||||
return fieldSchema.getLogicalType() == LogicalTypes.date();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a key generator class via reflection, passing in any configs needed.
|
||||
* <p>
|
||||
|
||||
Reference in New Issue
Block a user