From 4b75cb6f23a0708fd55659cadf691d30086f5f13 Mon Sep 17 00:00:00 2001 From: peanut-chenzhong <58263343+peanut-chenzhong@users.noreply.github.com> Date: Mon, 14 Mar 2022 16:40:38 +0800 Subject: [PATCH] fix NPE when run schdule using spark-sql if the commits time < hoodie.compact.inline.max.delta.commits (#4976) * Update CompactionHoodiePathCommand.scala fix NPE when run schdule using spark-sql if the commits time < hoodie.compact.inline.max.delta.commits * Update CompactionHoodiePathCommand.scala fix IndexOutOfBoundsException when there`s no schedule for compaction * Update CompactionHoodiePathCommand.scala fix CI issue --- .../sql/hudi/command/CompactionHoodiePathCommand.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index 1135981a9..7bd9a3f22 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -50,7 +50,7 @@ case class CompactionHoodiePathCommand(path: String, if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { Seq(Row(instantTime)) } else { - Seq(Row(null)) + Seq.empty[Row] } case RUN => // Do compaction @@ -64,8 +64,12 @@ case class CompactionHoodiePathCommand(path: String, pendingCompactionInstants } else { // If there are no pending compaction, schedule to generate one. // CompactionHoodiePathCommand will return instanceTime for SCHEDULE. - Seq(CompactionHoodiePathCommand(path, CompactionOperation.SCHEDULE) - .run(sparkSession).take(1).get(0).getString(0)).filter(_ != null) + val scheduleSeq = CompactionHoodiePathCommand(path, CompactionOperation.SCHEDULE).run(sparkSession) + if (scheduleSeq.isEmpty) { + Seq.empty + } else { + Seq(scheduleSeq.take(1).get(0).getString(0)).filter(_ != null) + } } } else { // Check if the compaction timestamp has exists in the pending compaction