diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index 845967c00..6d32d502f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -44,7 +44,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
   }
 
   public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
-    this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
+    this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
   @Override
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index a4d47157a..0b16c3149 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -142,6 +142,19 @@ public class DataSourceUtils {
     }
   }
 
+  /**
+   * Create a payload class via reflection; does not require an ordering/precombine value.
+   */
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
+      throws IOException {
+    try {
+      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+          new Class<?>[] {Option.class}, Option.of(record));
+    } catch (Throwable e) {
+      throw new IOException("Could not create payload for class: " + payloadClass, e);
+    }
+  }
+
   public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
     checkPropNames.forEach(prop -> {
       if (!props.containsKey(prop)) {
@@ -214,6 +227,12 @@ public class DataSourceUtils {
     return new HoodieRecord<>(hKey, payload);
   }
 
+  public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
+      String payloadClass) throws IOException {
+    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr);
+    return new HoodieRecord<>(hKey, payload);
+  }
+
   /**
    * Drop records already present in the dataset.
    *
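For context on the new `createPayload` overload above: `ReflectionUtils.loadClass(payloadClass, new Class<?>[] {Option.class}, Option.of(record))` resolves the payload's single-argument `Option<GenericRecord>` constructor and invokes it, so any payload class exposing that constructor can now be built without an ordering value. A minimal, self-contained sketch of that reflective mechanism using only the JDK (`instantiate` and `DemoPayload` are hypothetical stand-ins, not Hudi API):

```java
import java.lang.reflect.Constructor;
import java.util.Optional;

public class ReflectionSketch {

  // Hypothetical stand-in for a payload class exposing a single-argument
  // constructor, the shape the new createPayload overload expects
  // (Option<GenericRecord> in Hudi).
  public static class DemoPayload {
    final Optional<String> record;

    public DemoPayload(Optional<String> record) {
      this.record = record;
    }
  }

  // Mirrors what ReflectionUtils.loadClass(className, argTypes, args) does:
  // look up the constructor matching argTypes and invoke it with args.
  static Object instantiate(String className, Class<?>[] argTypes, Object... args) throws Exception {
    Constructor<?> ctor = Class.forName(className).getConstructor(argTypes);
    return ctor.newInstance(args);
  }

  public static void main(String[] args) throws Exception {
    Object payload = instantiate(ReflectionSketch.class.getName() + "$DemoPayload",
        new Class<?>[] {Optional.class}, Optional.of("some-avro-record"));
    System.out.println(payload.getClass().getSimpleName()); // prints: DemoPayload
  }
}
```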
diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
index 73711c705..d0e132676 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
@@ -50,7 +50,7 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
   }
 
   public AWSDmsAvroPayload(Option<GenericRecord> record) {
-    this(record.get(), (record1) -> 0); // natural order
+    this(record.get(), 0); // natural order
   }
 
   /**
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7173a2834..b9731df8b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -141,12 +141,18 @@ private[hudi] object HoodieSparkSqlWriter {
       // Convert to RDD[HoodieRecord]
       val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
       val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+      val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT)
       val hoodieAllIncomingRecords = genericRecords.map(gr => {
-        val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-          .asInstanceOf[Comparable[_]]
-        DataSourceUtils.createHoodieRecord(gr,
-          orderingVal, keyGenerator.getKey(gr),
-          parameters(PAYLOAD_CLASS_OPT_KEY))
+        val hoodieRecord = if (shouldCombine) {
+          val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
+            .asInstanceOf[Comparable[_]]
+          DataSourceUtils.createHoodieRecord(gr,
+            orderingVal, keyGenerator.getKey(gr),
+            parameters(PAYLOAD_CLASS_OPT_KEY))
+        } else {
+          DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
+        }
+        hoodieRecord
       }).toJavaRDD()
 
       // Create a HoodieWriteClient & issue the write.
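The writer change above performs the (previously mandatory) precombine-field lookup only when it can matter: when duplicate inserts are being dropped or the operation is an upsert. A small sketch of that gating predicate, assuming Hudi's `WriteOperationType` enum; the `shouldCombine` helper itself is illustrative, not the writer's API:

```java
import org.apache.hudi.common.model.WriteOperationType;

public class CombineGateSketch {

  // Ordering/precombine values only matter when incoming records can collide:
  // dropping duplicate inserts, or upserting against existing records.
  static boolean shouldCombine(boolean insertDropDups, WriteOperationType operation) {
    return insertDropDups || operation == WriteOperationType.UPSERT;
  }

  public static void main(String[] args) {
    // Plain insert, no dedup: the precombine field is no longer required.
    System.out.println(shouldCombine(false, WriteOperationType.INSERT));  // false
    // Upsert: the ordering value still decides which record wins.
    System.out.println(shouldCombine(false, WriteOperationType.UPSERT));  // true
  }
}
```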
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 15872dd22..e1659048f 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -124,7 +124,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
       "hoodie.bulkinsert.shuffle.parallelism" -> "4",
       DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
-      DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
       DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
       DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
       DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
@@ -163,6 +162,63 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
+  test("test insert dataset without precombine field") {
+    val session = SparkSession.builder()
+      .appName("test_insert_without_precombine")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    try {
+
+      val sqlContext = session.sqlContext
+      val sc = session.sparkContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      // create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.bulkinsert.shuffle.parallelism" -> "1",
+        DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+        DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY -> "false",
+        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+      // generate the inserts
+      val schema = DataSourceTestUtils.getStructTypeExampleSchema
+      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+      val records = DataSourceTestUtils.generateRandomRows(100)
+      val recordsSeq = convertRowListToSeq(records)
+      val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, df)
+
+      // collect all partition paths to issue read of parquet files
+      val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
+        HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+      // Check the entire dataset has all records still
+      val fullPartitionPaths = new Array[String](3)
+      for (i <- 0 until fullPartitionPaths.length) {
+        fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
+      }
+
+      // fetch all records from parquet files generated from write to hudi
+      val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+
+      // remove metadata columns so that expected and actual DFs can be compared as is
+      val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+      assert(df.except(trimmedDf).count() == 0)
+    } finally {
+      session.stop()
+      FileUtils.deleteDirectory(path.toFile)
+    }
+  }
+
   test("test bulk insert dataset with datasource impl multiple rounds") {
     initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a8d2ac108..36f121344 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -351,10 +351,12 @@ public class DeltaSync implements Serializable {
       return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
     }
 
+    boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(HoodieDeltaStreamer.Operation.UPSERT);
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
-      HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
+      HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+          (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false))
+          : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
      return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
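Taken together with the `HoodieSparkSqlWriter` change, DeltaSync now also skips reading `sourceOrderingField` for plain inserts and builds the payload through the new single-record overload, which falls back to `0` as the ordering value ('natural order', per `OverwriteWithLatestAvroPayload`). A self-contained sketch of this two-overload pattern (`Payload` and `createPayload` here are hypothetical stand-ins, not Hudi's classes):

```java
public class PayloadPathsSketch {

  // Hypothetical payload holding a record and its ordering value (not Hudi's class).
  static class Payload {
    final String record;
    final Comparable<?> orderingVal;

    Payload(String record, Comparable<?> orderingVal) {
      this.record = record;
      this.orderingVal = orderingVal;
    }
  }

  // Combine path: the ordering value is read from the configured source field.
  static Payload createPayload(String record, Comparable<?> orderingVal) {
    return new Payload(record, orderingVal);
  }

  // Insert path: no source field is read; 0 stands in as the "natural order" value.
  static Payload createPayload(String record) {
    return new Payload(record, 0);
  }

  public static void main(String[] args) {
    boolean shouldCombine = false; // e.g. INSERT without --filter-dupes in DeltaStreamer
    Payload payload = shouldCombine ? createPayload("row", 42L) : createPayload("row");
    System.out.println(payload.orderingVal); // prints: 0
  }
}
```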