[HUDI-2117] Unpersist the input rdd after the commit is completed to … (#3207)
Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
@@ -18,9 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.client;
|
package org.apache.hudi.client;
|
||||||
|
|
||||||
import com.codahale.metrics.Timer;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||||
import org.apache.hudi.avro.model.HoodieClusteringPlan;
|
import org.apache.hudi.avro.model.HoodieClusteringPlan;
|
||||||
@@ -70,6 +67,9 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog;
|
|||||||
import org.apache.hudi.table.MarkerFiles;
|
import org.apache.hudi.table.MarkerFiles;
|
||||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||||
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
|
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Timer;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -81,6 +81,7 @@ import java.util.Collections;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract Write Client providing functionality for performing commit, index updates and rollback
|
* Abstract Write Client providing functionality for performing commit, index updates and rollback
|
||||||
@@ -189,6 +190,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
commit(table, commitActionType, instantTime, metadata, stats);
|
commit(table, commitActionType, instantTime, metadata, stats);
|
||||||
postCommit(table, metadata, instantTime, extraMetadata);
|
postCommit(table, metadata, instantTime, extraMetadata);
|
||||||
LOG.info("Committed " + instantTime);
|
LOG.info("Committed " + instantTime);
|
||||||
|
releaseResources();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
|
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
|
||||||
} finally {
|
} finally {
|
||||||
@@ -1041,6 +1043,13 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after each write, to release any resources used.
* <p>
* In the commit path visible here, this hook runs right after the commit is logged as
* completed. The default implementation is a no-op; subclasses override it as needed.
|
||||||
|
*/
|
||||||
|
protected void releaseResources() {
|
||||||
|
// do nothing here: default is a no-op, subclasses supply the actual cleanup
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
// release AsyncCleanerService
|
// release AsyncCleanerService
|
||||||
|
|||||||
@@ -18,8 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.client;
|
package org.apache.hudi.client;
|
||||||
|
|
||||||
import com.codahale.metrics.Timer;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||||
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
|
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
|
||||||
import org.apache.hudi.client.utils.TransactionUtils;
|
import org.apache.hudi.client.utils.TransactionUtils;
|
||||||
@@ -48,8 +46,8 @@ import org.apache.hudi.exception.HoodieMetadataException;
|
|||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.index.SparkHoodieIndex;
|
import org.apache.hudi.index.SparkHoodieIndex;
|
||||||
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
||||||
import org.apache.hudi.metrics.DistributedRegistry;
|
|
||||||
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
||||||
|
import org.apache.hudi.metrics.DistributedRegistry;
|
||||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||||
import org.apache.hudi.table.HoodieSparkTable;
|
import org.apache.hudi.table.HoodieSparkTable;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
@@ -58,6 +56,9 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
|
|||||||
import org.apache.hudi.table.action.compact.SparkCompactHelpers;
|
import org.apache.hudi.table.action.compact.SparkCompactHelpers;
|
||||||
import org.apache.hudi.table.upgrade.AbstractUpgradeDowngrade;
|
import org.apache.hudi.table.upgrade.AbstractUpgradeDowngrade;
|
||||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
|
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Timer;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
@@ -482,4 +483,10 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
|
HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Unpersists every RDD that the underlying Java Spark context reports as persisted.
 * <p>
 * NOTE(review): the set comes from {@code getPersistentRDDs()}, so it covers all persisted
 * RDDs on the context, not only ones cached by this client. Confirm that callers sharing
 * the same Spark context expect their own cached RDDs to be released here too.
 */
@Override
|
||||||
|
protected void releaseResources() {
|
||||||
|
((HoodieSparkEngineContext) context).getJavaSparkContext().getPersistentRDDs().values()
|
||||||
|
.forEach(rdd -> rdd.unpersist());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -695,8 +695,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
|||||||
thirdClient.startCommitWithTime(newCommitTime);
|
thirdClient.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
|
writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
|
||||||
thirdClient.commit(newCommitTime, writeStatusJavaRDD);
|
|
||||||
statuses = writeStatusJavaRDD.collect();
|
statuses = writeStatusJavaRDD.collect();
|
||||||
|
thirdClient.commit(newCommitTime, writeStatusJavaRDD);
|
||||||
// Verify there are no errors
|
// Verify there are no errors
|
||||||
assertNoWriteErrors(statuses);
|
assertNoWriteErrors(statuses);
|
||||||
|
|
||||||
@@ -1199,8 +1199,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
|||||||
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
|
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
|
||||||
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
|
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
|
||||||
writeClient.commit(newCommitTime, statuses);
|
writeClient.commit(newCommitTime, statuses);
|
||||||
// trigger an action
|
|
||||||
statuses.collect();
|
|
||||||
|
|
||||||
HoodieTable table = HoodieSparkTable.create(config, context, getHoodieMetaClient(hadoopConf, basePath));
|
HoodieTable table = HoodieSparkTable.create(config, context, getHoodieMetaClient(hadoopConf, basePath));
|
||||||
SliceView tableRTFileSystemView = table.getSliceView();
|
SliceView tableRTFileSystemView = table.getSliceView();
|
||||||
|
|||||||
@@ -40,12 +40,12 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
|
|||||||
import org.apache.hudi.sync.common.AbstractSyncTool
|
import org.apache.hudi.sync.common.AbstractSyncTool
|
||||||
import org.apache.hudi.table.BulkInsertPartitioner
|
import org.apache.hudi.table.BulkInsertPartitioner
|
||||||
import org.apache.log4j.LogManager
|
import org.apache.log4j.LogManager
|
||||||
import org.apache.spark.{SPARK_VERSION, SparkContext}
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext
|
import org.apache.spark.api.java.JavaSparkContext
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.sql.types.StructType
|
|
||||||
import org.apache.spark.sql.{DataFrame, Dataset,Row, SQLContext, SaveMode, SparkSession}
|
|
||||||
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
||||||
|
import org.apache.spark.sql.types.StructType
|
||||||
|
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
|
||||||
|
import org.apache.spark.{SPARK_VERSION, SparkContext}
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
@@ -238,22 +238,6 @@ object HoodieSparkSqlWriter {
|
|||||||
writeResult, parameters, writeClient, tableConfig, jsc,
|
writeResult, parameters, writeClient, tableConfig, jsc,
|
||||||
TableInstantInfo(basePath, instantTime, commitActionType, operation))
|
TableInstantInfo(basePath, instantTime, commitActionType, operation))
|
||||||
|
|
||||||
// Recursively unpersists `rdd` and every RDD reachable through its dependencies.
// An RDD is unpersisted only when its id is registered in sparkContext.getPersistentRDDs;
// a failure to unpersist is logged as a warning and does not stop the walk.
def unpersistRdd(rdd: RDD[_]): Unit = {
|
|
||||||
if (sparkContext.getPersistentRDDs.contains(rdd.id)) {
|
|
||||||
try {
|
|
||||||
rdd.unpersist()
|
|
||||||
} catch {
|
|
||||||
case t: Exception => log.warn("Got excepting trying to unpersist rdd", t)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Walk up the lineage: each dependency exposes the parent RDD it was derived from.
val parentRdds = rdd.dependencies.map(_.rdd)
|
|
||||||
parentRdds.foreach { parentRdd =>
|
|
||||||
unpersistRdd(parentRdd)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// it's safe to unpersist cached rdds here
|
|
||||||
unpersistRdd(writeResult.getWriteStatuses.rdd)
|
|
||||||
|
|
||||||
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
|
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user