From e99a6b031bf4f2e3037d4cb5307d443cda2d2002 Mon Sep 17 00:00:00 2001
From: zhangyue19921010 <69956021+zhangyue19921010@users.noreply.github.com>
Date: Sun, 27 Jun 2021 13:03:41 +0800
Subject: [PATCH] [HUDI-2073] Fix the bug of hoodieClusteringJob never quit
 (#3157)

Co-authored-by: yuezhang
---
 .../hudi/utilities/HoodieClusteringJob.java   | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 65cb4048a..a4dc74127 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -149,11 +149,11 @@ public class HoodieClusteringJob {
 
   private int doCluster(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient client =
-        UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
-    JavaRDD writeResponse =
-        client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
-    return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
+    try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      JavaRDD writeResponse =
+          client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
+      return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
+    }
   }
 
   @TestOnly
@@ -163,12 +163,12 @@ public class HoodieClusteringJob {
 
   private Option doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient client =
-        UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
-    if (cfg.clusteringInstantTime != null) {
-      client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
-      return Option.of(cfg.clusteringInstantTime);
+    try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (cfg.clusteringInstantTime != null) {
+        client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
+        return Option.of(cfg.clusteringInstantTime);
+      }
+      return client.scheduleClustering(Option.empty());
     }
-    return client.scheduleClustering(Option.empty());
   }
 }