From 36790709f76f71e72026f3f54218ea8b8e247ed2 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Sun, 9 Jan 2022 15:43:25 +0800 Subject: [PATCH] [HUDI-3125] spark-sql write timestamp directly (#4471) --- .../hudi/keygen/RowKeyGeneratorHelper.java | 19 +++- .../apache/hudi/AvroConversionHelper.scala | 15 ++- .../hudi/keygen/TestRowGeneratorHelper.scala | 102 ++++++++++++++++++ .../spark/sql/hudi/TestCreateTable.scala | 27 +++++ .../spark/sql/hudi/TestInsertTable.scala | 32 ++++++ 5 files changed, 188 insertions(+), 7 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java index 5c6a0e490..24f6e7a4f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java @@ -27,6 +27,8 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.sql.Timestamp; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -106,7 +108,8 @@ public class RowKeyGeneratorHelper { if (fieldPos == -1 || row.isNullAt(fieldPos)) { val = HUDI_DEFAULT_PARTITION_PATH; } else { - val = row.getAs(field).toString(); + Object data = row.get(fieldPos); + val = convertToTimestampIfInstant(data).toString(); if (val.isEmpty()) { val = HUDI_DEFAULT_PARTITION_PATH; } @@ -115,11 +118,12 @@ public class RowKeyGeneratorHelper { val = field + "=" + val; } } else { // nested - Object nestedVal = getNestedFieldVal(row, partitionPathPositions.get(field)); - if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) { + Object data = getNestedFieldVal(row, partitionPathPositions.get(field)); + data = convertToTimestampIfInstant(data); + if (data.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || data.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) { val = hiveStylePartitioning ? field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH; } else { - val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : nestedVal.toString(); + val = hiveStylePartitioning ? field + "=" + data.toString() : data.toString(); } } return val; @@ -266,4 +270,11 @@ public class RowKeyGeneratorHelper { } return positions; } + + private static Object convertToTimestampIfInstant(Object data) { + if (data instanceof Instant) { + return Timestamp.from((Instant) data); + } + return data; + } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 2900f08cc..f968cbe1c 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -20,6 +20,7 @@ package org.apache.hudi import java.nio.ByteBuffer import java.sql.{Date, Timestamp} +import java.time.Instant import org.apache.avro.Conversions.DecimalConversion import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} @@ -301,9 +302,17 @@ object AvroConversionHelper { }.orNull } case TimestampType => (item: Any) => - // Convert time to microseconds since spark-avro by default converts TimestampType to - // Avro Logical TimestampMicros - Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull + if (item == null) { + null + } else { + val timestamp = item match { + case i: Instant => Timestamp.from(i) + case t: Timestamp => t + } + // Convert time to microseconds since spark-avro by default converts TimestampType to + // Avro Logical TimestampMicros + timestamp.getTime * 1000 + } case DateType => (item: Any) => Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull case ArrayType(elementType, _) => diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala new file mode 100644 index 000000000..d4b89e1c9 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen + +import java.sql.Timestamp + +import org.apache.spark.sql.Row + +import org.apache.hudi.keygen.RowKeyGeneratorHelper._ + +import org.junit.jupiter.api.{Assertions, Test} + +import scala.collection.JavaConverters._ + +class TestRowGeneratorHelper { + + @Test + def testGetPartitionPathFromRow(): Unit = { + + /** single plain partition */ + val row1 = Row.fromSeq(Seq(1, "z3", 10.0, "20220108")) + val ptField1 = List("dt").asJava + val ptPos1 = Map("dt" -> List(new Integer(3)).asJava).asJava + Assertions.assertEquals("20220108", + getPartitionPathFromRow(row1, ptField1, false, ptPos1)) + Assertions.assertEquals("dt=20220108", + getPartitionPathFromRow(row1, ptField1, true, ptPos1)) + + /** multiple plain partitions */ + val row2 = Row.fromSeq(Seq(1, "z3", 10.0, "2022", "01", "08")) + val ptField2 = List("year", "month", "day").asJava + val ptPos2 = Map("year" -> List(new Integer(3)).asJava, + "month" -> List(new Integer(4)).asJava, + "day" -> List(new Integer(5)).asJava + ).asJava + Assertions.assertEquals("2022/01/08", + getPartitionPathFromRow(row2, ptField2, false, ptPos2)) + Assertions.assertEquals("year=2022/month=01/day=08", + getPartitionPathFromRow(row2, ptField2, true, ptPos2)) + + /** multiple partitions which contains TimeStamp type or Instant type */ + val timestamp = Timestamp.valueOf("2020-01-08 10:00:00") + val instant = timestamp.toInstant + val ptField3 = List("event", "event_time").asJava + val ptPos3 = Map("event" -> List(new Integer(3)).asJava, + "event_time" -> List(new Integer(4)).asJava + ).asJava + + // with timeStamp type + val row2_ts = Row.fromSeq(Seq(1, "z3", 10.0, "click", timestamp)) + Assertions.assertEquals("click/2020-01-08 10:00:00.0", + getPartitionPathFromRow(row2_ts, ptField3, false, ptPos3)) + Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0", + getPartitionPathFromRow(row2_ts, ptField3, true, ptPos3)) + + // with instant type + val row2_instant = Row.fromSeq(Seq(1, "z3", 10.0, "click", instant)) + Assertions.assertEquals("click/2020-01-08 10:00:00.0", + getPartitionPathFromRow(row2_instant, ptField3, false, ptPos3)) + Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0", + getPartitionPathFromRow(row2_instant, ptField3, true, ptPos3)) + + /** mixed case with plain and nested partitions */ + val nestedRow4 = Row.fromSeq(Seq(instant, "ad")) + val ptField4 = List("event_time").asJava + val ptPos4 = Map("event_time" -> List(new Integer(3), new Integer(0)).asJava).asJava + // with instant type + val row4 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow4, "click")) + Assertions.assertEquals("2020-01-08 10:00:00.0", + getPartitionPathFromRow(row4, ptField4, false, ptPos4)) + Assertions.assertEquals("event_time=2020-01-08 10:00:00.0", + getPartitionPathFromRow(row4, ptField4, true, ptPos4)) + + val nestedRow5 = Row.fromSeq(Seq(timestamp, "ad")) + val ptField5 = List("event", "event_time").asJava + val ptPos5 = Map( + "event_time" -> List(new Integer(3), new Integer(0)).asJava, + "event" -> List(new Integer(4)).asJava + ).asJava + val row5 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow5, "click")) + Assertions.assertEquals("click/2020-01-08 10:00:00.0", + getPartitionPathFromRow(row5, ptField5, false, ptPos5)) + Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0", + getPartitionPathFromRow(row5, ptField5, true, ptPos5)) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala index 14bc70428..7f51c59b0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala @@ -330,6 +330,33 @@ class TestCreateTable extends TestHoodieSqlBase { } } + test("Test Create Table As Select when 'spark.sql.datetime.java8API.enabled' enables") { + try { + // enable spark.sql.datetime.java8API.enabled + // and use java.time.Instant to replace java.sql.Timestamp to represent TimestampType. + spark.conf.set("spark.sql.datetime.java8API.enabled", value = true) + + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName + |using hudi + |partitioned by(dt) + |options(type = 'cow', primaryKey = 'id') + |as + |select 1 as id, 'a1' as name, 10 as price, cast('2021-05-07 00:00:00' as timestamp) as dt + |""".stripMargin + ) + + checkAnswer(s"select id, name, price, cast(dt as string) from $tableName")( + Seq(1, "a1", 10, "2021-05-07 00:00:00") + ) + + } finally { + spark.conf.set("spark.sql.datetime.java8API.enabled", value = false) + } + } + test("Test Create Table From Exist Hoodie Table") { withTempDir { tmp => Seq("2021-08-02", "2021/08/02").foreach { partitionValue => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index c34ed7910..4d12d987f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -370,6 +370,38 @@ class TestInsertTable extends TestHoodieSqlBase { spark.sql("set hoodie.sql.insert.mode = upsert") } + + test("Test Insert timestamp when 'spark.sql.datetime.java8API.enabled' enables") { + try { + // enable spark.sql.datetime.java8API.enabled + // and use java.time.Instant to replace java.sql.Timestamp to represent TimestampType. + spark.conf.set("spark.sql.datetime.java8API.enabled", value = true) + + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | dt timestamp + |) + |using hudi + |partitioned by(dt) + |options(type = 'cow', primaryKey = 'id') + |""".stripMargin + ) + + spark.sql(s"insert into $tableName values (1, 'a1', 10, cast('2021-05-07 00:00:00' as timestamp))") + checkAnswer(s"select id, name, price, cast(dt as string) from $tableName")( + Seq(1, "a1", 10, "2021-05-07 00:00:00") + ) + + } finally { + spark.conf.set("spark.sql.datetime.java8API.enabled", value = false) + } + } + test("Test bulk insert") { spark.sql("set hoodie.sql.insert.mode = non-strict") withTempDir { tmp =>