diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 9653e6f19..93442d5cd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -540,9 +540,8 @@ private[hudi] object HoodieSparkSqlWriter { jsc: JavaSparkContext, tableInstantInfo: TableInstantInfo ): (Boolean, common.util.Option[java.lang.String]) = { - val errorCount = writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() - if (errorCount == 0) { - log.info("No errors. Proceeding to commit the write.") + if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) { + log.info("Proceeding to commit the write.") val metaMap = parameters.filter(kv => kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY))) val commitSuccess = @@ -559,7 +558,7 @@ private[hudi] object HoodieSparkSqlWriter { } val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration()) - val compactionInstant : common.util.Option[java.lang.String] = + val compactionInstant: common.util.Option[java.lang.String] = if (asyncCompactionEnabled) { client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) } else { @@ -568,7 +567,7 @@ private[hudi] object HoodieSparkSqlWriter { log.info(s"Compaction Scheduled is $compactionInstant") - val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema) + val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema) log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled") if (!asyncCompactionEnabled) { @@ -576,7 +575,7 @@ private[hudi] object HoodieSparkSqlWriter { } (commitSuccess && metaSyncSuccess, compactionInstant) } else { - log.error(s"${tableInstantInfo.operation} failed with $errorCount errors :") + log.error(s"${tableInstantInfo.operation} failed with errors") if (log.isTraceEnabled) { log.trace("Printing out the top 100 errors") writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)