1
0

[HUDI-3184] hudi-flink support timestamp-micros (#4548)

* support both avro and parquet code path
* string rowdata conversion is also supported
This commit is contained in:
Town
2022-01-11 20:53:51 -06:00
committed by GitHub
parent a392e9ba46
commit 4b0111974f
8 changed files with 153 additions and 51 deletions

View File

@@ -27,6 +27,8 @@ import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
/**
* Timestamp {@link org.apache.flink.formats.parquet.vector.reader.ColumnReader} that supports INT64 8 bytes,
@@ -40,12 +42,25 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
private final boolean utcTimestamp;
private final ChronoUnit chronoUnit;
public Int64TimestampColumnReader(
boolean utcTimestamp,
ColumnDescriptor descriptor,
PageReader pageReader) throws IOException {
PageReader pageReader,
int precision) throws IOException {
super(descriptor, pageReader);
this.utcTimestamp = utcTimestamp;
if (precision <= 3) {
this.chronoUnit = ChronoUnit.MILLIS;
} else if (precision <= 6) {
this.chronoUnit = ChronoUnit.MICROS;
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
}
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
@@ -59,7 +74,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
ByteBuffer buffer = readDataBuffer(8);
column.setTimestamp(rowId + i, int64ToTimestamp(utcTimestamp, buffer.getLong()));
column.setTimestamp(rowId + i, int64ToTimestamp(utcTimestamp, buffer.getLong(), chronoUnit));
} else {
column.setNullAt(rowId + i);
}
@@ -75,7 +90,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.setTimestamp(i, decodeInt64ToTimestamp(
utcTimestamp, dictionary, dictionaryIds.getInt(i)));
utcTimestamp, dictionary, dictionaryIds.getInt(i), chronoUnit));
}
}
}
@@ -83,17 +98,22 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
public static TimestampData decodeInt64ToTimestamp(
boolean utcTimestamp,
org.apache.parquet.column.Dictionary dictionary,
int id) {
int id,
ChronoUnit unit) {
long value = dictionary.decodeToLong(id);
return int64ToTimestamp(utcTimestamp, value);
return int64ToTimestamp(utcTimestamp, value, unit);
}
private static TimestampData int64ToTimestamp(boolean utcTimestamp, long millionsOfDay) {
private static TimestampData int64ToTimestamp(
boolean utcTimestamp,
long interval,
ChronoUnit unit) {
final Instant instant = Instant.EPOCH.plus(interval, unit);
if (utcTimestamp) {
return TimestampData.fromEpochMillis(millionsOfDay, 0);
return TimestampData.fromInstant(instant);
} else {
Timestamp timestamp = new Timestamp(millionsOfDay);
return TimestampData.fromTimestamp(timestamp);
// this applies the local timezone
return TimestampData.fromTimestamp(Timestamp.from(instant));
}
}
}

View File

@@ -58,6 +58,7 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
@@ -329,7 +330,7 @@ public class ParquetSplitReaderUtil {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader);
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType)fieldType).getPrecision());
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:

View File

@@ -217,12 +217,13 @@ public class AvroSchemaConverter {
org.apache.avro.LogicalType avroLogicalType;
if (precision <= 3) {
avroLogicalType = LogicalTypes.timestampMillis();
} else if (precision <= 6) {
avroLogicalType = LogicalTypes.timestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type "
+ "with precision: "
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 3.");
+ ", it only supports precision less than 6.");
}
Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;

View File

@@ -34,6 +34,7 @@ import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeFieldType;
@@ -45,6 +46,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -125,7 +127,7 @@ public class AvroToRowDataConverters {
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTimestamp;
return createTimestampConverter(((TimestampType) type).getPrecision());
case CHAR:
case VARCHAR:
return avroObject -> StringData.fromString(avroObject.toString());
@@ -200,22 +202,36 @@ public class AvroToRowDataConverters {
};
}
private static TimestampData convertToTimestamp(Object object) {
final long millis;
if (object instanceof Long) {
millis = (Long) object;
} else if (object instanceof Instant) {
millis = ((Instant) object).toEpochMilli();
private static AvroToRowDataConverter createTimestampConverter(int precision) {
final ChronoUnit chronoUnit;
if (precision <= 3) {
chronoUnit = ChronoUnit.MILLIS;
} else if (precision <= 6) {
chronoUnit = ChronoUnit.MICROS;
} else {
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
millis = jodaConverter.convertTimestamp(object);
} else {
throw new IllegalArgumentException(
"Unexpected object type for TIMESTAMP logical type. Received: " + object);
}
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
}
return TimestampData.fromEpochMillis(millis);
return avroObject -> {
final Instant instant;
if (avroObject instanceof Long) {
instant = Instant.EPOCH.plus((Long) avroObject, chronoUnit);
} else if (avroObject instanceof Instant) {
instant = (Instant) avroObject;
} else {
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
// joda time has only millisecond precision
instant = Instant.ofEpochMilli(jodaConverter.convertTimestamp(avroObject));
} else {
throw new IllegalArgumentException(
"Unexpected object type for TIMESTAMP logical type. Received: " + avroObject);
}
}
return TimestampData.fromInstant(instant);
};
}
private static int convertToDate(Object object) {
@@ -272,10 +288,9 @@ public class AvroToRowDataConverters {
static class JodaConverter {
private static JodaConverter instance;
private static boolean instantiated = false;
public static JodaConverter getConverter() {
if (instantiated) {
if (instance != null) {
return instance;
}
@@ -287,8 +302,6 @@ public class AvroToRowDataConverters {
instance = new JodaConverter();
} catch (ClassNotFoundException e) {
instance = null;
} finally {
instantiated = true;
}
return instance;
}

View File

@@ -31,9 +31,12 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -151,15 +154,30 @@ public class RowDataToAvroConverters {
};
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
final int precision = ((TimestampType) type).getPrecision();
if (precision <= 3) {
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
}
};
@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
}
};
} else if (precision <= 6) {
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ChronoUnit.MICROS.between(Instant.EPOCH, ((TimestampData) object).toInstant());
}
};
} else {
throw new UnsupportedOperationException("Unsupported timestamp precision: " + precision);
}
break;
case DECIMAL:
converter =

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.util;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.flink.annotation.Internal;
@@ -29,7 +30,9 @@ import org.apache.flink.table.types.logical.LogicalType;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
/**
@@ -85,7 +88,14 @@ public class StringToRowDataConverter {
// see HoodieAvroUtils#convertValueForAvroLogicalTypes
return field -> (int) LocalDate.parse(field).toEpochDay();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return field -> TimestampData.fromEpochMillis(Long.parseLong(field));
final int precision = ((TimestampType) logicalType).getPrecision();
if (precision <= 3) {
return field -> TimestampData.fromInstant(Instant.EPOCH.plus(Long.parseLong(field), ChronoUnit.MILLIS));
} else if (precision <= 6) {
return field -> TimestampData.fromInstant(Instant.EPOCH.plus(Long.parseLong(field), ChronoUnit.MICROS));
} else {
throw new UnsupportedOperationException("Unsupported type: " + logicalType);
}
case CHAR:
case VARCHAR:
return StringData::fromString;
@@ -100,8 +110,7 @@ public class StringToRowDataConverter {
decimalType.getPrecision(),
decimalType.getScale());
default:
throw new UnsupportedOperationException(
"Unsupported type " + logicalType.getTypeRoot() + " for " + StringToRowDataConverter.class.getName());
throw new UnsupportedOperationException("Unsupported type: " + logicalType);
}
}
}

View File

@@ -545,6 +545,42 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
assertRowsEquals(result2, expected);
}
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndReadWithTimestampMicros(ExecMode execMode) throws Exception {
boolean streaming = execMode == ExecMode.STREAM;
String hoodieTableDDL = sql("t1")
.field("id int")
.field("name varchar(10)")
.field("ts timestamp(6)")
.pkField("id")
.noPartition()
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, streaming)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 values\n"
+ "(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001'),\n"
+ "(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002'),\n"
+ "(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003'),\n"
+ "(4,'Fabian',TIMESTAMP '2021-12-04 15:16:04.400004')";
execInsertSql(streamTableEnv, insertInto);
final String expected = "["
+ "+I[1, Danny, 2021-12-01T01:02:01.100001], "
+ "+I[2, Stephen, 2021-12-02T03:04:02.200002], "
+ "+I[3, Julian, 2021-12-03T13:14:03.300003], "
+ "+I[4, Fabian, 2021-12-04T15:16:04.400004]]";
List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
assertRowsEquals(result, expected);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
assertRowsEquals(result2, expected);
}
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testInsertOverwrite(ExecMode execMode) {

View File

@@ -49,13 +49,14 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
public class TestStringToRowDataConverter {
@Test
void testConvert() {
String[] fields = new String[] {"1.1", "3.4", "2021-03-30", "56669000", "1617119069000", "12345.67"};
String[] fields = new String[] {"1.1", "3.4", "2021-03-30", "56669000", "1617119069000", "1617119069666111", "12345.67"};
LogicalType[] fieldTypes = new LogicalType[] {
DataTypes.FLOAT().getLogicalType(),
DataTypes.DOUBLE().getLogicalType(),
DataTypes.DATE().getLogicalType(),
DataTypes.TIME(3).getLogicalType(),
DataTypes.TIMESTAMP().getLogicalType(),
DataTypes.TIMESTAMP(3).getLogicalType(),
DataTypes.TIMESTAMP(6).getLogicalType(),
DataTypes.DECIMAL(7, 2).getLogicalType()
};
StringToRowDataConverter converter = new StringToRowDataConverter(fieldTypes);
@@ -63,7 +64,8 @@ public class TestStringToRowDataConverter {
Object[] expected = new Object[] {
1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(),
LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY),
TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()),
TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29Z")),
TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29.666111Z")),
DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)
};
assertArrayEquals(expected, converted);
@@ -71,13 +73,14 @@ public class TestStringToRowDataConverter {
@Test
void testRowDataToAvroStringToRowData() {
GenericRowData rowData = new GenericRowData(6);
GenericRowData rowData = new GenericRowData(7);
rowData.setField(0, 1.1f);
rowData.setField(1, 3.4D);
rowData.setField(2, (int) LocalDate.parse("2021-03-30").toEpochDay());
rowData.setField(3, LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY));
rowData.setField(4, TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()));
rowData.setField(5, DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2));
rowData.setField(4, TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29Z")));
rowData.setField(5, TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29.666111Z")));
rowData.setField(6, DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2));
DataType dataType = DataTypes.ROW(
DataTypes.FIELD("f_float", DataTypes.FLOAT()),
@@ -85,6 +88,7 @@ public class TestStringToRowDataConverter {
DataTypes.FIELD("f_date", DataTypes.DATE()),
DataTypes.FIELD("f_time", DataTypes.TIME(3)),
DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("f_timestamp_micros", DataTypes.TIMESTAMP(6)),
DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(7, 2))
);
RowType rowType = (RowType) dataType.getLogicalType();
@@ -98,8 +102,8 @@ public class TestStringToRowDataConverter {
final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey);
Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys);
GenericRowData converted = new GenericRowData(6);
for (int i = 0; i < 6; i++) {
GenericRowData converted = new GenericRowData(7);
for (int i = 0; i < 7; i++) {
converted.setField(i, convertedKeys[i]);
}
assertThat(converted, is(rowData));