1
0

[HUDI-2114] Spark Query MOR Table Written By Flink Return Incorrect Timestamp Value (#3208)

This commit is contained in:
pengzhiwei
2021-07-02 17:39:57 +08:00
committed by GitHub
parent 7462fdefc3
commit ac65189458
3 changed files with 67 additions and 18 deletions

View File

@@ -111,15 +111,9 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
}
var requiredStructSchema = StructType(Seq())
requiredColumns.foreach(col => {
val field = tableStructSchema.find(_.name == col)
if (field.isDefined) {
requiredStructSchema = requiredStructSchema.add(field.get)
}
})
val requiredAvroSchema = AvroConversionUtils
.convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
val (requiredAvroSchema, requiredStructSchema) =
MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns)
val hoodieTableState = HoodieMergeOnReadTableState(
tableStructSchema,
requiredStructSchema,

View File

@@ -18,6 +18,7 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
@@ -94,16 +95,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s" buildScan filters = ${filters.mkString(",")}")
var requiredStructSchema = StructType(Seq())
requiredColumns.foreach(col => {
val field = tableStructSchema.find(_.name == col)
if (field.isDefined) {
requiredStructSchema = requiredStructSchema.add(field.get)
}
})
val requiredAvroSchema = AvroConversionUtils
.convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
val (requiredAvroSchema, requiredStructSchema) =
MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns)
val fileIndex = buildFileIndex(filters)
val hoodieTableState = HoodieMergeOnReadTableState(
tableStructSchema,
@@ -211,4 +205,15 @@ object MergeOnReadSnapshotRelation {
// when create PartitionedFile.
path.toUri.toString
}
def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = {
// First get the required avro-schema, then convert the avro-schema to spark schema.
val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
val requiredFields = requiredColumns.map(c => name2Fields(c))
.map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema)
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.spark.sql.types.TimestampType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
class TestMergeOnReadSnapshotRelation {
@Test
def testGetRequiredSchema(): Unit = {
val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," +
"\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," +
"{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," +
"{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," +
"{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," +
"{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
val (requiredAvroSchema, requiredStructSchema) =
MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, Array("ts"))
assertEquals("timestamp-millis",
requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName)
assertEquals(TimestampType, requiredStructSchema.fields(0).dataType)
}
}