[HUDI-3125] spark-sql write timestamp directly (#4471)
This commit is contained in:
@@ -27,6 +27,8 @@ import org.apache.spark.sql.types.DataTypes;
|
|||||||
import org.apache.spark.sql.types.StructField;
|
import org.apache.spark.sql.types.StructField;
|
||||||
import org.apache.spark.sql.types.StructType;
|
import org.apache.spark.sql.types.StructType;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -106,7 +108,8 @@ public class RowKeyGeneratorHelper {
|
|||||||
if (fieldPos == -1 || row.isNullAt(fieldPos)) {
|
if (fieldPos == -1 || row.isNullAt(fieldPos)) {
|
||||||
val = HUDI_DEFAULT_PARTITION_PATH;
|
val = HUDI_DEFAULT_PARTITION_PATH;
|
||||||
} else {
|
} else {
|
||||||
val = row.getAs(field).toString();
|
Object data = row.get(fieldPos);
|
||||||
|
val = convertToTimestampIfInstant(data).toString();
|
||||||
if (val.isEmpty()) {
|
if (val.isEmpty()) {
|
||||||
val = HUDI_DEFAULT_PARTITION_PATH;
|
val = HUDI_DEFAULT_PARTITION_PATH;
|
||||||
}
|
}
|
||||||
@@ -115,11 +118,12 @@ public class RowKeyGeneratorHelper {
|
|||||||
val = field + "=" + val;
|
val = field + "=" + val;
|
||||||
}
|
}
|
||||||
} else { // nested
|
} else { // nested
|
||||||
Object nestedVal = getNestedFieldVal(row, partitionPathPositions.get(field));
|
Object data = getNestedFieldVal(row, partitionPathPositions.get(field));
|
||||||
if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
|
data = convertToTimestampIfInstant(data);
|
||||||
|
if (data.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || data.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
|
||||||
val = hiveStylePartitioning ? field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH;
|
val = hiveStylePartitioning ? field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH;
|
||||||
} else {
|
} else {
|
||||||
val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : nestedVal.toString();
|
val = hiveStylePartitioning ? field + "=" + data.toString() : data.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return val;
|
return val;
|
||||||
@@ -266,4 +270,11 @@ public class RowKeyGeneratorHelper {
|
|||||||
}
|
}
|
||||||
return positions;
|
return positions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Object convertToTimestampIfInstant(Object data) {
|
||||||
|
if (data instanceof Instant) {
|
||||||
|
return Timestamp.from((Instant) data);
|
||||||
|
}
|
||||||
|
return data;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ package org.apache.hudi
|
|||||||
|
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.sql.{Date, Timestamp}
|
import java.sql.{Date, Timestamp}
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
import org.apache.avro.Conversions.DecimalConversion
|
import org.apache.avro.Conversions.DecimalConversion
|
||||||
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
|
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
|
||||||
@@ -301,9 +302,17 @@ object AvroConversionHelper {
|
|||||||
}.orNull
|
}.orNull
|
||||||
}
|
}
|
||||||
case TimestampType => (item: Any) =>
|
case TimestampType => (item: Any) =>
|
||||||
|
if (item == null) {
|
||||||
|
null
|
||||||
|
} else {
|
||||||
|
val timestamp = item match {
|
||||||
|
case i: Instant => Timestamp.from(i)
|
||||||
|
case t: Timestamp => t
|
||||||
|
}
|
||||||
// Convert time to microseconds since spark-avro by default converts TimestampType to
|
// Convert time to microseconds since spark-avro by default converts TimestampType to
|
||||||
// Avro Logical TimestampMicros
|
// Avro Logical TimestampMicros
|
||||||
Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull
|
timestamp.getTime * 1000
|
||||||
|
}
|
||||||
case DateType => (item: Any) =>
|
case DateType => (item: Any) =>
|
||||||
Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
|
Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
|
||||||
case ArrayType(elementType, _) =>
|
case ArrayType(elementType, _) =>
|
||||||
|
|||||||
@@ -0,0 +1,102 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.keygen
|
||||||
|
|
||||||
|
import java.sql.Timestamp
|
||||||
|
|
||||||
|
import org.apache.spark.sql.Row
|
||||||
|
|
||||||
|
import org.apache.hudi.keygen.RowKeyGeneratorHelper._
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.{Assertions, Test}
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
class TestRowGeneratorHelper {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testGetPartitionPathFromRow(): Unit = {
|
||||||
|
|
||||||
|
/** single plain partition */
|
||||||
|
val row1 = Row.fromSeq(Seq(1, "z3", 10.0, "20220108"))
|
||||||
|
val ptField1 = List("dt").asJava
|
||||||
|
val ptPos1 = Map("dt" -> List(new Integer(3)).asJava).asJava
|
||||||
|
Assertions.assertEquals("20220108",
|
||||||
|
getPartitionPathFromRow(row1, ptField1, false, ptPos1))
|
||||||
|
Assertions.assertEquals("dt=20220108",
|
||||||
|
getPartitionPathFromRow(row1, ptField1, true, ptPos1))
|
||||||
|
|
||||||
|
/** multiple plain partitions */
|
||||||
|
val row2 = Row.fromSeq(Seq(1, "z3", 10.0, "2022", "01", "08"))
|
||||||
|
val ptField2 = List("year", "month", "day").asJava
|
||||||
|
val ptPos2 = Map("year" -> List(new Integer(3)).asJava,
|
||||||
|
"month" -> List(new Integer(4)).asJava,
|
||||||
|
"day" -> List(new Integer(5)).asJava
|
||||||
|
).asJava
|
||||||
|
Assertions.assertEquals("2022/01/08",
|
||||||
|
getPartitionPathFromRow(row2, ptField2, false, ptPos2))
|
||||||
|
Assertions.assertEquals("year=2022/month=01/day=08",
|
||||||
|
getPartitionPathFromRow(row2, ptField2, true, ptPos2))
|
||||||
|
|
||||||
|
/** multiple partitions which contains TimeStamp type or Instant type */
|
||||||
|
val timestamp = Timestamp.valueOf("2020-01-08 10:00:00")
|
||||||
|
val instant = timestamp.toInstant
|
||||||
|
val ptField3 = List("event", "event_time").asJava
|
||||||
|
val ptPos3 = Map("event" -> List(new Integer(3)).asJava,
|
||||||
|
"event_time" -> List(new Integer(4)).asJava
|
||||||
|
).asJava
|
||||||
|
|
||||||
|
// with timeStamp type
|
||||||
|
val row2_ts = Row.fromSeq(Seq(1, "z3", 10.0, "click", timestamp))
|
||||||
|
Assertions.assertEquals("click/2020-01-08 10:00:00.0",
|
||||||
|
getPartitionPathFromRow(row2_ts, ptField3, false, ptPos3))
|
||||||
|
Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
|
||||||
|
getPartitionPathFromRow(row2_ts, ptField3, true, ptPos3))
|
||||||
|
|
||||||
|
// with instant type
|
||||||
|
val row2_instant = Row.fromSeq(Seq(1, "z3", 10.0, "click", instant))
|
||||||
|
Assertions.assertEquals("click/2020-01-08 10:00:00.0",
|
||||||
|
getPartitionPathFromRow(row2_instant, ptField3, false, ptPos3))
|
||||||
|
Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
|
||||||
|
getPartitionPathFromRow(row2_instant, ptField3, true, ptPos3))
|
||||||
|
|
||||||
|
/** mixed case with plain and nested partitions */
|
||||||
|
val nestedRow4 = Row.fromSeq(Seq(instant, "ad"))
|
||||||
|
val ptField4 = List("event_time").asJava
|
||||||
|
val ptPos4 = Map("event_time" -> List(new Integer(3), new Integer(0)).asJava).asJava
|
||||||
|
// with instant type
|
||||||
|
val row4 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow4, "click"))
|
||||||
|
Assertions.assertEquals("2020-01-08 10:00:00.0",
|
||||||
|
getPartitionPathFromRow(row4, ptField4, false, ptPos4))
|
||||||
|
Assertions.assertEquals("event_time=2020-01-08 10:00:00.0",
|
||||||
|
getPartitionPathFromRow(row4, ptField4, true, ptPos4))
|
||||||
|
|
||||||
|
val nestedRow5 = Row.fromSeq(Seq(timestamp, "ad"))
|
||||||
|
val ptField5 = List("event", "event_time").asJava
|
||||||
|
val ptPos5 = Map(
|
||||||
|
"event_time" -> List(new Integer(3), new Integer(0)).asJava,
|
||||||
|
"event" -> List(new Integer(4)).asJava
|
||||||
|
).asJava
|
||||||
|
val row5 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow5, "click"))
|
||||||
|
Assertions.assertEquals("click/2020-01-08 10:00:00.0",
|
||||||
|
getPartitionPathFromRow(row5, ptField5, false, ptPos5))
|
||||||
|
Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
|
||||||
|
getPartitionPathFromRow(row5, ptField5, true, ptPos5))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -330,6 +330,33 @@ class TestCreateTable extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Test Create Table As Select when 'spark.sql.datetime.java8API.enabled' enables") {
|
||||||
|
try {
|
||||||
|
// enable spark.sql.datetime.java8API.enabled
|
||||||
|
// and use java.time.Instant to replace java.sql.Timestamp to represent TimestampType.
|
||||||
|
spark.conf.set("spark.sql.datetime.java8API.enabled", value = true)
|
||||||
|
|
||||||
|
val tableName = generateTableName
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName
|
||||||
|
|using hudi
|
||||||
|
|partitioned by(dt)
|
||||||
|
|options(type = 'cow', primaryKey = 'id')
|
||||||
|
|as
|
||||||
|
|select 1 as id, 'a1' as name, 10 as price, cast('2021-05-07 00:00:00' as timestamp) as dt
|
||||||
|
|""".stripMargin
|
||||||
|
)
|
||||||
|
|
||||||
|
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName")(
|
||||||
|
Seq(1, "a1", 10, "2021-05-07 00:00:00")
|
||||||
|
)
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
spark.conf.set("spark.sql.datetime.java8API.enabled", value = false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
test("Test Create Table From Exist Hoodie Table") {
|
test("Test Create Table From Exist Hoodie Table") {
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
|
Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
|
||||||
|
|||||||
@@ -370,6 +370,38 @@ class TestInsertTable extends TestHoodieSqlBase {
|
|||||||
spark.sql("set hoodie.sql.insert.mode = upsert")
|
spark.sql("set hoodie.sql.insert.mode = upsert")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
test("Test Insert timestamp when 'spark.sql.datetime.java8API.enabled' enables") {
|
||||||
|
try {
|
||||||
|
// enable spark.sql.datetime.java8API.enabled
|
||||||
|
// and use java.time.Instant to replace java.sql.Timestamp to represent TimestampType.
|
||||||
|
spark.conf.set("spark.sql.datetime.java8API.enabled", value = true)
|
||||||
|
|
||||||
|
val tableName = generateTableName
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName (
|
||||||
|
| id int,
|
||||||
|
| name string,
|
||||||
|
| price double,
|
||||||
|
| dt timestamp
|
||||||
|
|)
|
||||||
|
|using hudi
|
||||||
|
|partitioned by(dt)
|
||||||
|
|options(type = 'cow', primaryKey = 'id')
|
||||||
|
|""".stripMargin
|
||||||
|
)
|
||||||
|
|
||||||
|
spark.sql(s"insert into $tableName values (1, 'a1', 10, cast('2021-05-07 00:00:00' as timestamp))")
|
||||||
|
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName")(
|
||||||
|
Seq(1, "a1", 10, "2021-05-07 00:00:00")
|
||||||
|
)
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
spark.conf.set("spark.sql.datetime.java8API.enabled", value = false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
test("Test bulk insert") {
|
test("Test bulk insert") {
|
||||||
spark.sql("set hoodie.sql.insert.mode = non-strict")
|
spark.sql("set hoodie.sql.insert.mode = non-strict")
|
||||||
withTempDir { tmp =>
|
withTempDir { tmp =>
|
||||||
|
|||||||
Reference in New Issue
Block a user