fix NPE when run schdule using spark-sql if the commits time < hoodie.compact.inline.max.delta.commits (#4976)
* Update CompactionHoodiePathCommand.scala fix NPE when run schdule using spark-sql if the commits time < hoodie.compact.inline.max.delta.commits * Update CompactionHoodiePathCommand.scala fix IndexOutOfBoundsException when there`s no schedule for compaction * Update CompactionHoodiePathCommand.scala fix CI issue
This commit is contained in:
@@ -50,7 +50,7 @@ case class CompactionHoodiePathCommand(path: String,
|
||||
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
|
||||
Seq(Row(instantTime))
|
||||
} else {
|
||||
Seq(Row(null))
|
||||
Seq.empty[Row]
|
||||
}
|
||||
case RUN =>
|
||||
// Do compaction
|
||||
@@ -64,8 +64,12 @@ case class CompactionHoodiePathCommand(path: String,
|
||||
pendingCompactionInstants
|
||||
} else { // If there are no pending compaction, schedule to generate one.
|
||||
// CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
|
||||
Seq(CompactionHoodiePathCommand(path, CompactionOperation.SCHEDULE)
|
||||
.run(sparkSession).take(1).get(0).getString(0)).filter(_ != null)
|
||||
val scheduleSeq = CompactionHoodiePathCommand(path, CompactionOperation.SCHEDULE).run(sparkSession)
|
||||
if (scheduleSeq.isEmpty) {
|
||||
Seq.empty
|
||||
} else {
|
||||
Seq(scheduleSeq.take(1).get(0).getString(0)).filter(_ != null)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Check if the compaction timestamp has exists in the pending compaction
|
||||
|
||||
Reference in New Issue
Block a user