From 369a8493378f069d5471098d75a251fa67d99737 Mon Sep 17 00:00:00 2001
From: mpouttu <83429688+mpouttu@users.noreply.github.com>
Date: Sun, 23 May 2021 22:29:01 -0700
Subject: [PATCH] [HUDI-1873] collect() call causing issues with very large
 upserts (#2907)

Co-authored-by: Sivabalan Narayanan
---
 .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 9653e6f19..93442d5cd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -540,9 +540,8 @@ private[hudi] object HoodieSparkSqlWriter {
                           jsc: JavaSparkContext,
                           tableInstantInfo: TableInstantInfo
                          ): (Boolean, common.util.Option[java.lang.String]) = {
-    val errorCount = writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count()
-    if (errorCount == 0) {
-      log.info("No errors. Proceeding to commit the write.")
+    if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
+      log.info("Proceeding to commit the write.")
       val metaMap = parameters.filter(kv => kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
       val commitSuccess =
@@ -559,7 +558,7 @@ private[hudi] object HoodieSparkSqlWriter {
       }
 
       val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
-      val compactionInstant : common.util.Option[java.lang.String] =
+      val compactionInstant: common.util.Option[java.lang.String] =
         if (asyncCompactionEnabled) {
           client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
         } else {
@@ -568,7 +567,7 @@ private[hudi] object HoodieSparkSqlWriter {
         }
 
       log.info(s"Compaction Scheduled is $compactionInstant")
-      val metaSyncSuccess =  metaSync(spark, parameters, tableInstantInfo.basePath, schema)
+      val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema)
 
       log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
       if (!asyncCompactionEnabled) {
@@ -576,7 +575,7 @@ private[hudi] object HoodieSparkSqlWriter {
       }
       (commitSuccess && metaSyncSuccess, compactionInstant)
     } else {
-      log.error(s"${tableInstantInfo.operation} failed with $errorCount errors :")
+      log.error(s"${tableInstantInfo.operation} failed with errors")
       if (log.isTraceEnabled) {
         log.trace("Printing out the top 100 errors")
         writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
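
Why this helps: RDD.count() launches a full Spark job that scans every
partition of the write-status RDD before the zero check runs, so the old
code paid that cost on every write, successful or not. RDD.isEmpty() is
built on take(1), so Spark can stop scanning as soon as it finds a single
element. On very large upserts, where the write-status RDD is huge, that
difference (and avoiding driver-side materialization generally, per the
subject line) is what this patch is after.

Below is a minimal, self-contained sketch of the pattern, not code from
this patch: the SparkSession setup and the synthetic statuses RDD are
stand-ins for Hudi's writeResult.getWriteStatuses.rdd, with a boolean
field playing the role of WriteStatus.hasErrors.

    import org.apache.spark.sql.SparkSession

    object IsEmptyVsCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("isEmpty-vs-count")
          .master("local[*]")
          .getOrCreate()

        // Stand-in for writeResult.getWriteStatuses.rdd: pairs of
        // (recordKey, hasErrors).
        val statuses = spark.sparkContext
          .parallelize(1L to 10000000L, numSlices = 200)
          .map(i => (i, i % 1000 == 0))

        // Before the patch: count() scans all 200 partitions just to
        // learn whether any error exists at all.
        val errorCount = statuses.filter(_._2).count()
        println(s"count-based check: $errorCount errors")

        // After the patch: isEmpty() uses take(1) under the hood and can
        // short-circuit at the first matching element.
        if (statuses.filter(_._2).isEmpty()) {
          println("no errors, safe to commit")
        } else {
          println("errors found")
        }

        spark.stop()
      }
    }

One trade-off worth noting: isEmpty() still evaluates the filter, and the
failure branch re-filters the RDD to log the top 100 errors, so that work
is recomputed there. The patch accepts that recomputation because it only
happens on the error path.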