[HUDI-3808] Flink bulk_insert timestamp(3) can not be read by Spark (#5236)
This commit is contained in:
@@ -29,6 +29,10 @@
|
|||||||
<name>hudi-flink-client</name>
|
<name>hudi-flink-client</name>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<parquet.version>${flink.format.parquet.version}</parquet.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<!-- Hudi -->
|
<!-- Hudi -->
|
||||||
<dependency>
|
<dependency>
|
||||||
@@ -87,6 +91,13 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.parquet</groupId>
|
<groupId>org.apache.parquet</groupId>
|
||||||
<artifactId>parquet-avro</artifactId>
|
<artifactId>parquet-avro</artifactId>
|
||||||
|
<version>${parquet.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.parquet</groupId>
|
||||||
|
<artifactId>parquet-column</artifactId>
|
||||||
|
<version>${parquet.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- Hoodie - Test -->
|
<!-- Hoodie - Test -->
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ import org.apache.flink.table.types.logical.RowType;
|
|||||||
|
|
||||||
import org.apache.flink.table.types.logical.TimestampType;
|
import org.apache.flink.table.types.logical.TimestampType;
|
||||||
import org.apache.parquet.schema.GroupType;
|
import org.apache.parquet.schema.GroupType;
|
||||||
|
import org.apache.parquet.schema.LogicalTypeAnnotation;
|
||||||
import org.apache.parquet.schema.MessageType;
|
import org.apache.parquet.schema.MessageType;
|
||||||
import org.apache.parquet.schema.OriginalType;
|
import org.apache.parquet.schema.OriginalType;
|
||||||
import org.apache.parquet.schema.PrimitiveType;
|
import org.apache.parquet.schema.PrimitiveType;
|
||||||
@@ -46,6 +47,8 @@ import java.lang.reflect.Array;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schema converter converts Parquet schema to and from Flink internal types.
|
* Schema converter converts Parquet schema to and from Flink internal types.
|
||||||
*
|
*
|
||||||
@@ -436,7 +439,7 @@ public class ParquetSchemaConverter {
|
|||||||
String.format(
|
String.format(
|
||||||
"Can not convert Flink MapTypeInfo %s to Parquet"
|
"Can not convert Flink MapTypeInfo %s to Parquet"
|
||||||
+ " Map type as key has to be String",
|
+ " Map type as key has to be String",
|
||||||
typeInfo.toString()));
|
typeInfo));
|
||||||
}
|
}
|
||||||
} else if (typeInfo instanceof ObjectArrayTypeInfo) {
|
} else if (typeInfo instanceof ObjectArrayTypeInfo) {
|
||||||
ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;
|
ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;
|
||||||
@@ -567,18 +570,16 @@ public class ParquetSchemaConverter {
|
|||||||
int numBytes = computeMinBytesForDecimalPrecision(precision);
|
int numBytes = computeMinBytesForDecimalPrecision(precision);
|
||||||
return Types.primitive(
|
return Types.primitive(
|
||||||
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
|
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
|
||||||
.precision(precision)
|
.as(LogicalTypeAnnotation.decimalType(scale, precision))
|
||||||
.scale(scale)
|
|
||||||
.length(numBytes)
|
.length(numBytes)
|
||||||
.as(OriginalType.DECIMAL)
|
|
||||||
.named(name);
|
.named(name);
|
||||||
case TINYINT:
|
case TINYINT:
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
||||||
.as(OriginalType.INT_8)
|
.as(LogicalTypeAnnotation.intType(8, true))
|
||||||
.named(name);
|
.named(name);
|
||||||
case SMALLINT:
|
case SMALLINT:
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
||||||
.as(OriginalType.INT_16)
|
.as(LogicalTypeAnnotation.intType(16, true))
|
||||||
.named(name);
|
.named(name);
|
||||||
case INTEGER:
|
case INTEGER:
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
||||||
@@ -594,16 +595,17 @@ public class ParquetSchemaConverter {
|
|||||||
.named(name);
|
.named(name);
|
||||||
case DATE:
|
case DATE:
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
||||||
.as(OriginalType.DATE)
|
.as(LogicalTypeAnnotation.dateType())
|
||||||
.named(name);
|
.named(name);
|
||||||
case TIME_WITHOUT_TIME_ZONE:
|
case TIME_WITHOUT_TIME_ZONE:
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
|
||||||
.as(OriginalType.TIME_MILLIS)
|
.as(LogicalTypeAnnotation.timeType(true, TimeUnit.MILLIS))
|
||||||
.named(name);
|
.named(name);
|
||||||
case TIMESTAMP_WITHOUT_TIME_ZONE:
|
case TIMESTAMP_WITHOUT_TIME_ZONE:
|
||||||
TimestampType timestampType = (TimestampType) type;
|
TimestampType timestampType = (TimestampType) type;
|
||||||
if (timestampType.getPrecision() == 3) {
|
if (timestampType.getPrecision() == 3) {
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
|
||||||
|
.as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
|
||||||
.named(name);
|
.named(name);
|
||||||
} else {
|
} else {
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
|
||||||
@@ -613,6 +615,7 @@ public class ParquetSchemaConverter {
|
|||||||
LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
|
LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
|
||||||
if (localZonedTimestampType.getPrecision() == 3) {
|
if (localZonedTimestampType.getPrecision() == 3) {
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
|
||||||
|
.as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS))
|
||||||
.named(name);
|
.named(name);
|
||||||
} else {
|
} else {
|
||||||
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
|
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
|
||||||
|
|||||||
@@ -51,24 +51,41 @@ public class TestParquetSchemaConverter {
|
|||||||
final String expected = "message converted {\n"
|
final String expected = "message converted {\n"
|
||||||
+ " optional group f_array (LIST) {\n"
|
+ " optional group f_array (LIST) {\n"
|
||||||
+ " repeated group list {\n"
|
+ " repeated group list {\n"
|
||||||
+ " optional binary element (UTF8);\n"
|
+ " optional binary element (STRING);\n"
|
||||||
+ " }\n"
|
+ " }\n"
|
||||||
+ " }\n"
|
+ " }\n"
|
||||||
+ " optional group f_map (MAP) {\n"
|
+ " optional group f_map (MAP) {\n"
|
||||||
+ " repeated group key_value {\n"
|
+ " repeated group key_value {\n"
|
||||||
+ " optional int32 key;\n"
|
+ " optional int32 key;\n"
|
||||||
+ " optional binary value (UTF8);\n"
|
+ " optional binary value (STRING);\n"
|
||||||
+ " }\n"
|
+ " }\n"
|
||||||
+ " }\n"
|
+ " }\n"
|
||||||
+ " optional group f_row {\n"
|
+ " optional group f_row {\n"
|
||||||
+ " optional int32 f_row_f0;\n"
|
+ " optional int32 f_row_f0;\n"
|
||||||
+ " optional binary f_row_f1 (UTF8);\n"
|
+ " optional binary f_row_f1 (STRING);\n"
|
||||||
+ " optional group f_row_f2 {\n"
|
+ " optional group f_row_f2 {\n"
|
||||||
+ " optional int32 f_row_f2_f0;\n"
|
+ " optional int32 f_row_f2_f0;\n"
|
||||||
+ " optional binary f_row_f2_f1 (UTF8);\n"
|
+ " optional binary f_row_f2_f1 (STRING);\n"
|
||||||
+ " }\n"
|
+ " }\n"
|
||||||
+ " }\n"
|
+ " }\n"
|
||||||
+ "}\n";
|
+ "}\n";
|
||||||
assertThat(messageType.toString(), is(expected));
|
assertThat(messageType.toString(), is(expected));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConvertTimestampTypes() {
|
||||||
|
DataType dataType = DataTypes.ROW(
|
||||||
|
DataTypes.FIELD("ts_3", DataTypes.TIMESTAMP(3)),
|
||||||
|
DataTypes.FIELD("ts_6", DataTypes.TIMESTAMP(6)),
|
||||||
|
DataTypes.FIELD("ts_9", DataTypes.TIMESTAMP(9)));
|
||||||
|
org.apache.parquet.schema.MessageType messageType =
|
||||||
|
ParquetSchemaConverter.convertToParquetMessageType("converted", (RowType) dataType.getLogicalType());
|
||||||
|
assertThat(messageType.getColumns().size(), is(3));
|
||||||
|
final String expected = "message converted {\n"
|
||||||
|
+ " optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
|
||||||
|
+ " optional int96 ts_6;\n"
|
||||||
|
+ " optional int96 ts_9;\n"
|
||||||
|
+ "}\n";
|
||||||
|
assertThat(messageType.toString(), is(expected));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,7 +32,7 @@
|
|||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<main.basedir>${project.parent.parent.basedir}</main.basedir>
|
<main.basedir>${project.parent.parent.basedir}</main.basedir>
|
||||||
<parquet.version>1.11.1</parquet.version>
|
<parquet.version>${flink.format.parquet.version}</parquet.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -671,7 +671,7 @@ public class FlinkOptions extends HoodieConfig {
|
|||||||
public static final ConfigOption<Boolean> HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions
|
public static final ConfigOption<Boolean> HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions
|
||||||
.key("hive_sync.support_timestamp")
|
.key("hive_sync.support_timestamp")
|
||||||
.booleanType()
|
.booleanType()
|
||||||
.defaultValue(false)
|
.defaultValue(true)
|
||||||
.withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
|
.withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
|
||||||
+ "Disabled by default for backward compatibility.");
|
+ "Enabled by default; set to false to restore the previous default.");
|
||||||
|
|
||||||
|
|||||||
@@ -34,8 +34,8 @@
|
|||||||
<flink.bundle.hive.scope>provided</flink.bundle.hive.scope>
|
<flink.bundle.hive.scope>provided</flink.bundle.hive.scope>
|
||||||
<flink.bundle.shade.prefix>org.apache.hudi.</flink.bundle.shade.prefix>
|
<flink.bundle.shade.prefix>org.apache.hudi.</flink.bundle.shade.prefix>
|
||||||
<javax.servlet.version>3.1.0</javax.servlet.version>
|
<javax.servlet.version>3.1.0</javax.servlet.version>
|
||||||
<!-- override to be same with flink 1.12.2 -->
|
<!-- override to be the same as flink 1.15.x -->
|
||||||
<parquet.version>1.11.1</parquet.version>
|
<parquet.version>${flink.format.parquet.version}</parquet.version>
|
||||||
<hive.version>2.3.1</hive.version>
|
<hive.version>2.3.1</hive.version>
|
||||||
<thrift.version>0.9.3</thrift.version>
|
<thrift.version>0.9.3</thrift.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|||||||
1
pom.xml
1
pom.xml
@@ -129,6 +129,7 @@
|
|||||||
<flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
|
<flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
|
||||||
<flink.table.runtime.artifactId>flink-table-runtime_${scala.binary.version}</flink.table.runtime.artifactId>
|
<flink.table.runtime.artifactId>flink-table-runtime_${scala.binary.version}</flink.table.runtime.artifactId>
|
||||||
<flink.table.planner.artifactId>flink-table-planner_${scala.binary.version}</flink.table.planner.artifactId>
|
<flink.table.planner.artifactId>flink-table-planner_${scala.binary.version}</flink.table.planner.artifactId>
|
||||||
|
<flink.format.parquet.version>1.12.2</flink.format.parquet.version>
|
||||||
<spark31.version>3.1.3</spark31.version>
|
<spark31.version>3.1.3</spark31.version>
|
||||||
<spark32.version>3.2.1</spark32.version>
|
<spark32.version>3.2.1</spark32.version>
|
||||||
<hudi.spark.module>hudi-spark2</hudi.spark.module>
|
<hudi.spark.module>hudi-spark2</hudi.spark.module>
|
||||||
|
|||||||
Reference in New Issue
Block a user