[HUDI-1688] Hudi write should uncache the RDD when the write operation is finished (#2673)
This commit is contained in:
@@ -221,6 +221,23 @@ private[hudi] object HoodieSparkSqlWriter {
|
|||||||
val (writeSuccessful, compactionInstant) =
|
val (writeSuccessful, compactionInstant) =
|
||||||
commitAndPerformPostOperations(writeResult, parameters, writeClient, tableConfig, jsc,
|
commitAndPerformPostOperations(writeResult, parameters, writeClient, tableConfig, jsc,
|
||||||
TableInstantInfo(basePath, instantTime, commitActionType, operation))
|
TableInstantInfo(basePath, instantTime, commitActionType, operation))
|
||||||
|
|
||||||
|
/**
 * Unpersists `rdd` and every ancestor in its lineage that is currently registered as
 * persistent with `sparkContext`.
 *
 * The lineage is walked depth-first through `dependencies`. Each RDD is visited at most
 * once (tracked by id), so lineages in which several children share a parent are not
 * traversed repeatedly, and the walk is tail-recursive so long lineages cannot overflow
 * the stack.
 *
 * Unpersisting is best-effort: a non-fatal failure on one RDD is logged as a warning and
 * the walk continues with the remaining RDDs.
 *
 * @param rdd the RDD whose lineage (including itself) should be released from the cache
 */
def unpersistRdd(rdd: RDD[_]): Unit = {
  import scala.util.control.NonFatal

  // A single snapshot is sufficient because every RDD is inspected at most once below.
  val persistedIds = sparkContext.getPersistentRDDs.keySet

  @scala.annotation.tailrec
  def visit(pending: List[RDD[_]], seen: Set[Int]): Unit = pending match {
    case Nil => ()
    // Already handled through another lineage path; skip it and its ancestors.
    case current :: rest if seen.contains(current.id) => visit(rest, seen)
    case current :: rest =>
      if (persistedIds.contains(current.id)) {
        try {
          current.unpersist()
        } catch {
          // NonFatal lets InterruptedException and VM errors propagate instead of being swallowed.
          case NonFatal(t) =>
            log.warn(s"Got exception trying to unpersist rdd ${current.id}", t)
        }
      }
      // Parents go to the front of the work list to keep the original depth-first order.
      val parents: List[RDD[_]] = current.dependencies.map(_.rdd).toList
      visit(parents ::: rest, seen + current.id)
  }

  visit(List(rdd), Set.empty[Int])
}
|
||||||
|
// it's safe to unpersist cached rdds here
|
||||||
|
unpersistRdd(writeResult.getWriteStatuses.rdd)
|
||||||
|
|
||||||
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
|
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user