1
0

[HUDI-1153] Spark DataSource and Streaming Write must fail when operation type is misconfigured (#2014)

This commit is contained in:
Sreeram Ramji
2020-09-04 09:08:30 -07:00
committed by GitHub
parent 8d19ebfd0f
commit 6537af2676
5 changed files with 47 additions and 36 deletions

View File

@@ -71,6 +71,14 @@ public enum WriteOperationType {
}
}
/**
 * Getter for the backing string form of this operation type.
 *
 * @return the {@code value} field — the string form of this WriteOperationType
 */
public String value() {
return value;
}
/**
 * Tells whether the given operation type mutates already-written records:
 * true only for UPSERT, UPSERT_PREPPED and DELETE; false for every other
 * type (and for null, since only reference comparisons are performed).
 */
public static boolean isChangingRecords(WriteOperationType operationType) {
boolean changesExisting;
if (operationType == UPSERT || operationType == UPSERT_PREPPED) {
changesExisting = true;
} else {
changesExisting = (operationType == DELETE);
}
return changesExisting;
}

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -248,16 +249,18 @@ public class DataSourceUtils {
}
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
String instantTime, String operation) throws HoodieException {
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner =
createUserDefinedBulkInsertPartitioner(client.getConfig());
return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner);
} else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
return client.insert(hoodieRecords, instantTime);
} else {
// default is upsert
return client.upsert(hoodieRecords, instantTime);
String instantTime, WriteOperationType operation) throws HoodieException {
switch (operation) {
case BULK_INSERT:
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner =
createUserDefinedBulkInsertPartitioner(client.getConfig());
return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner);
case INSERT:
return client.insert(hoodieRecords, instantTime);
case UPSERT:
return client.upsert(hoodieRecords, instantTime);
default:
throw new HoodieException("Not a valid operation type for doWriteOperation: " + operation.toString());
}
}

View File

@@ -19,6 +19,7 @@ package org.apache.hudi
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.hive.HiveSyncTool
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
@@ -136,11 +137,11 @@ object DataSourceWriteOptions {
* Default: upsert()
*/
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val INSERT_OPERATION_OPT_VAL = "insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val DELETE_OPERATION_OPT_VAL = "delete"
val BOOTSTRAP_OPERATION_OPT_VAL = "bootstrap"
val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
/**

View File

@@ -29,7 +29,7 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.ReflectionUtils
@@ -76,21 +76,19 @@ private[hudi] object HoodieSparkSqlWriter {
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = parameters(TABLE_TYPE_OPT_KEY)
val operation =
var operation = WriteOperationType.fromValue(parameters(OPERATION_OPT_KEY))
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
// Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly
// or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean &&
parameters(OPERATION_OPT_KEY) == UPSERT_OPERATION_OPT_VAL) {
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean &&
operation == WriteOperationType.UPSERT) {
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " +
s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL")
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " +
s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL")
INSERT_OPERATION_OPT_VAL
} else {
parameters(OPERATION_OPT_KEY)
}
operation = WriteOperationType.INSERT
}
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(path.get)
@@ -123,7 +121,7 @@ private[hudi] object HoodieSparkSqlWriter {
// scalastyle:on
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
if (operation != WriteOperationType.DELETE) {
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
@@ -242,7 +240,7 @@ private[hudi] object HoodieSparkSqlWriter {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
false
} else {
handleSaveModes(mode, basePath, tableConfig, tableName, BOOTSTRAP_OPERATION_OPT_VAL, fs)
handleSaveModes(mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
}
if (!tableExists) {
@@ -290,7 +288,7 @@ private[hudi] object HoodieSparkSqlWriter {
}
private def handleSaveModes(mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: String, fs: FileSystem): Unit = {
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {
val existingTableName = tableConfig.getTableName
if (!existingTableName.equals(tableName)) {
@@ -298,7 +296,7 @@ private[hudi] object HoodieSparkSqlWriter {
}
}
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
if (operation != WriteOperationType.DELETE) {
if (mode == SaveMode.ErrorIfExists && tableExists) {
throw new HoodieException(s"hoodie table at $tablePath already exists.")
} else if (mode == SaveMode.Overwrite && tableExists) {
@@ -309,7 +307,7 @@ private[hudi] object HoodieSparkSqlWriter {
} else {
// Delete Operation only supports Append mode
if (mode != SaveMode.Append) {
throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
throw new HoodieException(s"Append is the only save mode applicable for ${operation.toString} operation")
}
}
}
@@ -384,7 +382,7 @@ private[hudi] object HoodieSparkSqlWriter {
tableConfig: HoodieTableConfig,
instantTime: String,
basePath: Path,
operation: String,
operation: WriteOperationType,
jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = {
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
if (errorCount == 0) {
@@ -422,7 +420,7 @@ private[hudi] object HoodieSparkSqlWriter {
}
(commitSuccess && metaSyncSuccess, compactionInstant)
} else {
log.error(s"$operation failed with $errorCount errors :")
log.error(s"${operation.toString} failed with $errorCount errors :")
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeStatuses.rdd.filter(ws => ws.hasErrors)

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -108,7 +109,7 @@ public class TestDataSourceUtils {
when(hoodieWriteClient.getConfig()).thenReturn(config);
DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
WriteOperationType.BULK_INSERT);
verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
optionCaptor.capture());
@@ -121,7 +122,7 @@ public class TestDataSourceUtils {
Exception exception = assertThrows(HoodieException.class, () -> {
DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
WriteOperationType.BULK_INSERT);
});
assertThat(exception.getMessage(), containsString("Could not create UserDefinedBulkInsertPartitioner"));
@@ -132,7 +133,7 @@ public class TestDataSourceUtils {
setAndVerifyHoodieWriteClientWith(NoOpBulkInsertPartitioner.class.getName());
DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
WriteOperationType.BULK_INSERT);
verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
optionCaptor.capture());