[HUDI-1873] collect() call causing issues with very large upserts (#2907)
Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>
This commit is contained in:
@@ -540,9 +540,8 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
jsc: JavaSparkContext,
|
||||
tableInstantInfo: TableInstantInfo
|
||||
): (Boolean, common.util.Option[java.lang.String]) = {
|
||||
val errorCount = writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count()
|
||||
if (errorCount == 0) {
|
||||
log.info("No errors. Proceeding to commit the write.")
|
||||
if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
|
||||
log.info("Proceeding to commit the write.")
|
||||
val metaMap = parameters.filter(kv =>
|
||||
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
|
||||
val commitSuccess =
|
||||
@@ -559,7 +558,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
}
|
||||
|
||||
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
|
||||
val compactionInstant : common.util.Option[java.lang.String] =
|
||||
val compactionInstant: common.util.Option[java.lang.String] =
|
||||
if (asyncCompactionEnabled) {
|
||||
client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
|
||||
} else {
|
||||
@@ -568,7 +567,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
|
||||
log.info(s"Compaction Scheduled is $compactionInstant")
|
||||
|
||||
val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema)
|
||||
val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema)
|
||||
|
||||
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
|
||||
if (!asyncCompactionEnabled) {
|
||||
@@ -576,7 +575,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
}
|
||||
(commitSuccess && metaSyncSuccess, compactionInstant)
|
||||
} else {
|
||||
log.error(s"${tableInstantInfo.operation} failed with $errorCount errors :")
|
||||
log.error(s"${tableInstantInfo.operation} failed with errors")
|
||||
if (log.isTraceEnabled) {
|
||||
log.trace("Printing out the top 100 errors")
|
||||
writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
|
||||
|
||||
Reference in New Issue
Block a user