[HUDI-3018] Adding validation to dataframe schema to ensure reserved field does not have a different data type (#4852)
This commit is contained in:
committed by
GitHub
parent
2f99e8458a
commit
d5444ff7ff
@@ -17,7 +17,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hudi
|
package org.apache.hudi
|
||||||
|
|
||||||
import org.apache.avro.Schema.Type
|
import org.apache.avro.Schema.Type
|
||||||
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
|
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
|
||||||
import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
|
import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ public abstract class HoodieRecord<T> implements Serializable {
|
|||||||
public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path";
|
public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path";
|
||||||
public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name";
|
public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name";
|
||||||
public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
|
public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
|
||||||
|
public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted";
|
||||||
|
|
||||||
public static final List<String> HOODIE_META_COLUMNS =
|
public static final List<String> HOODIE_META_COLUMNS =
|
||||||
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
|
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
|
|||||||
* @returns {@code true} if record represents a delete record. {@code false} otherwise.
|
* @returns {@code true} if record represents a delete record. {@code false} otherwise.
|
||||||
*/
|
*/
|
||||||
protected boolean isDeleteRecord(GenericRecord genericRecord) {
|
protected boolean isDeleteRecord(GenericRecord genericRecord) {
|
||||||
final String isDeleteKey = "_hoodie_is_deleted";
|
final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED;
|
||||||
// Modify to be compatible with new version Avro.
|
// Modify to be compatible with new version Avro.
|
||||||
// The new version Avro throws for GenericRecord.get if the field name
|
// The new version Avro throws for GenericRecord.get if the field name
|
||||||
// does not exist in the schema.
|
// does not exist in the schema.
|
||||||
|
|||||||
@@ -19,11 +19,10 @@ package org.apache.hudi
|
|||||||
|
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.avro.generic.GenericRecord
|
import org.apache.avro.generic.GenericRecord
|
||||||
|
import org.apache.avro.reflect.AvroSchema
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||||
import org.apache.hadoop.hive.conf.HiveConf
|
import org.apache.hadoop.hive.conf.HiveConf
|
||||||
|
|
||||||
import org.apache.hudi.DataSourceWriteOptions._
|
import org.apache.hudi.DataSourceWriteOptions._
|
||||||
import org.apache.hudi.HoodieWriterUtils._
|
import org.apache.hudi.HoodieWriterUtils._
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils
|
import org.apache.hudi.avro.HoodieAvroUtils
|
||||||
@@ -45,9 +44,7 @@ import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKey
|
|||||||
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
|
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
|
||||||
import org.apache.hudi.sync.common.AbstractSyncTool
|
import org.apache.hudi.sync.common.AbstractSyncTool
|
||||||
import org.apache.hudi.table.BulkInsertPartitioner
|
import org.apache.hudi.table.BulkInsertPartitioner
|
||||||
|
|
||||||
import org.apache.log4j.LogManager
|
import org.apache.log4j.LogManager
|
||||||
|
|
||||||
import org.apache.spark.SPARK_VERSION
|
import org.apache.spark.SPARK_VERSION
|
||||||
import org.apache.spark.api.java.JavaSparkContext
|
import org.apache.spark.api.java.JavaSparkContext
|
||||||
import org.apache.spark.sql.hive.HiveExternalCatalog
|
import org.apache.spark.sql.hive.HiveExternalCatalog
|
||||||
@@ -58,7 +55,6 @@ import org.apache.spark.sql._
|
|||||||
import org.apache.spark.SparkContext
|
import org.apache.spark.SparkContext
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.collection.mutable.ListBuffer
|
import scala.collection.mutable.ListBuffer
|
||||||
@@ -242,6 +238,7 @@ object HoodieSparkSqlWriter {
|
|||||||
if (reconcileSchema) {
|
if (reconcileSchema) {
|
||||||
schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
|
schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
|
||||||
}
|
}
|
||||||
|
validateSchemaForHoodieIsDeleted(schema)
|
||||||
sparkContext.getConf.registerAvroSchemas(schema)
|
sparkContext.getConf.registerAvroSchemas(schema)
|
||||||
log.info(s"Registered avro schema : ${schema.toString(true)}")
|
log.info(s"Registered avro schema : ${schema.toString(true)}")
|
||||||
|
|
||||||
@@ -432,6 +429,14 @@ object HoodieSparkSqlWriter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Validates the reserved delete-marker field (`HoodieRecord.HOODIE_IS_DELETED`) of the schema
 * that is about to be written.
 *
 * If the field is absent the schema is accepted as-is. If it is present, its type after
 * nullability has been resolved must be BOOLEAN.
 *
 * @param schema Avro schema derived from the incoming dataframe
 * @throws HoodieException if the field is present and its resolved type is not BOOLEAN
 */
def validateSchemaForHoodieIsDeleted(schema: Schema): Unit = {
  // getField yields null for a missing field; wrapping it in Option makes "absent" a no-op
  // and avoids looking the field up repeatedly.
  Option(schema.getField(HoodieRecord.HOODIE_IS_DELETED)).foreach { isDeletedField =>
    // Resolve nullability once so that the check and the error message agree: a nullable
    // field is reported by its underlying type rather than by the wrapping UNION.
    val resolvedType = AvroConversionUtils.resolveAvroTypeNullability(isDeletedField.schema())._2.getType
    if (resolvedType != Schema.Type.BOOLEAN) {
      throw new HoodieException(HoodieRecord.HOODIE_IS_DELETED + " has to be BOOLEAN type. Passed in dataframe's schema has type "
        + resolvedType)
    }
  }
}
|
||||||
|
|
||||||
def bulkInsertAsRow(sqlContext: SQLContext,
|
def bulkInsertAsRow(sqlContext: SQLContext,
|
||||||
parameters: Map[String, String],
|
parameters: Map[String, String],
|
||||||
df: DataFrame,
|
df: DataFrame,
|
||||||
@@ -454,6 +459,7 @@ object HoodieSparkSqlWriter {
|
|||||||
if (dropPartitionColumns) {
|
if (dropPartitionColumns) {
|
||||||
schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
|
schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
|
||||||
}
|
}
|
||||||
|
validateSchemaForHoodieIsDeleted(schema)
|
||||||
sparkContext.getConf.registerAvroSchemas(schema)
|
sparkContext.getConf.registerAvroSchemas(schema)
|
||||||
log.info(s"Registered avro schema : ${schema.toString(true)}")
|
log.info(s"Registered avro schema : ${schema.toString(true)}")
|
||||||
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
|
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
|
||||||
|
|||||||
@@ -18,23 +18,21 @@
|
|||||||
package org.apache.hudi.functional
|
package org.apache.hudi.functional
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileSystem
|
import org.apache.hadoop.fs.FileSystem
|
||||||
|
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant
|
import org.apache.hudi.common.table.timeline.HoodieInstant
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
|
||||||
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
|
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
|
||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.exception.HoodieUpsertException
|
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
|
||||||
import org.apache.hudi.keygen._
|
import org.apache.hudi.keygen._
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
|
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
|
||||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||||
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieMergeOnReadRDD}
|
||||||
|
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.apache.spark.sql.functions.{col, concat, lit, udf}
|
import org.apache.spark.sql.functions.{col, concat, lit, udf}
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
|
|
||||||
import org.joda.time.DateTime
|
import org.joda.time.DateTime
|
||||||
import org.joda.time.format.DateTimeFormat
|
import org.joda.time.format.DateTimeFormat
|
||||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
|
||||||
@@ -44,7 +42,6 @@ import org.junit.jupiter.params.ParameterizedTest
|
|||||||
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
|
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
|
||||||
|
|
||||||
import java.sql.{Date, Timestamp}
|
import java.sql.{Date, Timestamp}
|
||||||
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
@@ -98,6 +95,23 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * Writing a dataframe whose `HoodieRecord.HOODIE_IS_DELETED` column holds a string literal
 * is expected to be rejected with a [[HoodieException]].
 */
@Test def testHoodieIsDeletedNonBooleanField(): Unit = {
  // Build an insert batch and attach the reserved delete-marker column as a string value.
  val insertJsons = recordsToStrings(dataGen.generateInserts("000", 100)).toList
  val insertsDF = spark.read.json(spark.sparkContext.parallelize(insertJsons, 2))
  val dfWithStringMarker = insertsDF.withColumn(HoodieRecord.HOODIE_IS_DELETED, lit("abc"))

  // The write is deferred inside an Executable so assertThrows can observe the failure.
  val writeAttempt: Executable = new Executable {
    override def execute(): Unit = {
      val writer = dfWithStringMarker.write
        .format("hudi")
        .options(commonOpts)
        .mode(SaveMode.Overwrite)
      writer.save(basePath)
    }
  }

  assertThrows(classOf[HoodieException], writeAttempt,
    "Should have failed since _hoodie_is_deleted is not a BOOLEAN data type")
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This tests the case that query by with a specified partition condition on hudi table which is
|
* This tests the case that query by with a specified partition condition on hudi table which is
|
||||||
* different between the value of the partition field and the actual partition path,
|
* different between the value of the partition field and the actual partition path,
|
||||||
|
|||||||
Reference in New Issue
Block a user