1
0

[HUDI-2278] Use INT64 timestamp with precision 3 for flink parquet writer (#3414)

This commit is contained in:
Danny Chan
2021-08-06 11:06:21 +08:00
committed by GitHub
parent b7586a5632
commit 20feb1a897
4 changed files with 997 additions and 2 deletions

View File

@@ -58,6 +58,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>

View File

@@ -18,8 +18,9 @@
package org.apache.hudi.io.storage.row;
import org.apache.flink.formats.parquet.row.ParquetRowDataWriter;
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.hudi.io.storage.row.parquet.ParquetRowDataWriter;
import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

View File

@@ -0,0 +1,357 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage.row.parquet;
import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Timestamp;
import java.util.Arrays;
import static org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MILLISECOND;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_SECOND;
/**
 * Writes a record to the Parquet API with the expected schema in order to be written to a file.
 *
 * <p>Reference {@code org.apache.flink.formats.parquet.row.ParquetRowDataWriter} to support
 * timestamp of INT64 8 bytes: timestamps of precision 3 are written as a single INT64 of epoch
 * milliseconds instead of the legacy 12-byte INT96 layout.
 */
public class ParquetRowDataWriter {

  private final RecordConsumer recordConsumer;
  // When true, timestamp values are taken as already-UTC millis; otherwise they are
  // converted to local wall-clock time via java.sql.Timestamp before encoding.
  private final boolean utcTimestamp;
  // One writer per top-level field, index-aligned with the row type / fieldNames.
  private final FieldWriter[] fieldWriters;
  private final String[] fieldNames;

  /**
   * Creates a writer for the given row type against the given Parquet schema.
   *
   * @param recordConsumer the Parquet record consumer to emit values to
   * @param rowType        the Flink logical row type of incoming records
   * @param schema         the Parquet group schema (must be field-aligned with {@code rowType})
   * @param utcTimestamp   whether timestamps are encoded as UTC millis
   */
  public ParquetRowDataWriter(
      RecordConsumer recordConsumer,
      RowType rowType,
      GroupType schema,
      boolean utcTimestamp) {
    this.recordConsumer = recordConsumer;
    this.utcTimestamp = utcTimestamp;
    this.fieldWriters = new FieldWriter[rowType.getFieldCount()];
    this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
    for (int i = 0; i < rowType.getFieldCount(); i++) {
      this.fieldWriters[i] = createWriter(rowType.getTypeAt(i), schema.getType(i));
    }
  }

  /**
   * It writes a record to Parquet.
   *
   * @param record Contains the record that is going to be written.
   */
  public void write(final RowData record) {
    recordConsumer.startMessage();
    for (int i = 0; i < fieldWriters.length; i++) {
      // Null fields are simply skipped: absence of the field encodes NULL in Parquet.
      if (!record.isNullAt(i)) {
        String fieldName = fieldNames[i];
        FieldWriter writer = fieldWriters[i];
        recordConsumer.startField(fieldName, i);
        writer.write(record, i);
        recordConsumer.endField(fieldName, i);
      }
    }
    recordConsumer.endMessage();
  }

  /**
   * Creates the field writer for a single (Flink logical type, Parquet type) pair.
   * Only primitive Parquet types are supported; nested groups throw.
   */
  private FieldWriter createWriter(LogicalType t, Type type) {
    if (type.isPrimitive()) {
      switch (t.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
          return new StringWriter();
        case BOOLEAN:
          return new BooleanWriter();
        case BINARY:
        case VARBINARY:
          return new BinaryWriter();
        case DECIMAL:
          DecimalType decimalType = (DecimalType) t;
          return createDecimalWriter(decimalType.getPrecision(), decimalType.getScale());
        case TINYINT:
          return new ByteWriter();
        case SMALLINT:
          return new ShortWriter();
        case DATE:
        case TIME_WITHOUT_TIME_ZONE:
        case INTEGER:
          return new IntWriter();
        case BIGINT:
          return new LongWriter();
        case FLOAT:
          return new FloatWriter();
        case DOUBLE:
          return new DoubleWriter();
        case TIMESTAMP_WITHOUT_TIME_ZONE:
          TimestampType timestampType = (TimestampType) t;
          // Precision 3 (millis) uses the compact INT64 encoding; everything else
          // falls back to the legacy INT96 julian-day layout.
          if (timestampType.getPrecision() == 3) {
            return new Timestamp64Writer();
          } else {
            return new Timestamp96Writer(timestampType.getPrecision());
          }
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
          LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) t;
          if (localZonedTimestampType.getPrecision() == 3) {
            return new Timestamp64Writer();
          } else {
            return new Timestamp96Writer(localZonedTimestampType.getPrecision());
          }
        default:
          throw new UnsupportedOperationException("Unsupported type: " + type);
      }
    } else {
      throw new IllegalArgumentException("Unsupported data type: " + t);
    }
  }

  /** Writes one field value of a row at the given ordinal to the record consumer. */
  private interface FieldWriter {

    void write(RowData row, int ordinal);
  }

  private class BooleanWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addBoolean(row.getBoolean(ordinal));
    }
  }

  private class ByteWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addInteger(row.getByte(ordinal));
    }
  }

  private class ShortWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addInteger(row.getShort(ordinal));
    }
  }

  private class LongWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addLong(row.getLong(ordinal));
    }
  }

  private class FloatWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addFloat(row.getFloat(ordinal));
    }
  }

  private class DoubleWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addDouble(row.getDouble(ordinal));
    }
  }

  private class StringWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addBinary(Binary.fromReusedByteArray(row.getString(ordinal).toBytes()));
    }
  }

  private class BinaryWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)));
    }
  }

  private class IntWriter implements FieldWriter {

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addInteger(row.getInt(ordinal));
    }
  }

  /**
   * Timestamp of INT64 8 bytes: a single long holding the epoch milliseconds. Used for
   * timestamps of precision 3. See
   * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
   */
  private class Timestamp64Writer implements FieldWriter {

    private Timestamp64Writer() {
    }

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
    }
  }

  private long timestampToInt64(TimestampData timestampData) {
    return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
  }

  /**
   * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
   * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
   * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
   */
  private class Timestamp96Writer implements FieldWriter {

    private final int precision;

    private Timestamp96Writer(int precision) {
      this.precision = precision;
    }

    @Override
    public void write(RowData row, int ordinal) {
      recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
    }
  }

  /** Encodes a timestamp into the 12-byte INT96 layout: nanosOfDay (8, LE) + julianDay (4, LE). */
  private Binary timestampToInt96(TimestampData timestampData) {
    int julianDay;
    long nanosOfDay;
    if (utcTimestamp) {
      long mills = timestampData.getMillisecond();
      julianDay = (int) ((mills / MILLIS_IN_DAY) + JULIAN_EPOCH_OFFSET_DAYS);
      nanosOfDay =
          (mills % MILLIS_IN_DAY) * NANOS_PER_MILLISECOND
              + timestampData.getNanoOfMillisecond();
    } else {
      Timestamp timestamp = timestampData.toTimestamp();
      long mills = timestamp.getTime();
      julianDay = (int) ((mills / MILLIS_IN_DAY) + JULIAN_EPOCH_OFFSET_DAYS);
      // getNanos() already carries the sub-second part, so whole seconds are scaled separately.
      nanosOfDay = ((mills % MILLIS_IN_DAY) / 1000) * NANOS_PER_SECOND + timestamp.getNanos();
    }
    ByteBuffer buf = ByteBuffer.allocate(12);
    buf.order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(nanosOfDay);
    buf.putInt(julianDay);
    buf.flip();
    return Binary.fromConstantByteBuffer(buf);
  }

  /**
   * Creates a decimal writer for the given precision and scale; decimals are written as
   * fixed-length unscaled big-endian bytes.
   */
  private FieldWriter createDecimalWriter(int precision, int scale) {
    Preconditions.checkArgument(
        precision <= DecimalType.MAX_PRECISION,
        "Decimal precision %s exceeds max precision %s",
        precision,
        DecimalType.MAX_PRECISION);

    /*
     * This is optimizer for UnscaledBytesWriter.
     */
    class LongUnscaledBytesWriter implements FieldWriter {
      private final int numBytes;
      private final int initShift;
      private final byte[] decimalBuffer;

      private LongUnscaledBytesWriter() {
        this.numBytes = computeMinBytesForDecimalPrecision(precision);
        this.initShift = 8 * (numBytes - 1);
        this.decimalBuffer = new byte[numBytes];
      }

      @Override
      public void write(RowData row, int ordinal) {
        long unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong();
        int i = 0;
        int shift = initShift;
        // Emit the long big-endian, most significant byte first.
        while (i < numBytes) {
          decimalBuffer[i] = (byte) (unscaledLong >> shift);
          i += 1;
          shift -= 8;
        }
        recordConsumer.addBinary(Binary.fromReusedByteArray(decimalBuffer, 0, numBytes));
      }
    }

    class UnscaledBytesWriter implements FieldWriter {
      private final int numBytes;
      private final byte[] decimalBuffer;

      private UnscaledBytesWriter() {
        this.numBytes = computeMinBytesForDecimalPrecision(precision);
        this.decimalBuffer = new byte[numBytes];
      }

      @Override
      public void write(RowData row, int ordinal) {
        byte[] bytes = row.getDecimal(ordinal, precision, scale).toUnscaledBytes();
        byte[] writtenBytes;
        if (bytes.length == numBytes) {
          // Avoid copy.
          writtenBytes = bytes;
        } else {
          // Sign-extend into the fixed-size buffer before copying the value bytes.
          byte signByte = bytes[0] < 0 ? (byte) -1 : (byte) 0;
          Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte);
          System.arraycopy(
              bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length);
          writtenBytes = decimalBuffer;
        }
        recordConsumer.addBinary(Binary.fromReusedByteArray(writtenBytes, 0, numBytes));
      }
    }

    // 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
    // optimizer for UnscaledBytesWriter
    if (DecimalDataUtils.is32BitDecimal(precision)
        || DecimalDataUtils.is64BitDecimal(precision)) {
      return new LongUnscaledBytesWriter();
    }

    // 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
    return new UnscaledBytesWriter();
  }
}

