diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index c7f2c45ea..b7b410817 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -447,6 +447,12 @@ public class HoodieWriteConfig extends HoodieConfig { .sinceVersion("0.11.0") .withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc."); + public static final ConfigProperty RELEASE_RESOURCE_ENABLE = ConfigProperty + .key("hoodie.release.resource.on.completion.enable") + .defaultValue(true) + .sinceVersion("0.11.0") + .withDocumentation("Control to enable releasing all persisted RDDs when the Spark job finishes."); + private ConsistencyGuardConfig consistencyGuardConfig; private FileSystemRetryConfig fileSystemRetryConfig; @@ -1716,7 +1722,7 @@ public class HoodieWriteConfig extends HoodieConfig { public String getMetricReporterMetricsNamePrefix() { return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX); } - + /** * memory configs. */ @@ -1945,6 +1951,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBooleanOrDefault(TABLE_SERVICES_ENABLED); } + public boolean areReleaseResourceEnabled() { + return getBooleanOrDefault(RELEASE_RESOURCE_ENABLE); + } + /** * Layout configs. 
*/ @@ -2315,6 +2325,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withReleaseResourceEnabled(boolean enabled) { + writeConfig.setValue(RELEASE_RESOURCE_ENABLE, Boolean.toString(enabled)); + return this; + } + public Builder withProperties(Properties properties) { this.writeConfig.getProps().putAll(properties); return this; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 2fb27fe79..d51d25616 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -531,7 +531,11 @@ public class SparkRDDWriteClient extends @Override protected void releaseResources() { - ((HoodieSparkEngineContext) context).getJavaSparkContext().getPersistentRDDs().values() - .forEach(rdd -> rdd.unpersist()); + // If we do not explicitly release the resource, Spark will manage it and clean it up automatically + // see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data + if (config.areReleaseResourceEnabled()) { + ((HoodieSparkEngineContext) context).getJavaSparkContext().getPersistentRDDs().values() + .forEach(JavaRDD::unpersist); + } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 1b613949c..dcc41addc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -18,7 +18,6 @@ package org.apache.hudi.table; -import org.apache.hadoop.fs.Path; import 
org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -55,10 +54,12 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.storage.StorageLevel; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -566,9 +567,52 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness WriteStatus status = deleteStatus.get(0).get(0); assertTrue(status.hasErrors()); long numRecordsInPartition = fewRecordsForDelete.stream().filter(u -> - u.getPartitionPath().equals(partitionPath)).count(); + u.getPartitionPath().equals(partitionPath)).count(); assertEquals(fewRecordsForDelete.size() - numRecordsInPartition, status.getTotalErrorRecords()); } } + + @Test + public void testReleaseResource() throws Exception { + HoodieWriteConfig.Builder builder = getConfigBuilder(true); + builder.withReleaseResourceEnabled(true); + builder.withAutoCommit(false); + /** + * Write 1 (test when RELEASE_RESOURCE_ENABLE is true) + */ + try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) { + + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 20); + JavaRDD writeRecords = jsc().parallelize(records, 1); + writeRecords.persist(StorageLevel.MEMORY_AND_DISK()); + List statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + client.commitStats(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), 
metaClient.getCommitActionType()); + assertEquals(spark().sparkContext().persistentRdds().size(), 0); + } + + builder.withReleaseResourceEnabled(false); + + /** + * Write 2 (test when RELEASE_RESOURCE_ENABLE is false) + */ + try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) { + String newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 20); + JavaRDD writeRecords = jsc().parallelize(records, 1); + + writeRecords.persist(StorageLevel.MEMORY_AND_DISK()); + List statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + client.commitStats(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType()); + assertTrue(spark().sparkContext().persistentRdds().size() > 0); + } + + } }