Revert "[HUDI-2495] Resolve inconsistent key generation for timestamp types by GenericRecord and Row (#3944)" (#4201)
@@ -59,7 +59,6 @@ import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -542,8 +541,6 @@ public class HoodieAvroUtils {
   private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) {
     if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
       return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
-    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros()) {
-      return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000);
     } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
       Decimal dc = (Decimal) fieldSchema.getLogicalType();
       DecimalConversion decimalConversion = new DecimalConversion();
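For context: the branch removed above converted Avro `timestamp-micros` values into `java.sql.Timestamp` during GenericRecord-based key generation; after this revert, such values flow through as raw epoch-microsecond longs again. A minimal Scala sketch of the two behaviors (the sample value is hypothetical, not part of this diff):

```scala
import java.sql.Timestamp

// Hypothetical field value as stored by Avro's timestamp-micros logical type.
val micros = 1388617201000000L // 2014-01-01 23:00:01 UTC, in microseconds

// Pre-revert (the removed branch): micros were converted to java.sql.Timestamp,
// so the key rendered like "2014-01-01 23:00:01.0" (rendering is timezone-dependent).
val preRevert = new Timestamp(micros / 1000)

// Post-revert: the raw long is stringified unchanged,
// so the key renders as "1388617201000000".
val postRevert = micros.toString
```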
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hudi.command
 
-import java.sql.Timestamp
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
 
 import org.apache.avro.generic.GenericRecord
@@ -97,7 +96,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
         val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis
           SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
         } else {
-          Timestamp.valueOf(_partitionValue).getTime
+          MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
         }
         val timestampFormat = PartitionPathEncodeUtils.escapePathName(
           SqlKeyGenerator.timestampTimeFormat.print(timeMs))
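The one-line change above restores the non-row branch to its pre-#3944 behavior: the partition value coming from a GenericRecord is interpreted as epoch microseconds rather than parsed as a formatted timestamp string. A rough Scala illustration of the two interpretations (sample values are hypothetical, and the row path is approximated with `Timestamp.valueOf` instead of the actual `sqlTimestampFormat` parser):

```scala
import java.sql.Timestamp
import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}

// rowType path (unchanged): the partition value is a formatted timestamp string.
val fromRowMs = Timestamp.valueOf("2014-01-01 23:00:01").getTime

// Restored non-row path: the value is epoch micros, converted to millis.
val fromRecordMs = MILLISECONDS.convert("1388617201000000".toLong, MICROSECONDS)

// The reverted non-row path instead parsed a formatted string:
//   Timestamp.valueOf(_partitionValue).getTime
```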
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
-import org.apache.spark.sql.DataFrame
-import org.junit.jupiter.api.Test
-
-import java.sql.{Date, Timestamp}
-
-class TestGenericRecordAndRowConsistency extends SparkClientFunctionalTestHarness {
-
-  val commonOpts = Map(
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl",
-    "hoodie.insert.shuffle.parallelism" -> "1",
-    "hoodie.upsert.shuffle.parallelism" -> "1",
-    DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
-    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "str,eventTime",
-    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "typeId",
-    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "typeId",
-    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.ComplexKeyGenerator"
-  )
-
-  @Test
-  def testTimestampTypeConsistency(): Unit = {
-    val _spark = spark
-    import _spark.implicits._
-
-    val df = Seq(
-      (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
-      (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
-      (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
-      (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
-    ).toDF("typeId", "eventTime", "str")
-
-    testConsistencyBetweenGenericRecordAndRow(df)
-  }
-
-  @Test
-  def testDateTypeConsistency(): Unit = {
-    val _spark = spark
-    import _spark.implicits._
-
-    val df = Seq(
-      (1, Date.valueOf("2014-01-01"), "abc"),
-      (1, Date.valueOf("2014-11-30"), "abc"),
-      (2, Date.valueOf("2016-12-29"), "def"),
-      (2, Date.valueOf("2016-05-09"), "def")
-    ).toDF("typeId", "eventTime", "str")
-
-    testConsistencyBetweenGenericRecordAndRow(df)
-  }
-
-  private def testConsistencyBetweenGenericRecordAndRow(df: DataFrame): Unit = {
-    val _spark = spark
-    import _spark.implicits._
-
-    // upsert operation generate recordKey by GenericRecord
-    val tempRecordPath = basePath + "/record_tbl/"
-    df.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, "upsert")
-      .mode(org.apache.spark.sql.SaveMode.Overwrite)
-      .save(tempRecordPath)
-
-    val data1 = spark.read.format("hudi")
-      .load(tempRecordPath)
-      .select("_hoodie_record_key")
-      .map(_.toString()).collect().sorted
-
-    // bulk_insert operation generate recordKey by Row
-    val tempRowPath = basePath + "/row_tbl/"
-    df.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, "bulk_insert")
-      .mode(org.apache.spark.sql.SaveMode.Overwrite)
-      .save(tempRowPath)
-
-    val data2 = spark.read.format("hudi")
-      .load(tempRowPath)
-      .select("_hoodie_record_key")
-      .map(_.toString()).collect().sorted
-
-    assert(data1 sameElements data2)
-  }
-
-}