diff --git a/docker/demo/presto-batch1.commands b/docker/demo/presto-batch1.commands new file mode 100644 index 000000000..8b0c960cd --- /dev/null +++ b/docker/demo/presto-batch1.commands @@ -0,0 +1,4 @@ +select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'; +select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'; +select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'; +select symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'; diff --git a/docker/demo/presto-batch2-after-compaction.commands b/docker/demo/presto-batch2-after-compaction.commands new file mode 100644 index 000000000..698d8c0e4 --- /dev/null +++ b/docker/demo/presto-batch2-after-compaction.commands @@ -0,0 +1,2 @@ +select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG'; +select symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG'; diff --git a/docker/demo/presto-table-check.commands b/docker/demo/presto-table-check.commands new file mode 100644 index 000000000..26abcfc3a --- /dev/null +++ b/docker/demo/presto-table-check.commands @@ -0,0 +1 @@ +show tables; diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index 1c97cec1e..647a390cd 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -52,6 +52,7 @@ public abstract class ITTestBase { protected static final String ADHOC_1_CONTAINER = "/adhoc-1"; protected static final String ADHOC_2_CONTAINER = "/adhoc-2"; protected static final String HIVESERVER = "/hiveserver"; + protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1"; protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws"; protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + 
"/hudi-spark/run_hoodie_app.sh"; protected static final String HUDI_HADOOP_BUNDLE = @@ -63,6 +64,7 @@ public abstract class ITTestBase { protected static final String HUDI_UTILITIES_BUNDLE = HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar"; protected static final String HIVE_SERVER_JDBC_URL = "jdbc:hive2://hiveserver:10000"; + protected static final String PRESTO_COORDINATOR_URL = "presto-coordinator-1:8090"; protected static final String HADOOP_CONF_DIR = "/etc/hadoop"; // Skip these lines when capturing output from hive @@ -106,6 +108,14 @@ public abstract class ITTestBase { .append(" --packages com.databricks:spark-avro_2.11:4.0.0 ").append(" -i ").append(commandFile).toString(); } + static String getPrestoConsoleCommand(String commandFile) { + StringBuilder builder = new StringBuilder().append("presto --server " + PRESTO_COORDINATOR_URL) + .append(" --catalog hive --schema default") + .append(" -f " + commandFile ); + System.out.println("Presto comamnd " + builder.toString()); + return builder.toString(); + } + @Before public void init() { String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? 
OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST; @@ -207,6 +217,20 @@ public abstract class ITTestBase { return Pair.of(callback.getStdout().toString(), callback.getStderr().toString()); } + Pair executePrestoCommandFile(String commandFile) throws Exception { + String prestoCmd = getPrestoConsoleCommand(commandFile); + TestExecStartResultCallback callback = executeCommandStringInDocker(PRESTO_COORDINATOR, prestoCmd, true); + return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim()); + } + + void executePrestoCopyCommand(String fromFile, String remotePath) { + Container prestoCoordinatorContainer = runningContainers.get(PRESTO_COORDINATOR); + dockerClient.copyArchiveToContainerCmd(prestoCoordinatorContainer.getId()) + .withHostResource(fromFile) + .withRemotePath(remotePath) + .exec(); + } + private void saveUpLogs() { try { // save up the Hive log files for introspection diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java index e1159be7a..e92ef7b85 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java @@ -34,9 +34,15 @@ public class ITTestHoodieDemo extends ITTestBase { private static String HDFS_DATA_DIR = "/usr/hive/data/input"; private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/" + "batch_1.json"; private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/" + "batch_2.json"; + private static String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/" + "presto-table-check.commands"; + private static String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/" + "presto-batch1.commands"; + private static String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/" + "presto-batch2-after-compaction.commands"; private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json"; + private 
static String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands"; + private static String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands"; private static String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json"; + private static String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands"; private static String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow"; private static String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor"; @@ -55,7 +61,6 @@ public class ITTestHoodieDemo extends ITTestBase { private static String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands"; private static String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands"; - private static String HIVE_SYNC_CMD_FMT = " --enable-hive-sync " + " --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 " + " --hoodie-conf hoodie.datasource.hive_sync.username=hive " @@ -64,7 +69,6 @@ public class ITTestHoodieDemo extends ITTestBase { + " --hoodie-conf hoodie.datasource.hive_sync.database=default " + " --hoodie-conf hoodie.datasource.hive_sync.table=%s"; - @Test public void testDemo() throws Exception { setupDemo(); @@ -72,11 +76,13 @@ public class ITTestHoodieDemo extends ITTestBase { // batch 1 ingestFirstBatchAndHiveSync(); testHiveAfterFirstBatch(); + testPrestoAfterFirstBatch(); testSparkSQLAfterFirstBatch(); // batch 2 ingestSecondBatchAndHiveSync(); testHiveAfterSecondBatch(); + testPrestoAfterSecondBatch(); testSparkSQLAfterSecondBatch(); testIncrementalHiveQuery(); testIncrementalSparkSQLQuery(); @@ -84,6 +90,7 @@ public class ITTestHoodieDemo extends ITTestBase { // compaction scheduleAndRunCompaction(); testHiveAfterSecondBatchAfterCompaction(); + testPrestoAfterSecondBatchAfterCompaction(); testIncrementalHiveQueryAfterCompaction(); } @@ -94,6 +101,16 @@ public 
class ITTestHoodieDemo extends ITTestBase { .add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1) .add("/bin/bash " + DEMO_CONTAINER_SCRIPT).build(); executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds); + + // create input dir in presto coordinator + cmds = new ImmutableList.Builder() + .add("mkdir -p " + HDFS_DATA_DIR).build(); + executeCommandStringsInDocker(PRESTO_COORDINATOR, cmds); + + // copy presto sql files to presto coordinator + executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR); + executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR); + executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR); } private void ingestFirstBatchAndHiveSync() throws Exception { @@ -168,6 +185,20 @@ public class ITTestHoodieDemo extends ITTestBase { executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds); } + private void testPrestoAfterFirstBatch() throws Exception { + Pair stdOutErrPair = executePrestoCommandFile(HDFS_PRESTO_INPUT_TABLE_CHECK_PATH); + assertStdOutContains(stdOutErrPair, "stock_ticks_cow"); + assertStdOutContains(stdOutErrPair, "stock_ticks_mor",2); + + stdOutErrPair = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH1_PATH); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 10:29:00\"", 4); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"", 2); + } + private void testHiveAfterSecondBatch() throws Exception { Pair stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS); assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n" @@ -187,6 +218,20 @@ public class ITTestHoodieDemo extends ITTestBase { 2); } + private 
void testPrestoAfterSecondBatch() throws Exception { + Pair stdOutErrPair = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH1_PATH); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 10:29:00\"", 2); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 10:59:00\"", 2); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"",2); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\""); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\""); + } + private void testHiveAfterSecondBatchAfterCompaction() throws Exception { Pair stdOutErrPair = executeHiveCommandFile(HIVE_BATCH2_COMMANDS); assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n" @@ -199,6 +244,16 @@ public class ITTestHoodieDemo extends ITTestBase { 2); } + private void testPrestoAfterSecondBatchAfterCompaction() throws Exception { + Pair stdOutErrPair = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH2_PATH); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 10:59:00\"", 2); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\""); + assertStdOutContains(stdOutErrPair, + "\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\""); + } + private void testSparkSQLAfterSecondBatch() throws Exception { Pair stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH2_COMMANDS, true); assertStdOutContains(stdOutErrPair,