
[HUDI-3515] Making rdd unpersist optional at the end of writes (#4898)

Co-authored-by: 苏承祥 <sucx@tuya.com>
苏承祥
2022-02-26 00:30:10 +08:00
committed by GitHub
parent b50f4b491c
commit 92cdc5987a
3 changed files with 68 additions and 5 deletions

HoodieWriteConfig.java

@@ -447,6 +447,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("0.11.0")
       .withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc.");
 
+  public static final ConfigProperty<Boolean> RELEASE_RESOURCE_ENABLE = ConfigProperty
+      .key("hoodie.release.resource.on.completion.enable")
+      .defaultValue(true)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Control whether to release all persisted RDDs when the Spark job finishes.");
+
   private ConsistencyGuardConfig consistencyGuardConfig;
   private FileSystemRetryConfig fileSystemRetryConfig;
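
For context on how this flag reaches a job: any hoodie.* option passed to the Hudi Spark datasource is folded into the write config, so the new key can be set without touching HoodieWriteConfig directly. A minimal sketch, assuming a hypothetical local SparkSession, input path, table name, and record/precombine key fields (none of these appear in the commit itself):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReleaseResourceOptionDemo {
  public static void main(String[] args) {
    // Assumes the Hudi Spark bundle is on the classpath.
    SparkSession spark = SparkSession.builder()
        .appName("release-resource-option-demo")
        .master("local[2]")
        .getOrCreate();
    Dataset<Row> df = spark.read().json("/tmp/input.json"); // hypothetical input
    df.write().format("hudi")
        .option("hoodie.table.name", "demo_table")                      // hypothetical table
        .option("hoodie.datasource.write.recordkey.field", "uuid")      // hypothetical field
        .option("hoodie.datasource.write.precombine.field", "ts")       // hypothetical field
        // New in 0.11.0: leave at the default (true) to unpersist cached RDDs when
        // the write completes, or set false to let Spark's own eviction handle them.
        .option("hoodie.release.resource.on.completion.enable", "false")
        .mode("append")
        .save("/tmp/hudi/demo_table");
    spark.stop();
  }
}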
@@ -1716,7 +1722,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   public String getMetricReporterMetricsNamePrefix() {
     return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
   }
 
   /**
    * memory configs.
    */
@@ -1945,6 +1951,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBooleanOrDefault(TABLE_SERVICES_ENABLED);
   }
 
+  public boolean areReleaseResourceEnabled() {
+    return getBooleanOrDefault(RELEASE_RESOURCE_ENABLE);
+  }
+
   /**
    * Layout configs.
    */
@@ -2315,6 +2325,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withReleaseResourceEnabled(boolean enabled) {
+      writeConfig.setValue(RELEASE_RESOURCE_ENABLE, Boolean.toString(enabled));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.writeConfig.getProps().putAll(properties);
       return this;
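
Engine-level callers that assemble the config programmatically get the same switch through the builder. A minimal sketch, with a hypothetical base path and table name and all other builder settings left at their defaults:

import org.apache.hudi.config.HoodieWriteConfig;

public class ReleaseResourceBuilderDemo {
  public static void main(String[] args) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/demo_table")   // hypothetical base path
        .forTable("demo_table")             // hypothetical table name
        // Opt out of the end-of-write unpersist; cached RDDs then stay around
        // until Spark evicts them or the context shuts down.
        .withReleaseResourceEnabled(false)
        .build();
    System.out.println(writeConfig.areReleaseResourceEnabled()); // false
  }
}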

SparkRDDWriteClient.java

@@ -531,7 +531,11 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   @Override
   protected void releaseResources() {
-    ((HoodieSparkEngineContext) context).getJavaSparkContext().getPersistentRDDs().values()
-        .forEach(rdd -> rdd.unpersist());
+    // If we do not explicitly release the resources, Spark manages the cached RDDs and cleans them up automatically.
+    // see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data
+    if (config.areReleaseResourceEnabled()) {
+      ((HoodieSparkEngineContext) context).getJavaSparkContext().getPersistentRDDs().values()
+          .forEach(JavaRDD::unpersist);
+    }
   }
 }
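
The mechanism being gated here is plain Spark: the context keeps a registry of every persisted RDD, and releaseResources drains it. A minimal standalone sketch (hypothetical demo class, local master) of the same unpersist loop:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class UnpersistAllDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("unpersist-all-demo").setMaster("local[1]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3))
          .persist(StorageLevel.MEMORY_AND_DISK());
      rdd.count(); // force materialization so the cache is actually populated
      System.out.println(jsc.getPersistentRDDs().size()); // 1
      // Same loop as releaseResources() above when the flag is enabled:
      jsc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
      System.out.println(jsc.getPersistentRDDs().size()); // 0
    }
  }
}

With the flag disabled, the registry is left untouched and Spark reclaims the cached blocks lazily, per the RDD programming guide linked in the code comment.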

TestHoodieMergeOnReadTable.java

@@ -18,7 +18,6 @@
 package org.apache.hudi.table;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -55,10 +54,12 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.storage.StorageLevel;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -566,9 +567,52 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       WriteStatus status = deleteStatus.get(0).get(0);
       assertTrue(status.hasErrors());
       long numRecordsInPartition = fewRecordsForDelete.stream().filter(u ->
-          u.getPartitionPath().equals(partitionPath)).count();
+              u.getPartitionPath().equals(partitionPath)).count();
       assertEquals(fewRecordsForDelete.size() - numRecordsInPartition, status.getTotalErrorRecords());
     }
   }
 
+  @Test
+  public void testReleaseResource() throws Exception {
+    HoodieWriteConfig.Builder builder = getConfigBuilder(true);
+    builder.withReleaseResourceEnabled(true);
+    builder.withAutoCommit(false);
+
+    /**
+     * Write 1 (test when RELEASE_RESOURCE_ENABLE is true)
+     */
+    try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+      writeRecords.persist(StorageLevel.MEMORY_AND_DISK());
+      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+      client.commitStats(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
+      assertEquals(spark().sparkContext().persistentRdds().size(), 0);
+    }
+
+    builder.withReleaseResourceEnabled(false);
+
+    /**
+     * Write 2 (test when RELEASE_RESOURCE_ENABLE is false)
+     */
+    try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) {
+      String newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+      writeRecords.persist(StorageLevel.MEMORY_AND_DISK());
+      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+      client.commitStats(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
+      assertTrue(spark().sparkContext().persistentRdds().size() > 0);
+    }
+  }
}