[HUDI-1208] Ordering Field should be optional when precombine is turned off (#2088)
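Summary: before this change, every write path resolved an ordering/precombine value for each incoming record, so the ordering field had to exist even for insert-only pipelines. The diff below makes that lookup conditional: the Spark datasource writer and DeltaStreamer read the precombine field only when dedup or upsert semantics require it, backed by new no-ordering overloads of DataSourceUtils.createPayload / createHoodieRecord. As a minimal illustration (not part of the diff; `rec` is an assumed GenericRecord built against the table schema), the single-argument payload constructor touched in the first hunk now simply defaults the ordering value:

    // Sketch only: build the stock payload without any precombine field.
    OverwriteWithLatestAvroPayload payload =
        new OverwriteWithLatestAvroPayload(Option.of(rec)); // ordering value defaults to 0 ("natural order")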
@@ -44,7 +44,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
   }
 
   public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
-    this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
+    this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
   @Override
@@ -142,6 +142,19 @@ public class DataSourceUtils {
     }
   }
 
+  /**
+   * Create a payload class via reflection, without an ordering/precombine value.
+   */
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
+      throws IOException {
+    try {
+      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+          new Class<?>[] {Option.class}, Option.of(record));
+    } catch (Throwable e) {
+      throw new IOException("Could not create payload for class: " + payloadClass, e);
+    }
+  }
+
   public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
     checkPropNames.forEach(prop -> {
       if (!props.containsKey(prop)) {
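A hedged usage sketch of the new overload above (`gr` is an Avro GenericRecord assumed to be in scope; the payload class shown is the stock one):

    // Sketch: reflectively create a payload with no ordering/precombine value.
    HoodieRecordPayload payload =
        DataSourceUtils.createPayload(OverwriteWithLatestAvroPayload.class.getName(), gr);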
@@ -214,6 +227,12 @@ public class DataSourceUtils {
     return new HoodieRecord<>(hKey, payload);
   }
 
+  public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
+      String payloadClass) throws IOException {
+    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr);
+    return new HoodieRecord<>(hKey, payload);
+  }
+
   /**
    * Drop records already present in the dataset.
    *
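And a matching sketch for the new createHoodieRecord overload (key and partition-path values here are hypothetical):

    // Sketch: build a HoodieRecord for an insert-only path, skipping the ordering value.
    HoodieRecord record = DataSourceUtils.createHoodieRecord(gr,
        new HoodieKey("uuid-000", "2020/09/01"),
        OverwriteWithLatestAvroPayload.class.getName());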
@@ -50,7 +50,7 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
   }
 
   public AWSDmsAvroPayload(Option<GenericRecord> record) {
-    this(record.get(), (record1) -> 0); // natural order
+    this(record.get(), 0); // natural order
   }
 
   /**
@@ -141,12 +141,18 @@ private[hudi] object HoodieSparkSqlWriter {
       // Convert to RDD[HoodieRecord]
       val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
       val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+      val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT);
       val hoodieAllIncomingRecords = genericRecords.map(gr => {
-        val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-          .asInstanceOf[Comparable[_]]
-        DataSourceUtils.createHoodieRecord(gr,
-          orderingVal, keyGenerator.getKey(gr),
-          parameters(PAYLOAD_CLASS_OPT_KEY))
+        val hoodieRecord = if (shouldCombine) {
+          val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
+            .asInstanceOf[Comparable[_]]
+          DataSourceUtils.createHoodieRecord(gr,
+            orderingVal, keyGenerator.getKey(gr),
+            parameters(PAYLOAD_CLASS_OPT_KEY))
+        } else {
+          DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
+        }
+        hoodieRecord
       }).toJavaRDD()
 
       // Create a HoodieWriteClient & issue the write.
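What this enables at the datasource level, as a hedged sketch (Java Spark API; `df` and `basePath` are assumed; option keys are the string forms of the constants used above): an insert that omits hoodie.datasource.write.precombine.field entirely.

    // Sketch: insert without a precombine field. With shouldCombine false
    // (dedup off, operation not upsert), the writer no longer resolves an
    // ordering value per record.
    df.write().format("org.apache.hudi")
        .option("hoodie.datasource.write.operation", "insert")
        .option("hoodie.datasource.write.insert.drop.duplicates", "false")
        .option("hoodie.datasource.write.recordkey.field", "_row_key")
        .option("hoodie.datasource.write.partitionpath.field", "partition")
        .option("hoodie.table.name", "hoodie_foo_tbl")
        .mode(SaveMode.Append)
        .save(basePath);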
@@ -124,7 +124,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
       "hoodie.bulkinsert.shuffle.parallelism" -> "4",
       DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
-      DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
       DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
       DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
       DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("test insert dataset without precombine field") {
|
||||||
|
val session = SparkSession.builder()
|
||||||
|
.appName("test_insert_without_precombine")
|
||||||
|
.master("local[2]")
|
||||||
|
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||||
|
.getOrCreate()
|
||||||
|
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
|
||||||
|
try {
|
||||||
|
|
||||||
|
val sqlContext = session.sqlContext
|
||||||
|
val sc = session.sparkContext
|
||||||
|
val hoodieFooTableName = "hoodie_foo_tbl"
|
||||||
|
|
||||||
|
//create a new table
|
||||||
|
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
|
||||||
|
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
|
||||||
|
"hoodie.bulkinsert.shuffle.parallelism" -> "1",
|
||||||
|
DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
|
||||||
|
DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY -> "false",
|
||||||
|
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
|
||||||
|
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
|
||||||
|
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
|
||||||
|
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
|
||||||
|
|
||||||
|
// generate the inserts
|
||||||
|
val schema = DataSourceTestUtils.getStructTypeExampleSchema
|
||||||
|
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
|
||||||
|
val records = DataSourceTestUtils.generateRandomRows(100)
|
||||||
|
val recordsSeq = convertRowListToSeq(records)
|
||||||
|
val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
|
||||||
|
// write to Hudi
|
||||||
|
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, df)
|
||||||
|
|
||||||
|
// collect all parition paths to issue read of parquet files
|
||||||
|
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
|
||||||
|
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
|
||||||
|
// Check the entire dataset has all records still
|
||||||
|
val fullPartitionPaths = new Array[String](3)
|
||||||
|
for (i <- 0 until fullPartitionPaths.length) {
|
||||||
|
fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch all records from parquet files generated from write to hudi
|
||||||
|
val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
|
||||||
|
|
||||||
|
// remove metadata columns so that expected and actual DFs can be compared as is
|
||||||
|
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
|
||||||
|
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
|
||||||
|
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
|
||||||
|
|
||||||
|
assert(df.except(trimmedDf).count() == 0)
|
||||||
|
} finally {
|
||||||
|
session.stop()
|
||||||
|
FileUtils.deleteDirectory(path.toFile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
test("test bulk insert dataset with datasource impl multiple rounds") {
|
test("test bulk insert dataset with datasource impl multiple rounds") {
|
||||||
initSparkContext("test_bulk_insert_datasource")
|
initSparkContext("test_bulk_insert_datasource")
|
||||||
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
|
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
|
||||||
|
|||||||
@@ -351,10 +351,12 @@ public class DeltaSync implements Serializable {
       return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
     }
 
+    boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(HoodieDeltaStreamer.Operation.UPSERT);
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
-      HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
+      HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+          (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false))
+          : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
 
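Why the guard matters on the DeltaStreamer path: HoodieAvroUtils.getNestedFieldVal with returnNullIfNotFound = false throws when the field is absent, so before this change an insert-only pipeline whose schema lacked the source ordering field failed outright. A minimal sketch of the call that is now skipped unless shouldCombine is true ("ts" is a hypothetical field name; `gr` assumed):

    // Throws if "ts" does not exist in the record; previously ran unconditionally.
    Object orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, "ts", false);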