1
0

[HUDI-1296] Support Metadata Table in Spark Datasource (#4789)

* Bootstrapping initial support for Metadata Table in Spark Datasource

- Consolidated Avro/Row conversion utilities to center around Spark's AvroDeserializer; removed duplication
- Bootstrapped HoodieBaseRelation
- Updated HoodieMergeOnReadRDD to be able to handle Metadata Table
- Modified MOR relations to be able to read different Base File formats (Parquet, HFile)
This commit is contained in:
Alexey Kudinkin
2022-02-24 13:23:13 -08:00
committed by GitHub
parent 521338b4d9
commit 85e8a5c4de
56 changed files with 1634 additions and 1010 deletions

View File

@@ -18,13 +18,13 @@
package org.apache.hudi
import java.time.LocalDate
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.scalatest.{FunSuite, Matchers}
import java.time.LocalDate
class TestAvroConversionHelper extends FunSuite with Matchers {
val dateSchema = s"""
@@ -42,7 +42,7 @@ class TestAvroConversionHelper extends FunSuite with Matchers {
test("Logical type: date") {
val schema = new Schema.Parser().parse(dateSchema)
val convertor = AvroConversionHelper.createConverterToRow(schema, AvroConversionUtils.convertAvroSchemaToStructType(schema))
val convertor = AvroConversionUtils.createConverterToRow(schema, AvroConversionUtils.convertAvroSchemaToStructType(schema))
val dateOutputData = dateInputData.map(x => {
val record = new GenericData.Record(schema) {{ put("date", x) }}

View File

@@ -243,7 +243,7 @@ class TestDataSourceDefaults {
val partitionPathProp: String = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
val STRUCT_NAME: String = "hoodieRowTopLevelField"
val NAMESPACE: String = "hoodieRow"
var converterFn: Function1[Any, Any] = _
var converterFn: Function1[Row, GenericRecord] = _
override def getKey(record: GenericRecord): HoodieKey = {
new HoodieKey(HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyProp, true, false),
@@ -251,13 +251,13 @@ class TestDataSourceDefaults {
}
override def getRecordKey(row: Row): String = {
if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
if (null == converterFn) converterFn = AvroConversionUtils.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
getKey(genericRecord).getRecordKey
}
override def getPartitionPath(row: Row): String = {
if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
if (null == converterFn) converterFn = AvroConversionUtils.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
getKey(genericRecord).getPartitionPath
}

View File

@@ -32,13 +32,13 @@ import org.apache.hudi.functional.TestBootstrap
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{expr, lit}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -827,33 +827,32 @@ class TestHoodieSparkSqlWriter {
/**
* Test case for a non-partitioned table with metadata table support.
*/
@Test
def testNonPartitionTableWithMetatableSupport(): Unit = {
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "col3",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
"hoodie.insert.shuffle.parallelism" -> "1",
"hoodie.metadata.enable" -> "true")
val df = spark.range(0, 10).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", expr("keyid + 1000"))
df.write.format("hudi")
.options(options.updated(DataSourceWriteOptions.OPERATION.key, "insert"))
.mode(SaveMode.Overwrite).save(tempBasePath)
// upsert same record again
val df_update = spark.range(0, 10).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", expr("keyid + 2000"))
df_update.write.format("hudi")
.options(options.updated(DataSourceWriteOptions.OPERATION.key, "upsert"))
.mode(SaveMode.Append).save(tempBasePath)
assert(spark.read.format("hudi").load(tempBasePath).count() == 10)
assert(spark.read.format("hudi").load(tempBasePath).where("age >= 2000").count() == 10)
}
// Verifies insert + upsert round-trips on a non-partitioned table with the metadata
// table enabled, parameterized over every HoodieTableType (COW and MOR).
@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testNonPartitionTableWithMetatableSupport(tableType: HoodieTableType): Unit = {
  val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "col3",
    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
    // Empty partition-path field + NonpartitionedKeyGenerator => non-partitioned layout
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
    "hoodie.insert.shuffle.parallelism" -> "1",
    "hoodie.metadata.enable" -> "true")
  // Initial insert: 10 records with age = keyid + 1000
  val df = spark.range(0, 10).toDF("keyid")
    .withColumn("col3", expr("keyid"))
    .withColumn("age", expr("keyid + 1000"))
  df.write.format("hudi")
    .options(options.updated(DataSourceWriteOptions.OPERATION.key, "insert"))
    .mode(SaveMode.Overwrite).save(tempBasePath)
  // upsert same record again (same keys, age bumped to keyid + 2000)
  val df_update = spark.range(0, 10).toDF("keyid")
    .withColumn("col3", expr("keyid"))
    .withColumn("age", expr("keyid + 2000"))
  df_update.write.format("hudi")
    .options(options.updated(DataSourceWriteOptions.OPERATION.key, "upsert"))
    .mode(SaveMode.Append).save(tempBasePath)
  // Upserts must not duplicate rows, and every row must carry the updated age value
  assert(spark.read.format("hudi").load(tempBasePath).count() == 10)
  assert(spark.read.format("hudi").load(tempBasePath).where("age >= 2000").count() == 10)
}
/**

View File

@@ -20,22 +20,18 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{Row, SparkSession}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
import java.io.File
import java.nio.file.Paths
import scala.collection.JavaConverters
class TestHoodieSparkUtils {
@@ -232,8 +228,9 @@ class TestHoodieSparkUtils {
fail("createRdd should fail, because records don't have a column which is not nullable in the passed in schema")
} catch {
case e: Exception =>
e.getCause.asInstanceOf[NullPointerException]
assertTrue(e.getMessage.contains("null of string in field new_nested_col of"))
val cause = e.getCause
assertTrue(cause.isInstanceOf[SchemaCompatibilityException])
assertTrue(e.getMessage.contains("Unable to validate the rewritten record {\"innerKey\": \"innerKey1_2\", \"innerValue\": 2} against schema"))
}
spark.stop()
}

View File

@@ -17,11 +17,11 @@
package org.apache.hudi.functional
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -30,18 +30,18 @@ import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.BooleanType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import java.util
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -349,11 +349,15 @@ class TestMORDataSource extends HoodieClientTestBase {
// First Operation:
// Producing parquet files to three default partitions.
// SNAPSHOT view on MOR table with parquet files only.
// Overriding the partition-path field
val opts = commonOpts + (DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path")
val hoodieRecords1 = dataGen.generateInserts("001", 100)
val records1 = recordsToStrings(hoodieRecords1).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
val inputDF1 = toDataset(hoodieRecords1)
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.options(opts)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
@@ -376,11 +380,10 @@ class TestMORDataSource extends HoodieClientTestBase {
// Second Operation:
// Upsert 50 update records
// Snapshot view should read 100 records
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 50))
.toList
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
val records2 = dataGen.generateUniqueUpdates("002", 50)
val inputDF2 = toDataset(records2)
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.options(opts)
.mode(SaveMode.Append)
.save(basePath)
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
@@ -424,17 +427,31 @@ class TestMORDataSource extends HoodieClientTestBase {
verifyShow(hudiIncDF2)
verifyShow(hudiIncDF1Skipmerge)
val record3 = recordsToStrings(dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1))
spark.read.json(spark.sparkContext.parallelize(record3, 2))
.write.format("org.apache.hudi").options(commonOpts)
val record3 = dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1)
val inputDF3 = toDataset(record3)
inputDF3.write.format("org.apache.hudi").options(opts)
.mode(SaveMode.Append).save(basePath)
val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
verifyShow(hudiSnapshotDF3);
assertEquals(100, hudiSnapshotDF3.count())
assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count())
}
// Converts a list of generated HoodieRecords into a Spark DataFrame by unwrapping
// each record's payload into its Avro form and building the frame over the
// test-data-generator Avro schema.
private def toDataset(records: util.List[HoodieRecord[_]]) = {
  // Extract every record's GenericRecord insert value from its payload
  val avroRecords = records.map(_.getData
    .asInstanceOf[HoodieRecordPayload[_]]
    .getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA)
    .get
    .asInstanceOf[GenericRecord])
  // NOTE(review): parallelism of 2 mirrors the spark.read.json(..., 2) call sites this replaces
  val rdd: RDD[GenericRecord] = spark.sparkContext.parallelize(avroRecords, 2)
  AvroConversionUtils.createDataFrame(rdd, HoodieTestDataGenerator.AVRO_SCHEMA.toString, spark)
}
@Test
def testVectorizedReader() {
spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
@@ -553,15 +570,10 @@ class TestMORDataSource extends HoodieClientTestBase {
.orderBy(desc("_hoodie_commit_time"))
.head()
assertEquals(sampleRow.getDouble(0), sampleRow.get(0))
assertEquals(sampleRow.getLong(1), sampleRow.get(1))
assertEquals(sampleRow.getDate(1), sampleRow.get(1))
assertEquals(sampleRow.getString(2), sampleRow.get(2))
assertEquals(sampleRow.getSeq(3), sampleRow.get(3))
if (HoodieSparkUtils.gteqSpark3_2) {
// Since Spark3.2, the `nation` column is parsed as String, not Struct.
assertEquals(sampleRow.getString(4), sampleRow.get(4))
} else {
assertEquals(sampleRow.getStruct(4), sampleRow.get(4))
}
assertEquals(sampleRow.getAs[Array[Byte]](4), sampleRow.get(4))
}
def verifyShow(df: DataFrame): Unit = {

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{Tag, Test}
import scala.collection.JavaConverters._
@Tag("functional")
/**
 * Functional tests verifying that the Hudi Metadata Table can be read through the
 * Spark Datasource by loading its path (`basePath/.hoodie/metadata`) like a regular
 * Hudi table.
 */
@Tag("functional")
class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarness {

  // Datasource format identifier used for every read/write in this suite
  val hudi = "org.apache.hudi"

  // Write options shared by the tests: key/partition/precombine fields plus shuffle parallelism
  var commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    "hoodie.bulkinsert.shuffle.parallelism" -> "2",
    "hoodie.delete.shuffle.parallelism" -> "1",
    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
  )

  /**
   * Writes (insert, then upsert) with the metadata table enabled, then reads the
   * metadata table itself via the datasource and checks its row count and keys.
   */
  @Test
  def testReadability(): Unit = {
    val dataGen = new HoodieTestDataGenerator()
    // Enable the metadata table and compact it after every delta commit
    val opts: Map[String, String] = commonOpts ++ Map(
      HoodieMetadataConfig.ENABLE.key -> "true",
      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
    )
    // Insert records
    val newRecords = dataGen.generateInserts("001", 100)
    val newRecordsDF = parseRecords(recordsToStrings(newRecords).asScala)
    newRecordsDF.write.format(hudi)
      .options(opts)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .mode(SaveMode.Append)
      .save(basePath)
    // Update records
    val updatedRecords = dataGen.generateUpdates("002", newRecords)
    val updatedRecordsDF = parseRecords(recordsToStrings(updatedRecords).asScala)
    updatedRecordsDF.write.format(hudi)
      .options(opts)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .mode(SaveMode.Append)
      .save(basePath)
    // Read the metadata table directly as a Hudi table
    val metadataDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata")
    // Smoke test
    metadataDF.show()
    // Query w/ 0 requested columns should be working fine
    assertEquals(4, metadataDF.count())
    // One key per data partition plus the aggregate "__all_partitions__" entry
    val expectedKeys = Seq("2015/03/16", "2015/03/17", "2016/03/15", "__all_partitions__")
    val keys = metadataDF.select("key")
      .collect()
      .map(_.getString(0))
      .toSeq
      .sorted
    assertEquals(expectedKeys, keys)
  }

  // Parses JSON-encoded record strings into a DataFrame (parallelism fixed at 2)
  private def parseRecords(records: Seq[String]) = {
    spark.read.json(spark.sparkContext.parallelize(records, 2))
  }
}