diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java index f71f6e599..024567c83 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/Int64TimestampColumnReader.java @@ -27,6 +27,8 @@ import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; import java.nio.ByteBuffer; import java.sql.Timestamp; +import java.time.Instant; +import java.time.temporal.ChronoUnit; /** * Timestamp {@link org.apache.flink.formats.parquet.vector.reader.ColumnReader} that supports INT64 8 bytes, @@ -40,12 +42,25 @@ public class Int64TimestampColumnReader extends AbstractColumnReader StringData.fromString(avroObject.toString()); @@ -200,22 +202,36 @@ public class AvroToRowDataConverters { }; } - private static TimestampData convertToTimestamp(Object object) { - final long millis; - if (object instanceof Long) { - millis = (Long) object; - } else if (object instanceof Instant) { - millis = ((Instant) object).toEpochMilli(); + private static AvroToRowDataConverter createTimestampConverter(int precision) { + final ChronoUnit chronoUnit; + if (precision <= 3) { + chronoUnit = ChronoUnit.MILLIS; + } else if (precision <= 6) { + chronoUnit = ChronoUnit.MICROS; } else { - JodaConverter jodaConverter = JodaConverter.getConverter(); - if (jodaConverter != null) { - millis = jodaConverter.convertTimestamp(object); - } else { - throw new IllegalArgumentException( - "Unexpected object type for TIMESTAMP logical type. Received: " + object); - } + throw new IllegalArgumentException( + "Avro does not support TIMESTAMP type with precision: " + + precision + + ", it only supports precision less than 6."); } - return TimestampData.fromEpochMillis(millis); + return avroObject -> { + final Instant instant; + if (avroObject instanceof Long) { + instant = Instant.EPOCH.plus((Long) avroObject, chronoUnit); + } else if (avroObject instanceof Instant) { + instant = (Instant) avroObject; + } else { + JodaConverter jodaConverter = JodaConverter.getConverter(); + if (jodaConverter != null) { + // joda time has only millisecond precision + instant = Instant.ofEpochMilli(jodaConverter.convertTimestamp(avroObject)); + } else { + throw new IllegalArgumentException( + "Unexpected object type for TIMESTAMP logical type. Received: " + avroObject); + } + } + return TimestampData.fromInstant(instant); + }; } private static int convertToDate(Object object) { @@ -272,10 +288,9 @@ public class AvroToRowDataConverters { static class JodaConverter { private static JodaConverter instance; - private static boolean instantiated = false; public static JodaConverter getConverter() { - if (instantiated) { + if (instance != null) { return instance; } @@ -287,8 +302,6 @@ public class AvroToRowDataConverters { instance = new JodaConverter(); } catch (ClassNotFoundException e) { instance = null; - } finally { - instantiated = true; } return instance; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java index 26e21770b..d90670ff4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java @@ -31,9 +31,12 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; import java.io.Serializable; import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -151,15 +154,30 @@ public class RowDataToAvroConverters { }; break; case TIMESTAMP_WITHOUT_TIME_ZONE: - converter = - new RowDataToAvroConverter() { - private static final long serialVersionUID = 1L; + final int precision = ((TimestampType) type).getPrecision(); + if (precision <= 3) { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; - @Override - public Object convert(Schema schema, Object object) { - return ((TimestampData) object).toInstant().toEpochMilli(); - } - }; + @Override + public Object convert(Schema schema, Object object) { + return ((TimestampData) object).toInstant().toEpochMilli(); + } + }; + } else if (precision <= 6) { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ChronoUnit.MICROS.between(Instant.EPOCH, ((TimestampData) object).toInstant()); + } + }; + } else { + throw new UnsupportedOperationException("Unsupported timestamp precision: " + precision); + } break; case DECIMAL: converter = diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java index b30d6d6e7..47aed1ed3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java @@ -18,6 +18,7 @@ package org.apache.hudi.util; +import org.apache.flink.table.types.logical.TimestampType; import org.apache.hudi.common.util.ValidationUtils; import org.apache.flink.annotation.Internal; @@ -29,7 +30,9 @@ import org.apache.flink.table.types.logical.LogicalType; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.time.LocalDate; +import java.time.temporal.ChronoUnit; import java.util.Arrays; /** @@ -85,7 +88,14 @@ public class StringToRowDataConverter { // see HoodieAvroUtils#convertValueForAvroLogicalTypes return field -> (int) LocalDate.parse(field).toEpochDay(); case TIMESTAMP_WITHOUT_TIME_ZONE: - return field -> TimestampData.fromEpochMillis(Long.parseLong(field)); + final int precision = ((TimestampType) logicalType).getPrecision(); + if (precision <= 3) { + return field -> TimestampData.fromInstant(Instant.EPOCH.plus(Long.parseLong(field), ChronoUnit.MILLIS)); + } else if (precision <= 6) { + return field -> TimestampData.fromInstant(Instant.EPOCH.plus(Long.parseLong(field), ChronoUnit.MICROS)); + } else { + throw new UnsupportedOperationException("Unsupported type: " + logicalType); + } case CHAR: case VARCHAR: return StringData::fromString; @@ -100,8 +110,7 @@ public class StringToRowDataConverter { decimalType.getPrecision(), decimalType.getScale()); default: - throw new UnsupportedOperationException( - "Unsupported type " + logicalType.getTypeRoot() + " for " + StringToRowDataConverter.class.getName()); + throw new UnsupportedOperationException("Unsupported type: " + logicalType); } } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 9e0da3adf..f1ca68e63 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -545,6 +545,42 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result2, expected); } + @ParameterizedTest + @EnumSource(value = ExecMode.class) + void testWriteAndReadWithTimestampMicros(ExecMode execMode) throws Exception { + boolean streaming = execMode == ExecMode.STREAM; + String hoodieTableDDL = sql("t1") + .field("id int") + .field("name varchar(10)") + .field("ts timestamp(6)") + .pkField("id") + .noPartition() + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.READ_AS_STREAMING, streaming) + .end(); + streamTableEnv.executeSql(hoodieTableDDL); + String insertInto = "insert into t1 values\n" + + "(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001'),\n" + + "(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002'),\n" + + "(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003'),\n" + + "(4,'Fabian',TIMESTAMP '2021-12-04 15:16:04.400004')"; + execInsertSql(streamTableEnv, insertInto); + + final String expected = "[" + + "+I[1, Danny, 2021-12-01T01:02:01.100001], " + + "+I[2, Stephen, 2021-12-02T03:04:02.200002], " + + "+I[3, Julian, 2021-12-03T13:14:03.300003], " + + "+I[4, Fabian, 2021-12-04T15:16:04.400004]]"; + + List result = execSelectSql(streamTableEnv, "select * from t1", execMode); + assertRowsEquals(result, expected); + + // insert another batch of data + execInsertSql(streamTableEnv, insertInto); + List result2 = execSelectSql(streamTableEnv, "select * from t1", execMode); + assertRowsEquals(result2, expected); + } + @ParameterizedTest @EnumSource(value = ExecMode.class) void testInsertOverwrite(ExecMode execMode) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java index 8f36fd18b..8f7ecad13 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java @@ -49,13 +49,14 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; public class TestStringToRowDataConverter { @Test void testConvert() { - String[] fields = new String[] {"1.1", "3.4", "2021-03-30", "56669000", "1617119069000", "12345.67"}; + String[] fields = new String[] {"1.1", "3.4", "2021-03-30", "56669000", "1617119069000", "1617119069666111", "12345.67"}; LogicalType[] fieldTypes = new LogicalType[] { DataTypes.FLOAT().getLogicalType(), DataTypes.DOUBLE().getLogicalType(), DataTypes.DATE().getLogicalType(), DataTypes.TIME(3).getLogicalType(), - DataTypes.TIMESTAMP().getLogicalType(), + DataTypes.TIMESTAMP(3).getLogicalType(), + DataTypes.TIMESTAMP(6).getLogicalType(), DataTypes.DECIMAL(7, 2).getLogicalType() }; StringToRowDataConverter converter = new StringToRowDataConverter(fieldTypes); @@ -63,7 +64,8 @@ public class TestStringToRowDataConverter { Object[] expected = new Object[] { 1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(), LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY), - TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()), + TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29Z")), + TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29.666111Z")), DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2) }; assertArrayEquals(expected, converted); @@ -71,13 +73,14 @@ public class TestStringToRowDataConverter { @Test void testRowDataToAvroStringToRowData() { - GenericRowData rowData = new GenericRowData(6); + GenericRowData rowData = new GenericRowData(7); rowData.setField(0, 1.1f); rowData.setField(1, 3.4D); rowData.setField(2, (int) LocalDate.parse("2021-03-30").toEpochDay()); rowData.setField(3, LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY)); - rowData.setField(4, TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli())); - rowData.setField(5, DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)); + rowData.setField(4, TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29Z"))); + rowData.setField(5, TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29.666111Z"))); + rowData.setField(6, DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)); DataType dataType = DataTypes.ROW( DataTypes.FIELD("f_float", DataTypes.FLOAT()), @@ -85,6 +88,7 @@ public class TestStringToRowDataConverter { DataTypes.FIELD("f_date", DataTypes.DATE()), DataTypes.FIELD("f_time", DataTypes.TIME(3)), DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("f_timestamp_micros", DataTypes.TIMESTAMP(6)), DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(7, 2)) ); RowType rowType = (RowType) dataType.getLogicalType(); @@ -98,8 +102,8 @@ public class TestStringToRowDataConverter { final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey); Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys); - GenericRowData converted = new GenericRowData(6); - for (int i = 0; i < 6; i++) { + GenericRowData converted = new GenericRowData(7); + for (int i = 0; i < 7; i++) { converted.setField(i, convertedKeys[i]); } assertThat(converted, is(rowData));