1
0

[HUDI-218] Adding Presto support to Integration Test (#1003)

This commit is contained in:
Sivabalan Narayanan
2019-11-11 06:21:49 -08:00
committed by vinoth chandar
parent 5f1309407a
commit 23b303e4b1
5 changed files with 88 additions and 2 deletions

View File

@@ -52,6 +52,7 @@ public abstract class ITTestBase {
// Docker container names (as registered with the docker-java client) targeted by the IT suite.
protected static final String ADHOC_1_CONTAINER = "/adhoc-1";
protected static final String ADHOC_2_CONTAINER = "/adhoc-2";
protected static final String HIVESERVER = "/hiveserver";
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
// Paths inside the containers: the mounted workspace root and the demo Spark app launcher.
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
protected static final String HUDI_HADOOP_BUNDLE =
@@ -63,6 +64,7 @@ public abstract class ITTestBase {
// Utilities bundle jar baked into the hive_base container image.
protected static final String HUDI_UTILITIES_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar";
// Service endpoints: HiveServer2 JDBC URL and the Presto coordinator host:port
// (host-only form; the Presto CLI prepends the scheme itself).
protected static final String HIVE_SERVER_JDBC_URL = "jdbc:hive2://hiveserver:10000";
protected static final String PRESTO_COORDINATOR_URL = "presto-coordinator-1:8090";
protected static final String HADOOP_CONF_DIR = "/etc/hadoop";
// Skip these lines when capturing output from hive
@@ -106,6 +108,14 @@ public abstract class ITTestBase {
.append(" --packages com.databricks:spark-avro_2.11:4.0.0 ").append(" -i ").append(commandFile).toString();
}
/**
 * Builds the Presto CLI invocation that executes the given SQL command file
 * against the coordinator, using the {@code hive} catalog and {@code default} schema.
 *
 * @param commandFile path (inside the coordinator container) of the file holding Presto SQL statements
 * @return the complete {@code presto} CLI command string
 */
static String getPrestoConsoleCommand(String commandFile) {
  // Chain appends instead of concatenating inside append(), which defeats the builder.
  String cmd = new StringBuilder("presto --server ").append(PRESTO_COORDINATOR_URL)
      .append(" --catalog hive --schema default")
      .append(" -f ").append(commandFile)
      .toString();
  // Fixed typo in the log message ("comamnd" -> "command").
  System.out.println("Presto command " + cmd);
  return cmd;
}
@Before
public void init() {
String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST;
@@ -207,6 +217,20 @@ public abstract class ITTestBase {
return Pair.of(callback.getStdout().toString(), callback.getStderr().toString());
}
/**
 * Runs a Presto SQL command file on the coordinator container and captures the
 * console output of the run.
 *
 * @param commandFile path of the command file inside the coordinator container
 * @return pair of (trimmed stdout, trimmed stderr) from the Presto CLI execution
 * @throws Exception if the docker exec fails
 */
Pair<String, String> executePrestoCommandFile(String commandFile) throws Exception {
  String consoleCmd = getPrestoConsoleCommand(commandFile);
  TestExecStartResultCallback result = executeCommandStringInDocker(PRESTO_COORDINATOR, consoleCmd, true);
  String stdout = result.getStdout().toString().trim();
  String stderr = result.getStderr().toString().trim();
  return Pair.of(stdout, stderr);
}
/**
 * Copies a local file into the Presto coordinator container.
 *
 * @param fromFile   path of the source file on the host
 * @param remotePath destination directory inside the coordinator container
 */
void executePrestoCopyCommand(String fromFile, String remotePath) {
  // Renamed from "sparkWorkerContainer": the target is the Presto coordinator,
  // not a Spark worker — the old name was a copy-paste misnomer.
  Container prestoCoordinatorContainer = runningContainers.get(PRESTO_COORDINATOR);
  dockerClient.copyArchiveToContainerCmd(prestoCoordinatorContainer.getId())
      .withHostResource(fromFile)
      .withRemotePath(remotePath)
      .exec();
}
private void saveUpLogs() {
try {
// save up the Hive log files for introspection

View File

@@ -34,9 +34,15 @@ public class ITTestHoodieDemo extends ITTestBase {
// Container-side directory where the input batches and Presto command files are staged.
private static String HDFS_DATA_DIR = "/usr/hive/data/input";
private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/" + "batch_1.json";
private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/" + "batch_2.json";
// Staged locations of the Presto SQL command files (copied in during setup).
private static String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/" + "presto-table-check.commands";
private static String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/" + "presto-batch1.commands";
private static String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/" + "presto-batch2-after-compaction.commands";
// Host-side sources: batch data under the workspace root, Presto command files
// addressed relative to the working directory's parent.
private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
private static String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
private static String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands";
private static String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";
private static String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands";
// Warehouse base paths for the copy-on-write and merge-on-read demo tables.
private static String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
private static String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
@@ -55,7 +61,6 @@ public class ITTestHoodieDemo extends ITTestBase {
private static String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands";
private static String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";
private static String HIVE_SYNC_CMD_FMT =
" --enable-hive-sync " + " --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
+ " --hoodie-conf hoodie.datasource.hive_sync.username=hive "
@@ -64,7 +69,6 @@ public class ITTestHoodieDemo extends ITTestBase {
+ " --hoodie-conf hoodie.datasource.hive_sync.database=default "
+ " --hoodie-conf hoodie.datasource.hive_sync.table=%s";
@Test
public void testDemo() throws Exception {
setupDemo();
@@ -72,11 +76,13 @@ public class ITTestHoodieDemo extends ITTestBase {
// batch 1
ingestFirstBatchAndHiveSync();
testHiveAfterFirstBatch();
testPrestoAfterFirstBatch();
testSparkSQLAfterFirstBatch();
// batch 2
ingestSecondBatchAndHiveSync();
testHiveAfterSecondBatch();
testPrestoAfterSecondBatch();
testSparkSQLAfterSecondBatch();
testIncrementalHiveQuery();
testIncrementalSparkSQLQuery();
@@ -84,6 +90,7 @@ public class ITTestHoodieDemo extends ITTestBase {
// compaction
scheduleAndRunCompaction();
testHiveAfterSecondBatchAfterCompaction();
testPrestoAfterSecondBatchAfterCompaction();
testIncrementalHiveQueryAfterCompaction();
}
@@ -94,6 +101,16 @@ public class ITTestHoodieDemo extends ITTestBase {
.add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1)
.add("/bin/bash " + DEMO_CONTAINER_SCRIPT).build();
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
// create input dir in presto coordinator
cmds = new ImmutableList.Builder<String>()
.add("mkdir -p " + HDFS_DATA_DIR).build();
executeCommandStringsInDocker(PRESTO_COORDINATOR, cmds);
// copy presto sql files to presto coordinator
executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR);
executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR);
executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR);
}
private void ingestFirstBatchAndHiveSync() throws Exception {
@@ -168,6 +185,20 @@ public class ITTestHoodieDemo extends ITTestBase {
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
}
/**
 * Verifies Presto after the first ingest batch: both demo tables are visible
 * in the catalog, and the batch-1 queries return the expected rows.
 */
private void testPrestoAfterFirstBatch() throws Exception {
  Pair<String, String> queryOutput = executePrestoCommandFile(HDFS_PRESTO_INPUT_TABLE_CHECK_PATH);
  assertStdOutContains(queryOutput, "stock_ticks_cow");
  assertStdOutContains(queryOutput, "stock_ticks_mor", 2);

  queryOutput = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH1_PATH);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 10:29:00\"", 4);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"", 2);
}
private void testHiveAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n"
@@ -187,6 +218,20 @@ public class ITTestHoodieDemo extends ITTestBase {
2);
}
/**
 * Verifies Presto after the second ingest batch (pre-compaction): the MOR
 * read-optimized view still serves batch-1 data while the COW table reflects
 * the batch-2 update, hence the mixed expected counts per row.
 */
private void testPrestoAfterSecondBatch() throws Exception {
  Pair<String, String> queryOutput = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH1_PATH);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 10:29:00\"", 2);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 10:59:00\"", 2);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"");
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
}
private void testHiveAfterSecondBatchAfterCompaction() throws Exception {
Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH2_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n"
@@ -199,6 +244,16 @@ public class ITTestHoodieDemo extends ITTestBase {
2);
}
/**
 * Verifies Presto after compaction has run: the post-compaction command file
 * returns the batch-2 rows from both tables.
 */
private void testPrestoAfterSecondBatchAfterCompaction() throws Exception {
  Pair<String, String> queryOutput = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH2_PATH);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 10:59:00\"", 2);
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"");
  assertStdOutContains(queryOutput, "\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
}
private void testSparkSQLAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH2_COMMANDS, true);
assertStdOutContains(stdOutErrPair,