feat(SparkDataSource): add additional feature to drop later arriving dups
This commit is contained in:
committed by
vinoth chandar
parent
8485b9e263
commit
1628d044ac
@@ -196,6 +196,16 @@ public class HoodieTestDataGenerator {
|
||||
return inserts;
|
||||
}
|
||||
|
||||
public List<HoodieRecord> generateSameKeyInserts(String commitTime, List<HoodieRecord> origin) throws IOException {
|
||||
List<HoodieRecord> copy = new ArrayList<>();
|
||||
for (HoodieRecord r: origin) {
|
||||
HoodieKey key = r.getKey();
|
||||
HoodieRecord record = new HoodieRecord(key, generateRandomValue(key, commitTime));
|
||||
copy.add(record);
|
||||
}
|
||||
return copy;
|
||||
}
|
||||
|
||||
public List<HoodieRecord> generateDeletes(String commitTime, Integer n) throws IOException {
|
||||
List<HoodieRecord> inserts = generateInserts(commitTime, n);
|
||||
return generateDeletesFromExistingRecords(inserts);
|
||||
|
||||
@@ -158,4 +158,15 @@ public class DataSourceUtils {
|
||||
return incomingHoodieRecords;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
|
||||
JavaRDD<HoodieRecord> incomingHoodieRecords,
|
||||
Map<String, String> parameters) throws Exception {
|
||||
HoodieWriteConfig writeConfig = HoodieWriteConfig
|
||||
.newBuilder()
|
||||
.withPath(parameters.get("path"))
|
||||
.withProps(parameters).build();
|
||||
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,6 +145,13 @@ object DataSourceWriteOptions {
|
||||
val COMMIT_METADATA_KEYPREFIX_OPT_KEY = "hoodie.datasource.write.commitmeta.key.prefix"
|
||||
val DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL = "_"
|
||||
|
||||
/**
|
||||
* Flag to indicate whether to drop duplicates upon insert.
|
||||
* By default insert will accept duplicates, to gain extra performance.
|
||||
*/
|
||||
val INSERT_DROP_DUPS_OPT_KEY = "hoodie.datasource.write.insert.drop.duplicates"
|
||||
val DEFAULT_INSERT_DROP_DUPS_OPT_VAL = "false"
|
||||
|
||||
// HIVE SYNC SPECIFIC CONFIGS
|
||||
//NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
|
||||
// unexpected issues with config getting reset
|
||||
|
||||
@@ -122,6 +122,7 @@ class DefaultSource extends RelationProvider
|
||||
defaultsMap.putIfAbsent(PARTITIONPATH_FIELD_OPT_KEY, DEFAULT_PARTITIONPATH_FIELD_OPT_VAL)
|
||||
defaultsMap.putIfAbsent(KEYGENERATOR_CLASS_OPT_KEY, DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
|
||||
defaultsMap.putIfAbsent(COMMIT_METADATA_KEYPREFIX_OPT_KEY, DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL)
|
||||
defaultsMap.putIfAbsent(INSERT_DROP_DUPS_OPT_KEY, DEFAULT_INSERT_DROP_DUPS_OPT_VAL)
|
||||
defaultsMap.putIfAbsent(HIVE_SYNC_ENABLED_OPT_KEY, DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL)
|
||||
defaultsMap.putIfAbsent(HIVE_DATABASE_OPT_KEY, DEFAULT_HIVE_DATABASE_OPT_VAL)
|
||||
defaultsMap.putIfAbsent(HIVE_TABLE_OPT_KEY, DEFAULT_HIVE_TABLE_OPT_VAL)
|
||||
@@ -159,7 +160,21 @@ class DefaultSource extends RelationProvider
|
||||
}
|
||||
|
||||
val storageType = parameters(STORAGE_TYPE_OPT_KEY)
|
||||
val operation = parameters(OPERATION_OPT_KEY)
|
||||
val operation =
|
||||
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
|
||||
// Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly
|
||||
// or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
|
||||
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean &&
|
||||
parameters(OPERATION_OPT_KEY) == UPSERT_OPERATION_OPT_VAL) {
|
||||
|
||||
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
|
||||
s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " +
|
||||
s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL")
|
||||
|
||||
INSERT_OPERATION_OPT_VAL
|
||||
} else {
|
||||
parameters(OPERATION_OPT_KEY)
|
||||
}
|
||||
|
||||
// register classes & schemas
|
||||
val structName = s"${tblName.get}_record"
|
||||
@@ -177,90 +192,104 @@ class DefaultSource extends RelationProvider
|
||||
toProperties(parameters)
|
||||
)
|
||||
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
|
||||
val hoodieRecords = genericRecords.map(gr => {
|
||||
val hoodieAllIncomingRecords = genericRecords.map(gr => {
|
||||
val orderingVal = DataSourceUtils.getNestedFieldValAsString(
|
||||
gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
|
||||
DataSourceUtils.createHoodieRecord(gr,
|
||||
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
|
||||
}).toJavaRDD();
|
||||
|
||||
val jsc = new JavaSparkContext(sparkContext)
|
||||
|
||||
val basePath = new Path(parameters.get("path").get)
|
||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||
var exists = fs.exists(basePath)
|
||||
|
||||
// Handle various save modes
|
||||
if (mode == SaveMode.ErrorIfExists && exists) {
|
||||
throw new HoodieException(s"basePath ${basePath} already exists.")
|
||||
}
|
||||
if (mode == SaveMode.Ignore && exists) {
|
||||
log.warn(s" basePath ${basePath} already exists. Ignoring & not performing actual writes.")
|
||||
return createRelation(sqlContext, parameters, df.schema)
|
||||
}
|
||||
if (mode == SaveMode.Overwrite && exists) {
|
||||
log.warn(s" basePath ${basePath} already exists. Deleting existing data & overwriting with new data.")
|
||||
fs.delete(basePath, true)
|
||||
exists = false
|
||||
}
|
||||
|
||||
// Create the dataset if not present (APPEND mode)
|
||||
if (!exists) {
|
||||
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
|
||||
tblName.get, "archived")
|
||||
}
|
||||
|
||||
// Create a HoodieWriteClient & issue the write.
|
||||
val jsc = new JavaSparkContext(sparkContext);
|
||||
val client = DataSourceUtils.createHoodieClient(jsc,
|
||||
schema.toString,
|
||||
path.get,
|
||||
tblName.get,
|
||||
mapAsJavaMap(parameters)
|
||||
)
|
||||
val commitTime = client.startCommit();
|
||||
|
||||
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
|
||||
// Check for errors and commit the write.
|
||||
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
||||
if (errorCount == 0) {
|
||||
log.info("No errors. Proceeding to commit the write.");
|
||||
val metaMap = parameters.filter(kv =>
|
||||
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
|
||||
val success = if (metaMap.isEmpty) {
|
||||
client.commit(commitTime, writeStatuses)
|
||||
val hoodieRecords =
|
||||
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
|
||||
DataSourceUtils.dropDuplicates(
|
||||
jsc,
|
||||
hoodieAllIncomingRecords,
|
||||
mapAsJavaMap(parameters))
|
||||
} else {
|
||||
client.commit(commitTime, writeStatuses,
|
||||
Optional.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
|
||||
hoodieAllIncomingRecords
|
||||
}
|
||||
|
||||
if (success) {
|
||||
log.info("Commit " + commitTime + " successful!")
|
||||
if (!hoodieRecords.isEmpty()) {
|
||||
val basePath = new Path(parameters.get("path").get)
|
||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||
var exists = fs.exists(basePath)
|
||||
|
||||
// Handle various save modes
|
||||
if (mode == SaveMode.ErrorIfExists && exists) {
|
||||
throw new HoodieException(s"basePath ${basePath} already exists.")
|
||||
}
|
||||
else {
|
||||
log.info("Commit " + commitTime + " failed!")
|
||||
if (mode == SaveMode.Ignore && exists) {
|
||||
log.warn(s" basePath ${basePath} already exists. Ignoring & not performing actual writes.")
|
||||
return createRelation(sqlContext, parameters, df.schema)
|
||||
}
|
||||
if (mode == SaveMode.Overwrite && exists) {
|
||||
log.warn(s" basePath ${basePath} already exists. Deleting existing data & overwriting with new data.")
|
||||
fs.delete(basePath, true)
|
||||
exists = false
|
||||
}
|
||||
|
||||
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false)
|
||||
if (hiveSyncEnabled) {
|
||||
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
|
||||
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
|
||||
syncHive(basePath, fs, parameters)
|
||||
// Create the dataset if not present (APPEND mode)
|
||||
if (!exists) {
|
||||
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
|
||||
tblName.get, "archived")
|
||||
}
|
||||
|
||||
// Create a HoodieWriteClient & issue the write.
|
||||
val client = DataSourceUtils.createHoodieClient(jsc,
|
||||
schema.toString,
|
||||
path.get,
|
||||
tblName.get,
|
||||
mapAsJavaMap(parameters)
|
||||
)
|
||||
val commitTime = client.startCommit();
|
||||
|
||||
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
|
||||
// Check for errors and commit the write.
|
||||
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
||||
if (errorCount == 0) {
|
||||
log.info("No errors. Proceeding to commit the write.");
|
||||
val metaMap = parameters.filter(kv =>
|
||||
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
|
||||
val success = if (metaMap.isEmpty) {
|
||||
client.commit(commitTime, writeStatuses)
|
||||
} else {
|
||||
client.commit(commitTime, writeStatuses,
|
||||
Optional.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
|
||||
}
|
||||
|
||||
if (success) {
|
||||
log.info("Commit " + commitTime + " successful!")
|
||||
}
|
||||
else {
|
||||
log.info("Commit " + commitTime + " failed!")
|
||||
}
|
||||
|
||||
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false)
|
||||
if (hiveSyncEnabled) {
|
||||
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
|
||||
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
|
||||
syncHive(basePath, fs, parameters)
|
||||
}
|
||||
client.close
|
||||
} else {
|
||||
log.error(s"$operation failed with ${errorCount} errors :");
|
||||
if (log.isTraceEnabled) {
|
||||
log.trace("Printing out the top 100 errors")
|
||||
writeStatuses.rdd.filter(ws => ws.hasErrors)
|
||||
.take(100)
|
||||
.foreach(ws => {
|
||||
log.trace("Global error :", ws.getGlobalError)
|
||||
if (ws.getErrors.size() > 0) {
|
||||
ws.getErrors.foreach(kt =>
|
||||
log.trace(s"Error for key: ${kt._1}", kt._2))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
client.close
|
||||
} else {
|
||||
log.error(s"Upsert failed with ${errorCount} errors :");
|
||||
if (log.isTraceEnabled) {
|
||||
log.trace("Printing out the top 100 errors")
|
||||
writeStatuses.rdd.filter(ws => ws.hasErrors)
|
||||
.take(100)
|
||||
.foreach(ws => {
|
||||
log.trace("Global error :", ws.getGlobalError)
|
||||
if (ws.getErrors.size() > 0) {
|
||||
ws.getErrors.foreach(kt =>
|
||||
log.trace(s"Error for key: ${kt._1}", kt._2))
|
||||
}
|
||||
})
|
||||
}
|
||||
log.info("new batch has no new records, skipping...")
|
||||
}
|
||||
createRelation(sqlContext, parameters, df.schema)
|
||||
}
|
||||
|
||||
@@ -139,4 +139,47 @@ class DataSourceTest extends AssertionsForJUnit {
|
||||
val hoodieROViewDF1 = spark.read.format("com.uber.hoodie").load(basePath + "/*/*/*/*")
|
||||
assertEquals(100, hoodieROViewDF1.count()) // still 100, since we only updated
|
||||
}
|
||||
|
||||
@Test def testDropInsertDup(): Unit = {
|
||||
val insert1Cnt = 10
|
||||
val insert2DupKeyCnt = 9
|
||||
val insert2NewKeyCnt = 2
|
||||
|
||||
val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
|
||||
val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
|
||||
val inserts1 = allRecords.subList(0, insert1Cnt)
|
||||
val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
|
||||
val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
|
||||
|
||||
val records1 = DataSourceTestUtils.convertToStringList(inserts1).toList
|
||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||
inputDF1.write.format("com.uber.hoodie")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
val hoodieROViewDF1 = spark.read.format("com.uber.hoodie")
|
||||
.load(basePath + "/*/*/*/*")
|
||||
assertEquals(insert1Cnt, hoodieROViewDF1.count())
|
||||
|
||||
val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
||||
val records2 = DataSourceTestUtils
|
||||
.convertToStringList(inserts2Dup ++ inserts2New)
|
||||
.toList
|
||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||
inputDF2.write.format("com.uber.hoodie")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
val hoodieROViewDF2 = spark.read.format("com.uber.hoodie")
|
||||
.load(basePath + "/*/*/*/*")
|
||||
assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate)
|
||||
|
||||
val hoodieIncViewDF2 = spark.read.format("com.uber.hoodie")
|
||||
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
|
||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
|
||||
.load(basePath)
|
||||
assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user