From 1628d044ac5b2197fd8a565961f68b3aa7936f67 Mon Sep 17 00:00:00 2001
From: "jiale.tan"
Date: Thu, 4 Oct 2018 17:56:51 -0700
Subject: [PATCH] feat(SparkDataSource): add option to drop late-arriving duplicates on insert

---
 .../common/HoodieTestDataGenerator.java       |  10 ++
 .../java/com/uber/hoodie/DataSourceUtils.java |  11 ++
 .../com/uber/hoodie/DataSourceOptions.scala   |   7 +
 .../scala/com/uber/hoodie/DefaultSource.scala | 169 ++++++++++--------
 .../src/test/scala/DataSourceTest.scala       |  43 +++++
 5 files changed, 170 insertions(+), 70 deletions(-)

diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
index b056bd805..06d66aeb4 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
@@ -196,6 +196,16 @@ public class HoodieTestDataGenerator {
     return inserts;
   }
 
+  public List<HoodieRecord> generateSameKeyInserts(String commitTime, List<HoodieRecord> origin) throws IOException {
+    List<HoodieRecord> copy = new ArrayList<>();
+    for (HoodieRecord r : origin) {
+      HoodieKey key = r.getKey();
+      HoodieRecord record = new HoodieRecord(key, generateRandomValue(key, commitTime));
+      copy.add(record);
+    }
+    return copy;
+  }
+
   public List<HoodieRecord> generateDeletes(String commitTime, Integer n) throws IOException {
     List<HoodieRecord> inserts = generateInserts(commitTime, n);
     return generateDeletesFromExistingRecords(inserts);
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index f953b2a3d..b02c36675 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -158,4 +158,15 @@ public class DataSourceUtils {
       return incomingHoodieRecords;
     }
   }
+
+  @SuppressWarnings("unchecked")
+  public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
+      JavaRDD<HoodieRecord> incomingHoodieRecords,
+      Map<String, String> parameters) throws Exception {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig
+        .newBuilder()
+        .withPath(parameters.get("path"))
+        .withProps(parameters).build();
+    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
+  }
 }
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala
index e52a0e93a..63e1d19c5 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala
@@ -145,6 +145,13 @@ object DataSourceWriteOptions {
   val COMMIT_METADATA_KEYPREFIX_OPT_KEY = "hoodie.datasource.write.commitmeta.key.prefix"
   val DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL = "_"
 
+  /**
+    * Flag to indicate whether to drop duplicates upon insert.
+    * By default, inserts accept duplicates, to gain extra performance.
+    */
+  val INSERT_DROP_DUPS_OPT_KEY = "hoodie.datasource.write.insert.drop.duplicates"
+  val DEFAULT_INSERT_DROP_DUPS_OPT_VAL = "false"
+
   // HIVE SYNC SPECIFIC CONFIGS
   //NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
  // unexpected issues with config getting reset
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
index 68ac868e8..608974baf 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
@@ -122,6 +122,7 @@ class DefaultSource extends RelationProvider
     defaultsMap.putIfAbsent(PARTITIONPATH_FIELD_OPT_KEY, DEFAULT_PARTITIONPATH_FIELD_OPT_VAL)
     defaultsMap.putIfAbsent(KEYGENERATOR_CLASS_OPT_KEY, DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
     defaultsMap.putIfAbsent(COMMIT_METADATA_KEYPREFIX_OPT_KEY, DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL)
+    defaultsMap.putIfAbsent(INSERT_DROP_DUPS_OPT_KEY, DEFAULT_INSERT_DROP_DUPS_OPT_VAL)
     defaultsMap.putIfAbsent(HIVE_SYNC_ENABLED_OPT_KEY, DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL)
     defaultsMap.putIfAbsent(HIVE_DATABASE_OPT_KEY, DEFAULT_HIVE_DATABASE_OPT_VAL)
     defaultsMap.putIfAbsent(HIVE_TABLE_OPT_KEY, DEFAULT_HIVE_TABLE_OPT_VAL)
@@ -159,7 +160,21 @@ class DefaultSource extends RelationProvider
     }
 
     val storageType = parameters(STORAGE_TYPE_OPT_KEY)
-    val operation = parameters(OPERATION_OPT_KEY)
+    val operation =
+      // It does not make sense to allow the upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true.
+      // Auto-correct the operation to "insert" if OPERATION_OPT_KEY was wrongly set to "upsert"
+      // or left unset (in which case parametersWithWriteDefaults() defaults it to "upsert").
+      if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean &&
+        parameters(OPERATION_OPT_KEY) == UPSERT_OPERATION_OPT_VAL) {
+
+        log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+          s"when $INSERT_DROP_DUPS_OPT_KEY is set to true, " +
+          s"overriding $OPERATION_OPT_KEY to $INSERT_OPERATION_OPT_VAL")
+
+        INSERT_OPERATION_OPT_VAL
+      } else {
+        parameters(OPERATION_OPT_KEY)
+      }
 
     // register classes & schemas
     val structName = s"${tblName.get}_record"
@@ -177,90 +192,104 @@ class DefaultSource extends RelationProvider
       toProperties(parameters)
     )
     val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-    val hoodieRecords = genericRecords.map(gr => {
+    val hoodieAllIncomingRecords = genericRecords.map(gr => {
       val orderingVal = DataSourceUtils.getNestedFieldValAsString(
         gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
       DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
         parameters(PAYLOAD_CLASS_OPT_KEY))
     }).toJavaRDD();
 
-    val basePath = new Path(parameters.get("path").get)
-    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
-    var exists = fs.exists(basePath)
-
-    // Handle various save modes
-    if (mode == SaveMode.ErrorIfExists && exists) {
-      throw new HoodieException(s"basePath ${basePath} already exists.")
-    }
-    if (mode == SaveMode.Ignore && exists) {
-      log.warn(s" basePath ${basePath} already exists. Ignoring & not performing actual writes.")
-      return createRelation(sqlContext, parameters, df.schema)
-    }
-    if (mode == SaveMode.Overwrite && exists) {
-      log.warn(s" basePath ${basePath} already exists. Deleting existing data & overwriting with new data.")
-      fs.delete(basePath, true)
-      exists = false
-    }
-
-    // Create the dataset if not present (APPEND mode)
-    if (!exists) {
-      HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
-        tblName.get, "archived")
-    }
-
-    // Create a HoodieWriteClient & issue the write.
-    val jsc = new JavaSparkContext(sparkContext);
-    val client = DataSourceUtils.createHoodieClient(jsc,
-      schema.toString,
-      path.get,
-      tblName.get,
-      mapAsJavaMap(parameters)
-    )
-    val commitTime = client.startCommit();
-
-    val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
-    // Check for errors and commit the write.
-    val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-    if (errorCount == 0) {
-      log.info("No errors. Proceeding to commit the write.");
-      val metaMap = parameters.filter(kv =>
-        kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-      val success = if (metaMap.isEmpty) {
-        client.commit(commitTime, writeStatuses)
-      } else {
-        client.commit(commitTime, writeStatuses,
-          Optional.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-      }
-      if (success) {
-        log.info("Commit " + commitTime + " successful!")
-      }
-      else {
-        log.info("Commit " + commitTime + " failed!")
-      }
-      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false)
-      if (hiveSyncEnabled) {
-        log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
-        val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-        syncHive(basePath, fs, parameters)
-      }
-      client.close
-    } else {
-      log.error(s"Upsert failed with ${errorCount} errors :");
-      if (log.isTraceEnabled) {
-        log.trace("Printing out the top 100 errors")
-        writeStatuses.rdd.filter(ws => ws.hasErrors)
-          .take(100)
-          .foreach(ws => {
-            log.trace("Global error :", ws.getGlobalError)
-            if (ws.getErrors.size() > 0) {
-              ws.getErrors.foreach(kt =>
-                log.trace(s"Error for key: ${kt._1}", kt._2))
-            }
-          })
-      }
-    }
+    val jsc = new JavaSparkContext(sparkContext)
+
+    val hoodieRecords =
+      if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+        DataSourceUtils.dropDuplicates(
+          jsc,
+          hoodieAllIncomingRecords,
+          mapAsJavaMap(parameters))
+      } else {
+        hoodieAllIncomingRecords
+      }
+
+    if (!hoodieRecords.isEmpty()) {
+      val basePath = new Path(parameters.get("path").get)
+      val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
+      var exists = fs.exists(basePath)
+
+      // Handle various save modes
+      if (mode == SaveMode.ErrorIfExists && exists) {
+        throw new HoodieException(s"basePath ${basePath} already exists.")
+      }
+      if (mode == SaveMode.Ignore && exists) {
+        log.warn(s" basePath ${basePath} already exists. Ignoring & not performing actual writes.")
+        return createRelation(sqlContext, parameters, df.schema)
+      }
+      if (mode == SaveMode.Overwrite && exists) {
+        log.warn(s" basePath ${basePath} already exists. Deleting existing data & overwriting with new data.")
+        fs.delete(basePath, true)
+        exists = false
+      }
+
+      // Create the dataset if not present (APPEND mode)
+      if (!exists) {
+        HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
+          tblName.get, "archived")
+      }
+
+      // Create a HoodieWriteClient & issue the write.
+      val client = DataSourceUtils.createHoodieClient(jsc,
+        schema.toString,
+        path.get,
+        tblName.get,
+        mapAsJavaMap(parameters)
+      )
+      val commitTime = client.startCommit();
+
+      val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
+      // Check for errors and commit the write.
+      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
+      if (errorCount == 0) {
+        log.info("No errors. Proceeding to commit the write.");
+        val metaMap = parameters.filter(kv =>
+          kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+        val success = if (metaMap.isEmpty) {
+          client.commit(commitTime, writeStatuses)
+        } else {
+          client.commit(commitTime, writeStatuses,
+            Optional.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+        }
+
+        if (success) {
+          log.info("Commit " + commitTime + " successful!")
+        }
+        else {
+          log.info("Commit " + commitTime + " failed!")
+        }
+
+        val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false)
+        if (hiveSyncEnabled) {
+          log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+          val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+          syncHive(basePath, fs, parameters)
+        }
+        client.close
+      } else {
+        log.error(s"$operation failed with ${errorCount} errors:");
+        if (log.isTraceEnabled) {
+          log.trace("Printing out the top 100 errors")
+          writeStatuses.rdd.filter(ws => ws.hasErrors)
+            .take(100)
+            .foreach(ws => {
+              log.trace("Global error:", ws.getGlobalError)
+              if (ws.getErrors.size() > 0) {
+                ws.getErrors.foreach(kt =>
+                  log.trace(s"Error for key: ${kt._1}", kt._2))
+              }
+            })
+        }
+      }
+    } else {
+      log.info("New batch has no new records, skipping...")
+    }
     createRelation(sqlContext, parameters, df.schema)
   }
diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala b/hoodie-spark/src/test/scala/DataSourceTest.scala
index f75192b35..1ad42e777 100644
--- a/hoodie-spark/src/test/scala/DataSourceTest.scala
+++ b/hoodie-spark/src/test/scala/DataSourceTest.scala
@@ -139,4 +139,47 @@ class DataSourceTest extends AssertionsForJUnit {
     val hoodieROViewDF1 = spark.read.format("com.uber.hoodie").load(basePath + "/*/*/*/*")
     assertEquals(100, hoodieROViewDF1.count()) // still 100, since we only updated
   }
+
+  @Test def testDropInsertDup(): Unit = {
+    val insert1Cnt = 10
+    val insert2DupKeyCnt = 9
+    val insert2NewKeyCnt = 2
+
+    val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
+    val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
+    val inserts1 = allRecords.subList(0, insert1Cnt)
+    val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
+    val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
+
+    val records1 = DataSourceTestUtils.convertToStringList(inserts1).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("com.uber.hoodie")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val hoodieROViewDF1 = spark.read.format("com.uber.hoodie")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(insert1Cnt, hoodieROViewDF1.count())
+
+    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    val records2 = DataSourceTestUtils
+      .convertToStringList(inserts2Dup ++ inserts2New)
+      .toList
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("com.uber.hoodie")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hoodieROViewDF2 = spark.read.format("com.uber.hoodie")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(totalUniqueKeyToGenerate, hoodieROViewDF2.count())
+
+    val hoodieIncViewDF2 = spark.read.format("com.uber.hoodie")
+      .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .load(basePath)
+    assertEquals(insert2NewKeyCnt, hoodieIncViewDF2.count())
+  }
 }
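
Usage note (illustrative, not part of the diff above): a minimal sketch of how a user
would enable the new option on the write path. It assumes the usual Spark imports, a
SparkSession `spark`, an input DataFrame `inputDF` whose rows carry the `_row_key`,
`timestamp` and `partition` fields used by the Hudi test helpers, and a target
`basePath`; the field and table names are illustrative, not mandated by this patch.

    // Insert while dropping incoming records whose keys already exist in the dataset.
    inputDF.write.format("com.uber.hoodie")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
      .option("hoodie.table.name", "hoodie_test")
      .mode(SaveMode.Append)
      .save(basePath)

Note that, per the DefaultSource change above, setting the operation to "upsert" (or
leaving it unset) while INSERT_DROP_DUPS_OPT_KEY is true logs a warning and is
auto-corrected to an insert.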
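A quick way to sanity-check the behavior after such a write is to group the
read-optimized view by the record key and assert that no key appears more than once.
A sketch, again assuming the illustrative `_row_key` field and `basePath` from above:

    val dupKeys = spark.read.format("com.uber.hoodie")
      .load(basePath + "/*/*/*/*")
      .groupBy("_row_key")   // group by the record key column
      .count()
      .filter("count > 1")   // keys seen more than once
    assert(dupKeys.count() == 0,
      "found duplicate record keys after insert with drop-dups enabled")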