[HUDI-2479] HoodieFileIndex throws NPE for FileSlice with pure log files (#3702)
This commit is contained in:
@@ -56,7 +56,7 @@ import scala.collection.mutable
|
|||||||
*
|
*
|
||||||
* 2、If the partition columns size is not equal to the partition path level, but the partition
|
* 2、If the partition columns size is not equal to the partition path level, but the partition
|
||||||
* column size is "1" (e.g. partition column is "dt", but the partition path is "2021/03/10"
|
* column size is "1" (e.g. partition column is "dt", but the partition path is "2021/03/10"
|
||||||
* who'es directory level is 3). We can still read it as a partitioned table. We will map the
|
* who's directory level is 3). We can still read it as a partitioned table. We will map the
|
||||||
* partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
|
* partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
|
||||||
*
|
*
|
||||||
* 3、Else the partition columns size is not equal to the partition directory level and the
|
* 3、Else the partition columns size is not equal to the partition directory level and the
|
||||||
@@ -256,7 +256,7 @@ case class HoodieFileIndex(
|
|||||||
.iterator().asScala.toSeq
|
.iterator().asScala.toSeq
|
||||||
(p._1, fileSlices)
|
(p._1, fileSlices)
|
||||||
})
|
})
|
||||||
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(_.getBaseFile.get().getFileLen).sum
|
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
|
// If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
|
||||||
@@ -266,6 +266,15 @@ case class HoodieFileIndex(
|
|||||||
s" spend: $flushSpend ms")
|
s" spend: $flushSpend ms")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def fileSliceSize(fileSlice: FileSlice): Long = {
|
||||||
|
val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum
|
||||||
|
if (fileSlice.getBaseFile.isPresent) {
|
||||||
|
fileSlice.getBaseFile.get().getFileLen + logFileSize
|
||||||
|
} else {
|
||||||
|
logFileSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override def sizeInBytes: Long = {
|
override def sizeInBytes: Long = {
|
||||||
cachedFileSize
|
cachedFileSize
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user