From 6537af26761ca785aaf4f8e9158909205875c1bf Mon Sep 17 00:00:00 2001
From: Sreeram Ramji
Date: Fri, 4 Sep 2020 09:08:30 -0700
Subject: [PATCH] [HUDI-1153] Spark DataSource and Streaming Write must fail when operation type is misconfigured (#2014)

---
 .../hudi/common/model/WriteOperationType.java |  8 +++++
 .../java/org/apache/hudi/DataSourceUtils.java | 23 +++++++------
 .../org/apache/hudi/DataSourceOptions.scala   | 11 +++---
 .../apache/hudi/HoodieSparkSqlWriter.scala    | 34 +++++++++----------
 .../org/apache/hudi/TestDataSourceUtils.java  |  7 ++--
 5 files changed, 47 insertions(+), 36 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index d1a53d5da..1521c5f7a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -71,6 +71,14 @@ public enum WriteOperationType {
     }
   }
 
+  /**
+   * Getter for value.
+   * @return string form of WriteOperationType
+   */
+  public String value() {
+    return value;
+  }
+
   public static boolean isChangingRecords(WriteOperationType operationType) {
     return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE;
   }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 19316d5e7..0115f226f 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -248,16 +249,18 @@ public class DataSourceUtils {
   }
 
   public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
-      String instantTime, String operation) throws HoodieException {
-    if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
-      Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner =
-          createUserDefinedBulkInsertPartitioner(client.getConfig());
-      return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner);
-    } else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
-      return client.insert(hoodieRecords, instantTime);
-    } else {
-      // default is upsert
-      return client.upsert(hoodieRecords, instantTime);
+      String instantTime, WriteOperationType operation) throws HoodieException {
+    switch (operation) {
+      case BULK_INSERT:
+        Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner =
+            createUserDefinedBulkInsertPartitioner(client.getConfig());
+        return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner);
+      case INSERT:
+        return client.insert(hoodieRecords, instantTime);
+      case UPSERT:
+        return client.upsert(hoodieRecords, instantTime);
+      default:
+        throw new HoodieException("Not a valid operation type for doWriteOperation: " + operation.toString());
     }
   }
 
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ba6bc879c..63cebd8b9 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -19,6 +19,7 @@ package org.apache.hudi
 
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.hive.HiveSyncTool
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
 import org.apache.hudi.keygen.SimpleKeyGenerator
@@ -136,11 +137,11 @@ object DataSourceWriteOptions {
    * Default: upsert()
    */
   val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
-  val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
-  val INSERT_OPERATION_OPT_VAL = "insert"
-  val UPSERT_OPERATION_OPT_VAL = "upsert"
-  val DELETE_OPERATION_OPT_VAL = "delete"
-  val BOOTSTRAP_OPERATION_OPT_VAL = "bootstrap"
+  val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
+  val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
+  val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
+  val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
+  val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
   val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
 
   /**
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 36886cdac..569ed345f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
 import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType}
+import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.ReflectionUtils
@@ -76,21 +76,19 @@
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
     }
     val tableType = parameters(TABLE_TYPE_OPT_KEY)
-    val operation =
+    var operation = WriteOperationType.fromValue(parameters(OPERATION_OPT_KEY))
     // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
     // Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly
     // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
-      if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean &&
-        parameters(OPERATION_OPT_KEY) == UPSERT_OPERATION_OPT_VAL) {
+    if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean &&
+      operation == WriteOperationType.UPSERT) {
 
-        log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
-          s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " +
-          s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL")
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " +
+        s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL")
 
-        INSERT_OPERATION_OPT_VAL
-      } else {
-        parameters(OPERATION_OPT_KEY)
-      }
+      operation = WriteOperationType.INSERT
+    }
 
     val jsc = new JavaSparkContext(sparkContext)
     val basePath = new Path(path.get)
@@ -123,7 +121,7 @@ private[hudi] object HoodieSparkSqlWriter {
     // scalastyle:on
 
     val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-      if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+      if (operation != WriteOperationType.DELETE) {
         // register classes & schemas
         val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
         sparkContext.getConf.registerKryoClasses(
@@ -242,7 +240,7 @@ private[hudi] object HoodieSparkSqlWriter {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
       false
     } else {
-      handleSaveModes(mode, basePath, tableConfig, tableName, BOOTSTRAP_OPERATION_OPT_VAL, fs)
+      handleSaveModes(mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
     }
 
     if (!tableExists) {
@@ -290,7 +288,7 @@ private[hudi] object HoodieSparkSqlWriter {
   }
 
   private def handleSaveModes(mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
-                              operation: String, fs: FileSystem): Unit = {
+                              operation: WriteOperationType, fs: FileSystem): Unit = {
     if (mode == SaveMode.Append && tableExists) {
       val existingTableName = tableConfig.getTableName
       if (!existingTableName.equals(tableName)) {
@@ -298,7 +296,7 @@
       }
     }
 
-    if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+    if (operation != WriteOperationType.DELETE) {
       if (mode == SaveMode.ErrorIfExists && tableExists) {
         throw new HoodieException(s"hoodie table at $tablePath already exists.")
       } else if (mode == SaveMode.Overwrite && tableExists) {
@@ -309,7 +307,7 @@
     } else {
       // Delete Operation only supports Append mode
       if (mode != SaveMode.Append) {
-        throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
+        throw new HoodieException(s"Append is the only save mode applicable for ${operation.toString} operation")
       }
     }
   }
@@ -384,7 +382,7 @@
                                                  tableConfig: HoodieTableConfig,
                                                  instantTime: String,
                                                  basePath: Path,
-                                                 operation: String,
+                                                 operation: WriteOperationType,
                                                  jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = {
     val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
     if (errorCount == 0) {
@@ -422,7 +420,7 @@
       }
       (commitSuccess && metaSyncSuccess, compactionInstant)
     } else {
-      log.error(s"$operation failed with $errorCount errors :")
+      log.error(s"${operation.toString} failed with $errorCount errors :")
       if (log.isTraceEnabled) {
         log.trace("Printing out the top 100 errors")
         writeStatuses.rdd.filter(ws => ws.hasErrors)
diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 2cd4c4222..32071d664 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -108,7 +109,7 @@ public class TestDataSourceUtils {
     when(hoodieWriteClient.getConfig()).thenReturn(config);
 
     DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
-        DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+        WriteOperationType.BULK_INSERT);
 
     verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
         optionCaptor.capture());
@@ -121,7 +122,7 @@ public class TestDataSourceUtils {
 
     Exception exception = assertThrows(HoodieException.class, () -> {
       DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
-          DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+          WriteOperationType.BULK_INSERT);
     });
 
     assertThat(exception.getMessage(), containsString("Could not create UserDefinedBulkInsertPartitioner"));
@@ -132,7 +133,7 @@ public class TestDataSourceUtils {
     setAndVerifyHoodieWriteClientWith(NoOpBulkInsertPartitioner.class.getName());
 
     DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
-        DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+        WriteOperationType.BULK_INSERT);
 
     verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
         optionCaptor.capture());
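
Note (editorial sketch, not part of the applied patch): the behavior this change enforces is easiest to see from the Spark DataSource side. The snippet below is illustrative only; it assumes a spark-shell session with `spark` in scope, and the table name and base path are hypothetical. Before this patch, an unrecognized operation string fell through to the upsert branch of DataSourceUtils.doWriteOperation; with it, HoodieSparkSqlWriter parses the option up front via WriteOperationType.fromValue, so a misconfigured value raises a HoodieException instead of silently upserting.

    import org.apache.spark.sql.SaveMode

    // Hypothetical input frame with record key, precombine and partition columns.
    val df = spark.range(10).selectExpr("id", "id as ts", "'p0' as part")

    df.write.format("org.apache.hudi")
      .option("hoodie.table.name", "ops_demo")                       // hypothetical name
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.partitionpath.field", "part")
      // Misspelled operation ("bulkinsert" instead of "bulk_insert"): with this
      // patch the write fails fast in WriteOperationType.fromValue rather than
      // silently running an upsert.
      .option("hoodie.datasource.write.operation", "bulkinsert")
      .mode(SaveMode.Append)
      .save("/tmp/ops_demo")                                         // hypothetical path

The same fail-fast applies to structured streaming writes, since both paths now hand a parsed WriteOperationType, not a raw string, to doWriteOperation.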