From ac65189458eff9962e442a6ca3973cb236749840 Mon Sep 17 00:00:00 2001 From: pengzhiwei Date: Fri, 2 Jul 2021 17:39:57 +0800 Subject: [PATCH] [HUDI-2114] Spark Query MOR Table Written By Flink Return Incorrect Timestamp Value (#3208) --- .../hudi/MergeOnReadIncrementalRelation.scala | 12 ++--- .../hudi/MergeOnReadSnapshotRelation.scala | 23 +++++---- .../TestMergeOnReadSnapshotRelation.scala | 50 +++++++++++++++++++ 3 files changed, 67 insertions(+), 18 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 3beb4630f..b1fcf2fc6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -111,15 +111,9 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp) filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter } - var requiredStructSchema = StructType(Seq()) - requiredColumns.foreach(col => { - val field = tableStructSchema.find(_.name == col) - if (field.isDefined) { - requiredStructSchema = requiredStructSchema.add(field.get) - } - }) - val requiredAvroSchema = AvroConversionUtils - .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace) + val (requiredAvroSchema, requiredStructSchema) = + MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns) + val hoodieTableState = HoodieMergeOnReadTableState( tableStructSchema, requiredStructSchema, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 8c478e1e9..f20bb4d25 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -18,6 +18,7 @@ package org.apache.hudi +import org.apache.avro.Schema import org.apache.hudi.common.model.HoodieBaseFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView @@ -94,16 +95,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}") log.debug(s" buildScan filters = ${filters.mkString(",")}") - var requiredStructSchema = StructType(Seq()) - requiredColumns.foreach(col => { - val field = tableStructSchema.find(_.name == col) - if (field.isDefined) { - requiredStructSchema = requiredStructSchema.add(field.get) - } - }) - val requiredAvroSchema = AvroConversionUtils - .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace) + val (requiredAvroSchema, requiredStructSchema) = + MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns) val fileIndex = buildFileIndex(filters) val hoodieTableState = HoodieMergeOnReadTableState( tableStructSchema, @@ -211,4 +205,15 @@ object MergeOnReadSnapshotRelation { // when create PartitionedFile. path.toUri.toString } + + def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = { + // First get the required avro-schema, then convert the avro-schema to spark schema. + val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap + val requiredFields = requiredColumns.map(c => name2Fields(c)) + .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList + val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc, + tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava) + val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) + (requiredAvroSchema, requiredStructSchema) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala new file mode 100644 index 000000000..80a883a00 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMergeOnReadSnapshotRelation.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.spark.sql.types.TimestampType +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class TestMergeOnReadSnapshotRelation { + + @Test + def testGetRequiredSchema(): Unit = { + val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," + + "\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," + + "{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}" + + val tableAvroSchema = new Schema.Parser().parse(avroSchemaString) + + val (requiredAvroSchema, requiredStructSchema) = + MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, Array("ts")) + + assertEquals("timestamp-millis", + requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName) + assertEquals(TimestampType, requiredStructSchema.fields(0).dataType) + } +}