1
0

[MINOR] Some minor optimizations in HoodieJavaStreamingApp (#1046)

This commit is contained in:
filippo balicchia
2019-11-25 11:49:13 +01:00
committed by leesf
parent c3355109b1
commit 845a0509b3

View File

@@ -19,7 +19,6 @@
import com.beust.jcommander.JCommander; import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameter;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@@ -141,23 +140,19 @@ public class HoodieJavaStreamingApp {
ExecutorService executor = Executors.newFixedThreadPool(2); ExecutorService executor = Executors.newFixedThreadPool(2);
// thread for spark strucutured streaming // thread for spark strucutured streaming
Future<Void> streamFuture = executor.submit(new Callable<Void>() { Future<Void> streamFuture = executor.submit(() -> {
public Void call() throws Exception {
logger.info("===== Streaming Starting ====="); logger.info("===== Streaming Starting =====");
stream(streamingInput); stream(streamingInput);
logger.info("===== Streaming Ends ====="); logger.info("===== Streaming Ends =====");
return null; return null;
}
}); });
// thread for adding data to the streaming source and showing results over time // thread for adding data to the streaming source and showing results over time
Future<Void> showFuture = executor.submit(new Callable<Void>() { Future<Void> showFuture = executor.submit(() -> {
public Void call() throws Exception {
logger.info("===== Showing Starting ====="); logger.info("===== Showing Starting =====");
show(spark, fs, inputDF1, inputDF2); show(spark, fs, inputDF1, inputDF2);
logger.info("===== Showing Ends ====="); logger.info("===== Showing Ends =====");
return null; return null;
}
}); });
// let the threads run // let the threads run
@@ -187,7 +182,7 @@ public class HoodieJavaStreamingApp {
// wait for spark streaming to process one microbatch // wait for spark streaming to process one microbatch
Thread.sleep(3000); Thread.sleep(3000);
String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
logger.info("Second commit at instant time :" + commitInstantTime1); logger.info("Second commit at instant time :" + commitInstantTime2);
/** /**
* Read & do some queries * Read & do some queries