[HUDI-3586] Add Trino Queries in integration tests (#4988)
This commit is contained in:
@@ -62,6 +62,7 @@ public abstract class ITTestBase {
|
||||
protected static final String ADHOC_2_CONTAINER = "/adhoc-2";
|
||||
protected static final String HIVESERVER = "/hiveserver";
|
||||
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
|
||||
protected static final String TRINO_COORDINATOR = "/trino-coordinator-1";
|
||||
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
|
||||
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh";
|
||||
protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh";
|
||||
@@ -76,6 +77,7 @@ public abstract class ITTestBase {
|
||||
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar";
|
||||
protected static final String HIVE_SERVER_JDBC_URL = "jdbc:hive2://hiveserver:10000";
|
||||
protected static final String PRESTO_COORDINATOR_URL = "presto-coordinator-1:8090";
|
||||
protected static final String TRINO_COORDINATOR_URL = "trino-coordinator-1:8091";
|
||||
protected static final String HADOOP_CONF_DIR = "/etc/hadoop";
|
||||
|
||||
// Skip these lines when capturing output from hive
|
||||
@@ -122,6 +124,12 @@ public abstract class ITTestBase {
|
||||
.append(" -f " + commandFile).toString();
|
||||
}
|
||||
|
||||
static String getTrinoConsoleCommand(String commandFile) {
|
||||
return new StringBuilder().append("trino --server " + TRINO_COORDINATOR_URL)
|
||||
.append(" --catalog hive --schema default")
|
||||
.append(" -f " + commandFile).toString();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void init() {
|
||||
String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST;
|
||||
@@ -309,6 +317,20 @@ public abstract class ITTestBase {
|
||||
.exec();
|
||||
}
|
||||
|
||||
Pair<String, String> executeTrinoCommandFile(String commandFile) throws Exception {
|
||||
String trinoCmd = getTrinoConsoleCommand(commandFile);
|
||||
TestExecStartResultCallback callback = executeCommandStringInDocker(ADHOC_1_CONTAINER, trinoCmd, true);
|
||||
return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim());
|
||||
}
|
||||
|
||||
void executeTrinoCopyCommand(String fromFile, String remotePath) {
|
||||
Container adhocContainer = runningContainers.get(ADHOC_1_CONTAINER);
|
||||
dockerClient.copyArchiveToContainerCmd(adhocContainer.getId())
|
||||
.withHostResource(fromFile)
|
||||
.withRemotePath(remotePath)
|
||||
.exec();
|
||||
}
|
||||
|
||||
private void saveUpLogs() {
|
||||
try {
|
||||
// save up the Hive log files for introspection
|
||||
|
||||
@@ -21,7 +21,6 @@ package org.apache.hudi.integ;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
|
||||
import org.apache.hudi.keygen.SimpleKeyGenerator;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
@@ -33,24 +32,34 @@ import java.util.List;
|
||||
|
||||
/**
|
||||
* Goes through steps described in https://hudi.apache.org/docker_demo.html
|
||||
*
|
||||
* <p>
|
||||
* To run this as a standalone test in the IDE or command line. First bring up the demo setup using
|
||||
* `docker/setup_demo.sh` and then run the test class as you would do normally.
|
||||
*/
|
||||
public class ITTestHoodieDemo extends ITTestBase {
|
||||
|
||||
private static final String TRINO_TABLE_CHECK_FILENAME = "trino-table-check.commands";
|
||||
private static final String TRINO_BATCH1_FILENAME = "trino-batch1.commands";
|
||||
private static final String TRINO_BATCH2_FILENAME = "trino-batch2-after-compaction.commands";
|
||||
|
||||
private static final String HDFS_DATA_DIR = "/usr/hive/data/input";
|
||||
private static final String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json";
|
||||
private static final String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json";
|
||||
private static final String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands";
|
||||
private static final String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands";
|
||||
private static final String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands";
|
||||
private static final String HDFS_TRINO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/" + TRINO_TABLE_CHECK_FILENAME;
|
||||
private static final String HDFS_TRINO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/" + TRINO_BATCH1_FILENAME;
|
||||
private static final String HDFS_TRINO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/" + TRINO_BATCH2_FILENAME;
|
||||
|
||||
private static final String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
|
||||
private static final String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
|
||||
private static final String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands";
|
||||
private static final String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";
|
||||
private static final String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands";
|
||||
private static final String TRINO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/" + TRINO_TABLE_CHECK_FILENAME;
|
||||
private static final String TRINO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/" + TRINO_BATCH1_FILENAME;
|
||||
private static final String TRINO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/" + TRINO_BATCH2_FILENAME;
|
||||
|
||||
private static final String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
|
||||
private static final String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
|
||||
@@ -110,12 +119,14 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
ingestFirstBatchAndHiveSync();
|
||||
testHiveAfterFirstBatch();
|
||||
testPrestoAfterFirstBatch();
|
||||
testTrinoAfterFirstBatch();
|
||||
testSparkSQLAfterFirstBatch();
|
||||
|
||||
// batch 2
|
||||
ingestSecondBatchAndHiveSync();
|
||||
testHiveAfterSecondBatch();
|
||||
testPrestoAfterSecondBatch();
|
||||
testTrinoAfterSecondBatch();
|
||||
testSparkSQLAfterSecondBatch();
|
||||
testIncrementalHiveQueryBeforeCompaction();
|
||||
testIncrementalSparkSQLQuery();
|
||||
@@ -125,6 +136,7 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
|
||||
testHiveAfterSecondBatchAfterCompaction();
|
||||
testPrestoAfterSecondBatchAfterCompaction();
|
||||
testTrinoAfterSecondBatchAfterCompaction();
|
||||
testIncrementalHiveQueryAfterCompaction();
|
||||
}
|
||||
|
||||
@@ -133,7 +145,7 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
public void testHFileDemo() throws Exception {
|
||||
baseFileFormat = HoodieFileFormat.HFILE;
|
||||
|
||||
// TODO: Preseto and SparkSQL support for HFile format
|
||||
// TODO: Presto, Trino and SparkSQL support for HFile format
|
||||
|
||||
setupDemo();
|
||||
|
||||
@@ -141,12 +153,14 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
ingestFirstBatchAndHiveSync();
|
||||
testHiveAfterFirstBatch();
|
||||
//testPrestoAfterFirstBatch();
|
||||
//testTrinoAfterFirstBatch();
|
||||
//testSparkSQLAfterFirstBatch();
|
||||
|
||||
// batch 2
|
||||
ingestSecondBatchAndHiveSync();
|
||||
testHiveAfterSecondBatch();
|
||||
//testPrestoAfterSecondBatch();
|
||||
//testTrinoAfterSecondBatch();
|
||||
//testSparkSQLAfterSecondBatch();
|
||||
testIncrementalHiveQueryBeforeCompaction();
|
||||
//testIncrementalSparkSQLQuery();
|
||||
@@ -155,6 +169,7 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
scheduleAndRunCompaction();
|
||||
testHiveAfterSecondBatchAfterCompaction();
|
||||
//testPrestoAfterSecondBatchAfterCompaction();
|
||||
//testTrinoAfterSecondBatchAfterCompaction();
|
||||
//testIncrementalHiveQueryAfterCompaction();
|
||||
}
|
||||
|
||||
@@ -162,7 +177,8 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
List<String> cmds = CollectionUtils.createImmutableList("hdfs dfsadmin -safemode wait",
|
||||
"hdfs dfs -mkdir -p " + HDFS_DATA_DIR,
|
||||
"hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1,
|
||||
"/bin/bash " + DEMO_CONTAINER_SCRIPT);
|
||||
"/bin/bash " + DEMO_CONTAINER_SCRIPT,
|
||||
"mkdir -p " + HDFS_DATA_DIR);
|
||||
|
||||
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
|
||||
|
||||
@@ -174,6 +190,10 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||
|
||||
executeTrinoCopyCommand(System.getProperty("user.dir") + "/.." + TRINO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||
executeTrinoCopyCommand(System.getProperty("user.dir") + "/.." + TRINO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||
executeTrinoCopyCommand(System.getProperty("user.dir") + "/.." + TRINO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||
}
|
||||
|
||||
private void ingestFirstBatchAndHiveSync() throws Exception {
|
||||
@@ -335,6 +355,20 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"", 2);
|
||||
}
|
||||
|
||||
private void testTrinoAfterFirstBatch() throws Exception {
|
||||
Pair<String, String> stdOutErrPair = executeTrinoCommandFile(HDFS_TRINO_INPUT_TABLE_CHECK_PATH);
|
||||
assertStdOutContains(stdOutErrPair, "stock_ticks_cow", 2);
|
||||
assertStdOutContains(stdOutErrPair, "stock_ticks_mor", 4);
|
||||
|
||||
stdOutErrPair = executeTrinoCommandFile(HDFS_TRINO_INPUT_BATCH1_PATH);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:29:00\"", 4);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"", 2);
|
||||
}
|
||||
|
||||
private void testHiveAfterSecondBatch() throws Exception {
|
||||
Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
|
||||
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n"
|
||||
@@ -361,7 +395,21 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:59:00\"", 2);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"",2);
|
||||
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"");
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
|
||||
}
|
||||
|
||||
private void testTrinoAfterSecondBatch() throws Exception {
|
||||
Pair<String, String> stdOutErrPair = executeTrinoCommandFile(HDFS_TRINO_INPUT_BATCH1_PATH);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:29:00\"", 2);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:59:00\"", 2);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"");
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
@@ -390,6 +438,16 @@ public class ITTestHoodieDemo extends ITTestBase {
|
||||
"\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
|
||||
}
|
||||
|
||||
private void testTrinoAfterSecondBatchAfterCompaction() throws Exception {
|
||||
Pair<String, String> stdOutErrPair = executeTrinoCommandFile(HDFS_TRINO_INPUT_BATCH2_PATH);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:59:00\"", 2);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"");
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
"\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
|
||||
}
|
||||
|
||||
private void testSparkSQLAfterSecondBatch() throws Exception {
|
||||
Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH2_COMMANDS, true);
|
||||
assertStdOutContains(stdOutErrPair,
|
||||
|
||||
Reference in New Issue
Block a user