1
0

[HUDI-236] Failed to close stream

This commit is contained in:
leesf
2019-09-03 15:07:23 +08:00
committed by vinoth chandar
parent 555dd55c16
commit 821e0dcffc
2 changed files with 19 additions and 18 deletions

View File

@@ -23,6 +23,7 @@ import com.beust.jcommander.Parameter;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.javalin.Javalin; import io.javalin.Javalin;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.Serializable; import java.io.Serializable;
import java.net.InetAddress; import java.net.InetAddress;
@@ -30,20 +31,15 @@ import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
public class HoodieWithTimelineServer implements Serializable { public class HoodieWithTimelineServer implements Serializable {
private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
private final Config cfg; private final Config cfg;
private transient FileSystem fs;
private transient Javalin app = null; private transient Javalin app = null;
@@ -99,29 +95,29 @@ public class HoodieWithTimelineServer implements Serializable {
public String sendRequest(String driverHost, int port) throws RuntimeException { public String sendRequest(String driverHost, int port) throws RuntimeException {
String url = String.format("http://%s:%d/", driverHost, port); String url = String.format("http://%s:%d/", driverHost, port);
try { try (CloseableHttpClient client = HttpClientBuilder.create().build()) {
System.out.println("Sleeping for " + cfg.delaySecs + " secs "); System.out.println("Sleeping for " + cfg.delaySecs + " secs ");
Thread.sleep(cfg.delaySecs * 1000); Thread.sleep(cfg.delaySecs * 1000);
System.out.println("Woke up after sleeping for " + cfg.delaySecs + " secs "); System.out.println("Woke up after sleeping for " + cfg.delaySecs + " secs ");
CloseableHttpClient client = HttpClientBuilder.create().build();
HttpGet request = new HttpGet(url); HttpGet request = new HttpGet(url);
HttpResponse response = client.execute(request); HttpResponse response = client.execute(request);
System.out.println("Response Code from(" + url + ") : " + response.getStatusLine().getStatusCode()); System.out.println("Response Code from(" + url + ") : " + response.getStatusLine().getStatusCode());
BufferedReader rd = new BufferedReader( try (BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
new InputStreamReader(response.getEntity().getContent())); StringBuffer result = new StringBuffer();
String line;
StringBuffer result = new StringBuffer(); while ((line = rd.readLine()) != null) {
String line = ""; result.append(line);
while ((line = rd.readLine()) != null) { }
result.append(line); System.out.println("Got result (" + result + ")");
return result.toString();
} catch (IOException e) {
throw new RuntimeException(e);
} }
System.out.println("Got result (" + result + ")");
return result.toString();
} catch (Exception ex) { } catch (Exception ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }

View File

@@ -54,8 +54,13 @@ public class UpgradePayloadFromUberToApache implements Serializable {
} }
public void run() throws IOException { public void run() throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(cfg.inputPath)); String basePath = null;
String basePath = reader.readLine(); try (BufferedReader reader = new BufferedReader(new FileReader(cfg.inputPath))) {
basePath = reader.readLine();
} catch (IOException e) {
logger.error("Read from path: " + cfg.inputPath + " error.", e);
}
while (basePath != null) { while (basePath != null) {
basePath = basePath.trim(); basePath = basePath.trim();
if (!basePath.startsWith("#")) { if (!basePath.startsWith("#")) {