[HUDI-3723] Fixed stack overflows in Record Iterators (#5235)
@@ -50,6 +50,7 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 
 import java.io.Closeable
 import java.util.Properties
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.Try
 
@@ -188,17 +189,23 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
       logRecords.remove(key)
 
-    override def hasNext: Boolean =
+    override def hasNext: Boolean = hasNextInternal
+
+    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure
+    //       that recursion is unfolded into a loop to avoid stack overflows while
+    //       handling records
+    @tailrec private def hasNextInternal: Boolean = {
       logRecordsIterator.hasNext && {
         val avroRecordOpt = logRecordsIterator.next()
         if (avroRecordOpt.isEmpty) {
           // Record has been deleted, skipping
-          this.hasNext
+          this.hasNextInternal
         } else {
           recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
           true
         }
       }
+    }
 
     override final def next(): InternalRow = recordToLoad
 
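The pattern in the hunk above is easier to see in isolation. Below is a minimal, self-contained sketch of the same fix; the names (SkipDeletedIterator, underlying) are hypothetical and not part of Hudi. Scalac only eliminates self tail calls on methods that cannot be overridden, and the public hasNext of an Iterator can be, so the original this.hasNext recursion consumed one stack frame per deleted record. Delegating to a private method removes that obstacle and lets the compiler verify the @tailrec annotation:

import scala.annotation.tailrec

// Minimal sketch of the fix (hypothetical names, not Hudi's classes).
// `underlying` yields None for a deleted record and Some(value) otherwise.
class SkipDeletedIterator(underlying: Iterator[Option[String]]) extends Iterator[String] {

  private var recordToLoad: String = _

  // The public method stays overridable, so it cannot be @tailrec itself;
  // it simply delegates to a private (and therefore non-overridable) one.
  override def hasNext: Boolean = hasNextInternal

  // scalac verifies the recursive call is in tail position and emits a loop,
  // so an arbitrarily long run of deleted records cannot overflow the stack.
  @tailrec private def hasNextInternal: Boolean =
    underlying.hasNext && {
      underlying.next() match {
        case None =>
          // Record has been deleted, skip it and keep scanning
          hasNextInternal
        case Some(value) =>
          recordToLoad = value
          true
      }
    }

  override def next(): String = recordToLoad
}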
@@ -257,7 +264,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
     private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
 
-    override def hasNext: Boolean = {
+    override def hasNext: Boolean = hasNextInternal
+
+    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure
+    //       that recursion is unfolded into a loop to avoid stack overflows while
+    //       handling records
+    @tailrec private def hasNextInternal: Boolean = {
       if (baseFileIterator.hasNext) {
         val curRowRecord = baseFileIterator.next()
         val curKey = curRowRecord.getString(recordKeyOrdinal)
@@ -270,7 +282,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
           val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
           if (mergedAvroRecordOpt.isEmpty) {
             // Record has been deleted, skipping
-            this.hasNext
+            this.hasNextInternal
           } else {
             // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c
             //       record from the Delta Log will bear (full) Table schema, while record from the Base file
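Note that @tailrec is enforcement rather than a hint: compilation fails if the annotated method can be overridden or if any recursive call sits outside tail position, which is why the fix routes the overridable hasNext through a private hasNextInternal instead of annotating it directly. A quick check against the sketch above (the input size is illustrative):

// One live value hidden behind a million deletions: the old self-recursive
// hasNext would recurse roughly a million frames deep and throw
// StackOverflowError; the @tailrec loop runs in constant stack space.
val manyDeletes = Iterator.fill(1000000)(None: Option[String]) ++ Iterator(Some("x"))
assert(new SkipDeletedIterator(manyDeletes).toList == List("x"))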