diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index a006eebf7..62bcbf684 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -17,7 +17,6 @@ */ package org.apache.hudi - import org.apache.avro.Schema.Type import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index f90448e7d..ac30766dd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -40,6 +40,7 @@ public abstract class HoodieRecord implements Serializable { public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path"; public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name"; public static final String OPERATION_METADATA_FIELD = "_hoodie_operation"; + public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted"; public static final List HOODIE_META_COLUMNS = CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 4be2e3e09..7b7bd6c6b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -85,7 +85,7 @@ public class 
OverwriteWithLatestAvroPayload extends BaseAvroPayload * @returns {@code true} if record represents a delete record. {@code false} otherwise. */ protected boolean isDeleteRecord(GenericRecord genericRecord) { - final String isDeleteKey = "_hoodie_is_deleted"; + final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED; // Modify to be compatible with new version Avro. // The new version Avro throws for GenericRecord.get if the field name // does not exist in the schema. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b7f04c54e..89304d3d0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -19,11 +19,9 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf - import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.HoodieAvroUtils @@ -45,9 +44,7 @@ import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKey import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.hudi.table.BulkInsertPartitioner - import org.apache.log4j.LogManager - import org.apache.spark.SPARK_VERSION import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.hive.HiveExternalCatalog @@ -58,7 +55,6 @@ import org.apache.spark.sql._ import org.apache.spark.SparkContext import java.util.Properties - import scala.collection.JavaConversions._ import 
scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -242,6 +238,7 @@ if (reconcileSchema) { schema = getLatestTableSchema(fs, basePath, sparkContext, schema) } + validateSchemaForHoodieIsDeleted(schema) sparkContext.getConf.registerAvroSchemas(schema) log.info(s"Registered avro schema : ${schema.toString(true)}") @@ -432,6 +429,14 @@ } } + def validateSchemaForHoodieIsDeleted(schema: Schema): Unit = { + val isDeletedField = schema.getField(HoodieRecord.HOODIE_IS_DELETED) + if (isDeletedField != null && AvroConversionUtils.resolveAvroTypeNullability(isDeletedField.schema())._2.getType != Schema.Type.BOOLEAN) { + throw new HoodieException(HoodieRecord.HOODIE_IS_DELETED + " has to be BOOLEAN type. Passed in dataframe's schema has type " + + AvroConversionUtils.resolveAvroTypeNullability(isDeletedField.schema())._2.getType) + } + } + def bulkInsertAsRow(sqlContext: SQLContext, parameters: Map[String, String], df: DataFrame, @@ -454,6 +459,7 @@ if (dropPartitionColumns) { schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema) } + validateSchemaForHoodieIsDeleted(schema) sparkContext.getConf.registerAvroSchemas(schema) log.info(s"Registered avro schema : ${schema.toString(true)}") if (parameters(INSERT_DROP_DUPS.key).toBoolean) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 58b36f833..96d50f6b5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -18,23 +18,21 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.FileSystem - import org.apache.hudi.common.config.HoodieMetadataConfig +import 
org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.exception.HoodieUpsertException +import org.apache.hudi.exception.{HoodieException, HoodieUpsertException} import org.apache.hudi.keygen._ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} - +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, concat, lit, udf} import org.apache.spark.sql.types._ - import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail} @@ -44,7 +42,6 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import java.sql.{Date, Timestamp} - import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -98,6 +95,23 @@ class TestCOWDataSource extends HoodieClientTestBase { assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) } + + @Test def testHoodieIsDeletedNonBooleanField() { + // Insert Operation + val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + val df = inputDF.withColumn(HoodieRecord.HOODIE_IS_DELETED, lit("abc")) + + assertThrows(classOf[HoodieException], new Executable { 
override def execute(): Unit = { + df.write.format("hudi") + .options(commonOpts) + .mode(SaveMode.Overwrite) + .save(basePath) + } + }, "Should have failed since _hoodie_is_deleted is not a BOOLEAN data type") + } + /** * This tests the case that query by with a specified partition condition on hudi table which is * different between the value of the partition field and the actual partition path,