From 821e0dcffcd7d0ca5b0140ebe585639fc0c32e22 Mon Sep 17 00:00:00 2001 From: leesf <490081539@qq.com> Date: Tue, 3 Sep 2019 15:07:23 +0800 Subject: [PATCH] [HUDI-236] Failed to close stream --- .../utilities/HoodieWithTimelineServer.java | 28 ++++++++----------- .../adhoc/UpgradePayloadFromUberToApache.java | 9 ++++-- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java index dfab9b3c1..f06f3477a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java @@ -23,6 +23,7 @@ import com.beust.jcommander.Parameter; import com.google.common.base.Preconditions; import io.javalin.Javalin; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.io.Serializable; import java.net.InetAddress; @@ -30,20 +31,15 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.stream.IntStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; public class HoodieWithTimelineServer implements Serializable { - private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class); private final Config cfg; - private transient FileSystem fs; private transient Javalin app = null; @@ -99,29 +95,29 @@ public class HoodieWithTimelineServer implements Serializable { public String sendRequest(String driverHost, int port) throws RuntimeException { String url = String.format("http://%s:%d/", driverHost, port); - try { + try (CloseableHttpClient client = HttpClientBuilder.create().build()) { System.out.println("Sleeping for " + cfg.delaySecs + " secs "); Thread.sleep(cfg.delaySecs * 1000); System.out.println("Woke up after sleeping for " + cfg.delaySecs + " secs "); - CloseableHttpClient client = HttpClientBuilder.create().build(); HttpGet request = new HttpGet(url); HttpResponse response = client.execute(request); System.out.println("Response Code from(" + url + ") : " + response.getStatusLine().getStatusCode()); - BufferedReader rd = new BufferedReader( - new InputStreamReader(response.getEntity().getContent())); - - StringBuffer result = new StringBuffer(); - String line = ""; - while ((line = rd.readLine()) != null) { - result.append(line); + try (BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) { + StringBuffer result = new StringBuffer(); + String line; + while ((line = rd.readLine()) != null) { + result.append(line); + } + System.out.println("Got result (" + result + ")"); + return result.toString(); + } catch (IOException e) { + throw new RuntimeException(e); } - System.out.println("Got result (" + result + ")"); - return result.toString(); } catch (Exception ex) { throw new RuntimeException(ex); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java index 7386af880..0bc6ceb31 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java @@ -54,8 +54,13 @@ public class UpgradePayloadFromUberToApache implements Serializable { } public void run() throws IOException { - BufferedReader reader = new BufferedReader(new FileReader(cfg.inputPath)); - String basePath = reader.readLine(); + String basePath = null; + try (BufferedReader reader = new BufferedReader(new FileReader(cfg.inputPath))) { + basePath = reader.readLine(); + } catch (IOException e) { + logger.error("Read from path: " + cfg.inputPath + " error.", e); + } + while (basePath != null) { basePath = basePath.trim(); if (!basePath.startsWith("#")) {