[HUDI-1768] add spark datasource unit test for schema validate add column (#2776)

lw0090
2021-05-12 04:49:18 +08:00
committed by GitHub
parent be9db2c4f5
commit 5a8b2a4f86
2 changed files with 96 additions and 1 deletion
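
The tests added here exercise the hoodie.avro.schema.validate write option, which makes Hudi reject an incoming batch whose schema is not compatible with the table's existing schema. Below is a minimal sketch, not part of this commit, of enabling that option on a Spark datasource write: the writeWithSchemaValidation helper, the table name, and the df/basePath parameters are illustrative, while the record key, partition, and precombine fields mirror the columns used in the test data.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: append a DataFrame to a Hudi table with Avro schema validation turned on,
// so that a batch whose schema is incompatible with the table schema fails the write.
// The table name and field names below are illustrative placeholders.
def writeWithSchemaValidation(df: DataFrame, basePath: String): Unit = {
  df.write.format("org.apache.hudi")
    .option("hoodie.table.name", "schema_validate_test")
    .option("hoodie.datasource.write.recordkey.field", "_row_key")
    .option("hoodie.datasource.write.partitionpath.field", "partition")
    .option("hoodie.datasource.write.precombine.field", "timestamp")
    .option("hoodie.avro.schema.validate", "true") // reject schema-incompatible writes
    .mode(SaveMode.Append)
    .save(basePath)
}

In the tests below, a second write that adds a nullable age column passes this check, while a third write that drops the name column fails with a HoodieUpsertException ("Failed upsert schema compatibility check.").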

TestCOWDataSource.scala

@@ -21,7 +21,6 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -29,6 +28,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.RawTripTestPayload.deleteRecordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieUpsertException
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
import org.apache.hudi.testutils.HoodieClientTestBase
@@ -703,4 +703,85 @@ class TestCOWDataSource extends HoodieClientTestBase {
      .load(basePath)
    assertEquals(N + 1, hoodieIncViewDF1.count())
  }

  @Test def testSchemaEvolution(): Unit = {
    // enable schema validation on write
    val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
    // 1. write records with schema1
    val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
      StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true) :: Nil)
    val records1 = Seq(Row("1", "Andy", 1, 1),
      Row("2", "lisi", 1, 1),
      Row("3", "zhangsan", 1, 1))
    val rdd = jsc.parallelize(records1)
    val recordsDF = spark.createDataFrame(rdd, schema1)
    recordsDF.write.format("org.apache.hudi")
      .options(opts)
      .mode(SaveMode.Append)
      .save(basePath)

    // 2. write records with schema2, which adds a new column "age"
    val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
      StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
      StructField("partition", IntegerType, true) :: Nil)
    val records2 = Seq(Row("11", "Andy", "10", 1, 1),
      Row("22", "lisi", "11", 1, 1),
      Row("33", "zhangsan", "12", 1, 1))
    val rdd2 = jsc.parallelize(records2)
    val recordsDF2 = spark.createDataFrame(rdd2, schema2)
    recordsDF2.write.format("org.apache.hudi")
      .options(opts)
      .mode(SaveMode.Append)
      .save(basePath)
    val recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*")
    val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
    assertEquals(resultSchema, schema2)

    // 3. write records with schema3, which drops the "name" column; the write should fail the compatibility check
    try {
      val schema3 = StructType(StructField("_row_key", StringType, true) ::
        StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
        StructField("partition", IntegerType, true) :: Nil)
      val records3 = Seq(Row("11", "10", 1, 1),
        Row("22", "11", 1, 1),
        Row("33", "12", 1, 1))
      val rdd3 = jsc.parallelize(records3)
      val recordsDF3 = spark.createDataFrame(rdd3, schema3)
      recordsDF3.write.format("org.apache.hudi")
        .options(opts)
        .mode(SaveMode.Append)
        .save(basePath)
      fail("Dropping a column should fail the schema compatibility check")
    } catch {
      case ex: HoodieUpsertException =>
        assertTrue(ex.getMessage.equals("Failed upsert schema compatibility check."))
    }
  }

  @Test def testSchemaNotEqualData(): Unit = {
    // enable schema validation on write
    val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
    // the declared schema contains an "age" column that is absent from the input records
    val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
      StructField("timestamp", IntegerType, true) :: StructField("age", StringType, true) ::
      StructField("partition", IntegerType, true) :: Nil)
    val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}",
      "{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}")
    val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2))
    inputDF.write.format("org.apache.hudi")
      .options(opts)
      .mode(SaveMode.Append)
      .save(basePath)
    val recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*")
    val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
    assertEquals(resultSchema, schema1)
  }
}