View File

@@ -0,0 +1,631 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage.row.parquet;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
/**
 * Schema converter converts Parquet schema to and from Flink internal types.
 *
 * <p>Reference org.apache.flink.formats.parquet.utils.ParquetSchemaConverter to support timestamp of INT64 8 bytes.
 */
public class ParquetSchemaConverter {

  private static final Logger LOGGER = LoggerFactory.getLogger(ParquetSchemaConverter.class);

  // Field names used when building standard / legacy Parquet LIST and MAP group schemas.
  public static final String MAP_VALUE = "value";
  public static final String LIST_ARRAY_TYPE = "array";
  public static final String LIST_ELEMENT = "element";
  public static final String LIST_GROUP_NAME = "list";
  public static final String MESSAGE_ROOT = "root";

  /**
   * Converts Parquet schema to Flink Internal Type.
   *
   * @param type Parquet schema
   * @return Flink type information
   */
  public static TypeInformation<?> fromParquetType(MessageType type) {
    return convertFields(type.getFields());
  }

  /**
   * Converts Flink Internal Type to Parquet schema.
   *
   * @param typeInformation Flink type information
   * @param legacyMode is standard LIST and MAP schema or back-compatible schema
   * @return Parquet schema
   */
  public static MessageType toParquetType(
      TypeInformation<?> typeInformation, boolean legacyMode) {
    return (MessageType)
        convertField(null, typeInformation, Type.Repetition.OPTIONAL, legacyMode);
  }

