From 44e41dc9bb7c64f8b517d4a44742019f4dee4f70 Mon Sep 17 00:00:00 2001 From: Shawy Geng Date: Thu, 29 Jul 2021 23:16:58 +0800 Subject: [PATCH] =?UTF-8?q?[HUDI-2117]=20Unpersist=20the=20input=20rdd=20a?= =?UTF-8?q?fter=20the=20commit=20is=20completed=20to=20=E2=80=A6=20(#3207)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Vinoth Chandar --- .../client/AbstractHoodieWriteClient.java | 15 ++++++++++--- .../hudi/client/SparkRDDWriteClient.java | 13 ++++++++--- .../table/TestHoodieMergeOnReadTable.java | 4 +--- .../apache/hudi/HoodieSparkSqlWriter.scala | 22 +++---------------- 4 files changed, 26 insertions(+), 28 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 8c48f73b1..11af7f9e9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -18,9 +18,6 @@ package org.apache.hudi.client; -import com.codahale.metrics.Timer; -import java.util.stream.Stream; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -70,6 +67,9 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.savepoint.SavepointHelpers; + +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -81,6 +81,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Abstract Write Client providing functionality for performing commit, index updates and rollback @@ -189,6 +190,7 @@ public abstract class AbstractHoodieWriteClient extends HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta); } } + + @Override + protected void releaseResources() { + ((HoodieSparkEngineContext) context).getJavaSparkContext().getPersistentRDDs().values() + .forEach(rdd -> rdd.unpersist()); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 84d0b03e2..ab55462a5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -695,8 +695,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { thirdClient.startCommitWithTime(newCommitTime); writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime); - thirdClient.commit(newCommitTime, writeStatusJavaRDD); statuses = writeStatusJavaRDD.collect(); + thirdClient.commit(newCommitTime, writeStatusJavaRDD); // Verify there are no errors assertNoWriteErrors(statuses); @@ -1199,8 +1199,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { JavaRDD recordsRDD = jsc.parallelize(records, 1); JavaRDD statuses = writeClient.insert(recordsRDD, newCommitTime); writeClient.commit(newCommitTime, statuses); - // trigger an action - statuses.collect(); HoodieTable table = HoodieSparkTable.create(config, context, getHoodieMetaClient(hadoopConf, basePath)); SliceView tableRTFileSystemView = table.getSliceView(); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b2a8a0e0b..59d3207bd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -40,12 +40,12 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.hudi.table.BulkInsertPartitioner import org.apache.log4j.LogManager -import org.apache.spark.{SPARK_VERSION, SparkContext} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, Dataset,Row, SQLContext, SaveMode, SparkSession} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} +import org.apache.spark.{SPARK_VERSION, SparkContext} import java.util import java.util.Properties @@ -238,22 +238,6 @@ object HoodieSparkSqlWriter { writeResult, parameters, writeClient, tableConfig, jsc, TableInstantInfo(basePath, instantTime, commitActionType, operation)) - def unpersistRdd(rdd: RDD[_]): Unit = { - if (sparkContext.getPersistentRDDs.contains(rdd.id)) { - try { - rdd.unpersist() - } catch { - case t: Exception => log.warn("Got excepting trying to unpersist rdd", t) - } - } - val parentRdds = rdd.dependencies.map(_.rdd) - parentRdds.foreach { parentRdd => - unpersistRdd(parentRdd) - } - } - // it's safe to unpersist cached rdds here - unpersistRdd(writeResult.getWriteStatuses.rdd) - (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig) } }