  /**
   * Converts a list of Parquet fields to a Flink {@code RowTypeInfo}. Fields whose type cannot
   * be converted are logged and dropped from the resulting row.
   *
   * @param parquetFields the Parquet fields to convert
   * @return row type information over the convertible fields
   */
  public static TypeInformation<?> convertFields(List<Type> parquetFields) {
    List<TypeInformation<?>> types = new ArrayList<>();
    List<String> names = new ArrayList<>();
    for (Type field : parquetFields) {
      TypeInformation<?> subType = convertParquetTypeToTypeInfo(field);
      if (subType != null) {
        types.add(subType);
        names.add(field.getName());
      } else {
        // NOTE(review): convertParquetTypeToTypeInfo throws rather than returning null for
        // unsupported types, so this branch looks unreachable in practice — confirm.
        LOGGER.error(
            "Parquet field {} in schema type {} can not be converted to Flink Internal Type",
            field.getName(),
            field.getOriginalType().name());
      }
    }

    return new RowTypeInfo(
        types.toArray(new TypeInformation<?>[0]), names.toArray(new String[0]));
  }

  /**
   * Converts a single Parquet type (primitive or group) to Flink type information.
   * Primitive conversion is driven by the physical type plus its (deprecated) OriginalType
   * annotation; group conversion handles LIST, MAP and plain record groups.
   *
   * @param fieldType the Parquet type to convert
   * @return the corresponding Flink type information
   * @throws UnsupportedOperationException for Parquet types with no Flink counterpart
   */
  public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) {
    TypeInformation<?> typeInfo;
    if (fieldType.isPrimitive()) {
      OriginalType originalType = fieldType.getOriginalType();
      PrimitiveType primitiveType = fieldType.asPrimitiveType();
      switch (primitiveType.getPrimitiveTypeName()) {
        case BINARY:
          if (originalType != null) {
            switch (originalType) {
              case DECIMAL:
                typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
                break;
              case UTF8:
              case ENUM:
              case JSON:
              case BSON:
                typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
                break;
              default:
                throw new UnsupportedOperationException(
                    "Unsupported original type : "
                        + originalType.name()
                        + " for primitive type BINARY");
            }
          } else {
            // Un-annotated BINARY is read as a string.
            typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
          }
          break;
        case BOOLEAN:
          typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO;
          break;
        case INT32:
          if (originalType != null) {
            switch (originalType) {
              case TIME_MICROS:
              case TIME_MILLIS:
                typeInfo = SqlTimeTypeInfo.TIME;
                break;
              case TIMESTAMP_MICROS:
              case TIMESTAMP_MILLIS:
                typeInfo = SqlTimeTypeInfo.TIMESTAMP;
                break;
              case DATE:
                typeInfo = SqlTimeTypeInfo.DATE;
                break;
              case UINT_8:
              case UINT_16:
              case UINT_32:
                typeInfo = BasicTypeInfo.INT_TYPE_INFO;
                break;
              case INT_8:
                typeInfo = org.apache.flink.api.common.typeinfo.Types.BYTE;
                break;
              case INT_16:
                typeInfo = org.apache.flink.api.common.typeinfo.Types.SHORT;
                break;
              case INT_32:
                typeInfo = BasicTypeInfo.INT_TYPE_INFO;
                break;
              default:
                throw new UnsupportedOperationException(
                    "Unsupported original type : "
                        + originalType.name()
                        + " for primitive type INT32");
            }
          } else {
            typeInfo = BasicTypeInfo.INT_TYPE_INFO;
          }
          break;
        case INT64:
          if (originalType != null) {
            switch (originalType) {
              case TIME_MICROS:
                typeInfo = SqlTimeTypeInfo.TIME;
                break;
              case TIMESTAMP_MICROS:
              case TIMESTAMP_MILLIS:
                typeInfo = SqlTimeTypeInfo.TIMESTAMP;
                break;
              case INT_64:
              case DECIMAL:
                typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
                break;
              default:
                throw new UnsupportedOperationException(
                    "Unsupported original type : "
                        + originalType.name()
                        + " for primitive type INT64");
            }
          } else {
            typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
          }
          break;
        case INT96:
          // It stores a timestamp type data, we read it as millisecond
          typeInfo = SqlTimeTypeInfo.TIMESTAMP;
          break;
        case FLOAT:
          typeInfo = BasicTypeInfo.FLOAT_TYPE_INFO;
          break;
        case DOUBLE:
          typeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO;
          break;
        case FIXED_LEN_BYTE_ARRAY:
          if (originalType != null) {
            switch (originalType) {
              case DECIMAL:
                typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
                break;
              default:
                throw new UnsupportedOperationException(
                    "Unsupported original type : "
                        + originalType.name()
                        + " for primitive type FIXED_LEN_BYTE_ARRAY");
            }
          } else {
            typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
          }
          break;
        default:
          throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
      }
    } else {
      GroupType parquetGroupType = fieldType.asGroupType();
      OriginalType originalType = parquetGroupType.getOriginalType();
      if (originalType != null) {
        switch (originalType) {
          case LIST:
            // A standard LIST group wraps exactly one REPEATED field.
            if (parquetGroupType.getFieldCount() != 1) {
              throw new UnsupportedOperationException(
                  "Invalid list type " + parquetGroupType);
            }
            Type repeatedType = parquetGroupType.getType(0);
            if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
              throw new UnsupportedOperationException(
                  "Invalid list type " + parquetGroupType);
            }
            if (repeatedType.isPrimitive()) {
              typeInfo = convertParquetPrimitiveListToFlinkArray(repeatedType);
            } else {
              // Backward-compatibility element group name can be any string
              // (element/array/other)
              GroupType elementType = repeatedType.asGroupType();
              // If the repeated field is a group with multiple fields, then its type
              // is the element
              // type and elements are required.
              if (elementType.getFieldCount() > 1) {
                for (Type type : elementType.getFields()) {
                  if (!type.isRepetition(Type.Repetition.REQUIRED)) {
                    throw new UnsupportedOperationException(
                        String.format(
                            "List field [%s] in List [%s] has to be required. ",
                            type.toString(), fieldType.getName()));
                  }
                }
                typeInfo =
                    ObjectArrayTypeInfo.getInfoFor(
                        convertParquetTypeToTypeInfo(elementType));
              } else {
                Type internalType = elementType.getType(0);
                if (internalType.isPrimitive()) {
                  typeInfo =
                      convertParquetPrimitiveListToFlinkArray(internalType);
                } else {
                  // No need to do special process for group named array and tuple
                  GroupType tupleGroup = internalType.asGroupType();
                  if (tupleGroup.getFieldCount() == 1
                      && tupleGroup
                          .getFields()
                          .get(0)
                          .isRepetition(Type.Repetition.REQUIRED)) {
                    typeInfo =
                        ObjectArrayTypeInfo.getInfoFor(
                            convertParquetTypeToTypeInfo(internalType));
                  } else {
                    throw new UnsupportedOperationException(
                        String.format(
                            "Unrecgonized List schema [%s] according to Parquet"
                                + " standard",
                            parquetGroupType.toString()));
                  }
                }
              }
            }
            break;
          case MAP_KEY_VALUE:
          case MAP:
            // The outer-most level must be a group annotated with MAP
            // that contains a single field named key_value
            if (parquetGroupType.getFieldCount() != 1
                || parquetGroupType.getType(0).isPrimitive()) {
              throw new UnsupportedOperationException(
                  "Invalid map type " + parquetGroupType);
            }
            // The middle level must be a repeated group with a key field for map keys
            // and, optionally, a value field for map values. But we can't enforce two
            // strict condition here
            // the schema generated by Parquet lib doesn't contain LogicalType
            // ! mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)
            GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
            if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
                || mapKeyValType.getFieldCount() != 2) {
              throw new UnsupportedOperationException(
                  "The middle level of Map should be single field named key_value. Invalid map type "
                      + parquetGroupType);
            }
            Type keyType = mapKeyValType.getType(0);

            // The key field encodes the map's key type. This field must have repetition
            // required and
            // must always be present.
            if (!keyType.isPrimitive()
                || !keyType.isRepetition(Type.Repetition.REQUIRED)
                || !keyType.asPrimitiveType()
                    .getPrimitiveTypeName()
                    .equals(PrimitiveType.PrimitiveTypeName.BINARY)
                || !keyType.getOriginalType().equals(OriginalType.UTF8)) {
              throw new IllegalArgumentException(
                  "Map key type must be required binary (UTF8): " + keyType);
            }

            Type valueType = mapKeyValType.getType(1);
            return new MapTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO,
                convertParquetTypeToTypeInfo(valueType));
          default:
            throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
        }
      } else {
        // if no original type than it is a record
        return convertFields(parquetGroupType.getFields());
      }
    }
    return typeInfo;
  }

  /**
   * Converts a Parquet primitive element type into a Flink array type info — a basic
   * (primitive) array when possible, otherwise an object array.
   */
  private static TypeInformation<?> convertParquetPrimitiveListToFlinkArray(Type type) {
    // Backward-compatibility element group doesn't exist also allowed
    TypeInformation<?> flinkType = convertParquetTypeToTypeInfo(type);
    if (flinkType.isBasicType()) {
      return BasicArrayTypeInfo.getInfoFor(
          Array.newInstance(flinkType.getTypeClass(), 0).getClass());
    } else {
      // flinkType here can be either SqlTimeTypeInfo or BasicTypeInfo.BIG_DEC_TYPE_INFO,
      // So it should be converted to ObjectArrayTypeInfo
      return ObjectArrayTypeInfo.getInfoFor(flinkType);
    }
  }

  /**
   * Recursively converts one Flink field to a Parquet type.
   *
   * @param fieldName         the Parquet field name; {@code null} marks the message root
   * @param typeInfo          the Flink type to convert
   * @param inheritRepetition repetition imposed by the enclosing structure, or {@code null}
   *                          to default to OPTIONAL
   * @param legacyMode        whether to build back-compatible LIST/MAP schemas
   * @return the Parquet type (a {@code MessageType} when {@code fieldName} is null)
   */
  private static Type convertField(
      String fieldName,
      TypeInformation<?> typeInfo,
      Type.Repetition inheritRepetition,
      boolean legacyMode) {
    Type fieldType = null;

    Type.Repetition repetition =
        inheritRepetition == null ? Type.Repetition.OPTIONAL : inheritRepetition;
    if (typeInfo instanceof BasicTypeInfo) {
      BasicTypeInfo basicTypeInfo = (BasicTypeInfo) typeInfo;
      if (basicTypeInfo.equals(BasicTypeInfo.BIG_DEC_TYPE_INFO)
          || basicTypeInfo.equals(BasicTypeInfo.BIG_INT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.DECIMAL)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.INT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_32)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                .as(OriginalType.INT_64)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_16)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_8)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.CHAR_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.DATE_TYPE_INFO)
          || basicTypeInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8)
                .named(fieldName);
      }
    } else if (typeInfo instanceof MapTypeInfo) {
      MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
      // Only String keys can be represented, matching the read path above.
      if (mapTypeInfo.getKeyTypeInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) {
        fieldType =
            Types.map(repetition)
                .value(
                    convertField(
                        MAP_VALUE,
                        mapTypeInfo.getValueTypeInfo(),
                        Type.Repetition.OPTIONAL,
                        legacyMode))
                .named(fieldName);
      } else {
        throw new UnsupportedOperationException(
            String.format(
                "Can not convert Flink MapTypeInfo %s to Parquet"
                    + " Map type as key has to be String",
                typeInfo.toString()));
      }
    } else if (typeInfo instanceof ObjectArrayTypeInfo) {
      ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;

      // Get all required sub fields
      GroupType componentGroup =
          (GroupType)
              convertField(
                  LIST_ELEMENT,
                  objectArrayTypeInfo.getComponentInfo(),
                  Type.Repetition.REQUIRED,
                  legacyMode);

      GroupType elementGroup = Types.repeatedGroup().named(LIST_ELEMENT);
      elementGroup = elementGroup.withNewFields(componentGroup.getFields());
      fieldType =
          Types.buildGroup(repetition)
              .addField(elementGroup)
              .as(OriginalType.LIST)
              .named(fieldName);
    } else if (typeInfo instanceof BasicArrayTypeInfo) {
      BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo;

      if (legacyMode) {

        // Add extra layer of Group according to Parquet's standard
        Type listGroup =
            Types.repeatedGroup()
                .addField(
                    convertField(
                        LIST_ELEMENT,
                        basicArrayType.getComponentInfo(),
                        Type.Repetition.REQUIRED,
                        legacyMode))
                .named(LIST_GROUP_NAME);

        fieldType =
            Types.buildGroup(repetition)
                .addField(listGroup)
                .as(OriginalType.LIST)
                .named(fieldName);
      } else {
        PrimitiveType primitiveTyp =
            convertField(
                    fieldName,
                    basicArrayType.getComponentInfo(),
                    Type.Repetition.REQUIRED,
                    legacyMode)
                .asPrimitiveType();
        fieldType =
            Types.buildGroup(repetition)
                .repeated(primitiveTyp.getPrimitiveTypeName())
                .as(primitiveTyp.getOriginalType())
                .named(LIST_ARRAY_TYPE)
                .as(OriginalType.LIST)
                .named(fieldName);
      }
    } else if (typeInfo instanceof SqlTimeTypeInfo) {
      if (typeInfo.equals(SqlTimeTypeInfo.DATE)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.DATE)
                .named(fieldName);
      } else if (typeInfo.equals(SqlTimeTypeInfo.TIME)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.TIME_MILLIS)
                .named(fieldName);
      } else if (typeInfo.equals(SqlTimeTypeInfo.TIMESTAMP)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                .as(OriginalType.TIMESTAMP_MILLIS)
                .named(fieldName);
      } else {
        throw new UnsupportedOperationException(
            "Unsupported SqlTimeTypeInfo " + typeInfo.toString());
      }

    } else {
      // Anything else is treated as a nested row; null fieldName means the message root.
      RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
      List<Type> types = new ArrayList<>();
      String[] fieldNames = rowTypeInfo.getFieldNames();
      TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
      for (int i = 0; i < rowTypeInfo.getArity(); i++) {
        types.add(convertField(fieldNames[i], fieldTypes[i], repetition, legacyMode));
      }

      if (fieldName == null) {
        fieldType = new MessageType(MESSAGE_ROOT, types);
      } else {
        fieldType = new GroupType(repetition, fieldName, types);
      }
    }

    return fieldType;
  }

  /**
   * Converts a Flink {@code RowType} to a Parquet {@code MessageType}; every field is
   * converted as OPTIONAL.
   *
   * @param name    name of the resulting Parquet message
   * @param rowType the Flink row type to convert
   * @return the Parquet message type
   */
  public static MessageType convertToParquetMessageType(String name, RowType rowType) {
    Type[] types = new Type[rowType.getFieldCount()];
    for (int i = 0; i < rowType.getFieldCount(); i++) {
      types[i] = convertToParquetType(rowType.getFieldNames().get(i), rowType.getTypeAt(i));
    }
    return new MessageType(name, types);
  }

  private static Type convertToParquetType(String name, LogicalType type) {
    return convertToParquetType(name, type, Type.Repetition.OPTIONAL);
  }

  /**
   * Converts a single Flink logical type to a Parquet type. Timestamps of precision 3 are
   * written as INT64 (epoch millis); other precisions fall back to the legacy INT96 layout.
   */
  private static Type convertToParquetType(
      String name, LogicalType type, Type.Repetition repetition) {
    switch (type.getTypeRoot()) {
      case CHAR:
      case VARCHAR:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
            .as(OriginalType.UTF8)
            .named(name);
      case BOOLEAN:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
            .named(name);
      case BINARY:
      case VARBINARY:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
            .named(name);
      case DECIMAL:
        int precision = ((DecimalType) type).getPrecision();
        int scale = ((DecimalType) type).getScale();
        int numBytes = computeMinBytesForDecimalPrecision(precision);
        return Types.primitive(
                PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
            .precision(precision)
            .scale(scale)
            .length(numBytes)
            .as(OriginalType.DECIMAL)
            .named(name);
      case TINYINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(OriginalType.INT_8)
            .named(name);
      case SMALLINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(OriginalType.INT_16)
            .named(name);
      case INTEGER:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .named(name);
      case BIGINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
            .named(name);
      case FLOAT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition)
            .named(name);
      case DOUBLE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
            .named(name);
      case DATE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(OriginalType.DATE)
            .named(name);
      case TIME_WITHOUT_TIME_ZONE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(OriginalType.TIME_MILLIS)
            .named(name);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
        TimestampType timestampType = (TimestampType) type;
        if (timestampType.getPrecision() == 3) {
          // NOTE(review): this INT64 carries no TIMESTAMP_MILLIS annotation, so readers
          // must know from context that it holds epoch millis — confirm against the reader.
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
              .named(name);
        } else {
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
              .named(name);
        }
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
        if (localZonedTimestampType.getPrecision() == 3) {
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
              .named(name);
        } else {
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
              .named(name);
        }
      default:
        throw new UnsupportedOperationException("Unsupported type: " + type);
    }
  }

  /**
   * Returns the minimum number of bytes able to hold the two's-complement unscaled value of
   * a decimal with the given precision (smallest n with 2^(8n-1) >= 10^precision).
   */
  public static int computeMinBytesForDecimalPrecision(int precision) {
    int numBytes = 1;
    while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
      numBytes += 1;
    }
    return numBytes;
  }
}