1
0

[HUDI-68] Pom cleanup & demo automation (#846)

- [HUDI-172] Cleanup Maven POM/Classpath
  - Fix ordering of dependencies in poms, to enable better resolution
  - Idea is to place more specific ones at the top
  - And place dependencies which use them below them
- [HUDI-68] : Automate demo steps on docker setup
 - Move hive queries from hive cli to beeline
 - Standardize on taking query input from text command files
 - Deltastreamer ingest, also does hive sync in a single step
 - Spark Incremental Query materialized as a derived Hive table using datasource
 - Fix flakiness in HDFS spin up and output comparison
 - Code cleanup around streamlining and loc reduction
 - Also fixed pom to not shade some hive classes in spark, to enable hive sync
This commit is contained in:
vinoth chandar
2019-08-22 20:18:50 -07:00
committed by GitHub
parent 92eed6aca8
commit 6edf0b9def
34 changed files with 2382 additions and 1681 deletions

View File

@@ -31,14 +31,14 @@ import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.command.ExecStartResultCallback;
import com.github.dockerjava.jaxrs.JerseyDockerCmdExecFactory;
import com.google.common.collect.ImmutableList;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
@@ -52,38 +52,64 @@ public abstract class ITTestBase {
protected static final String ADHOC_2_CONTAINER = "/adhoc-2";
protected static final String HIVESERVER = "/hiveserver";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hoodie-spark/run_hoodie_app.sh";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
protected static final String HUDI_HADOOP_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar";
protected static final String HUDI_HIVE_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hive-bundle.jar";
protected static final String HUDI_SPARK_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-spark-bundle.jar";
protected static final String HUDI_UTILITIES_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar";
protected static final String HIVE_SERVER_JDBC_URL = "jdbc:hive2://hiveserver:10000";
protected static final String HADOOP_CONF_DIR = "/etc/hadoop";
// Skip these lines when capturing output from hive
protected static final Integer SLF4J_WARNING_LINE_COUNT_IN_HIVE_CMD = 9;
private static final String DEFAULT_DOCKER_HOST = "unix:///var/run/docker.sock";
private static final String OVERRIDDEN_DOCKER_HOST = System.getenv("DOCKER_HOST");
protected DockerClient dockerClient;
protected Map<String, Container> runningContainers;
protected static String[] getHiveConsoleCommand(String rawCommand) {
static String[] getHiveConsoleCommand(String rawCommand) {
String jarCommand = "add jar " + HUDI_HADOOP_BUNDLE + ";";
String fullCommand = jarCommand + rawCommand;
List<String> cmd = new ImmutableList.Builder().add("hive")
.add("--hiveconf")
.add("hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat")
.add("--hiveconf")
.add("hive.stats.autogather=false")
.add("-e")
.add("\"" + fullCommand + "\"")
.build();
List<String> cmd = new ArrayList<>();
cmd.add("hive");
cmd.add("--hiveconf");
cmd.add("hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
cmd.add("--hiveconf");
cmd.add("hive.stats.autogather=false");
cmd.add("-e");
cmd.add("\"" + fullCommand + "\"");
return cmd.stream().toArray(String[]::new);
}
/**
 * Builds the beeline invocation that executes a hive command file against the hive server.
 *
 * @param commandFile   path (inside the container) of the command file to run with -f
 * @param additionalVar optional extra "--hivevar name=value" definition; skipped when null
 * @return the full beeline command line as a single string
 */
private static String getHiveConsoleCommandFile(String commandFile, String additionalVar) {
  String beeline = "beeline -u " + HIVE_SERVER_JDBC_URL
      + " --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat "
      + " --hiveconf hive.stats.autogather=false "
      + " --hivevar hudi.hadoop.bundle=" + HUDI_HADOOP_BUNDLE;
  if (additionalVar != null) {
    beeline = beeline + " --hivevar " + additionalVar + " ";
  }
  return beeline + " -f " + commandFile;
}
/**
 * Builds the spark-shell invocation that runs a scala command file (-i) with the
 * hudi spark bundle and the spark-avro package on the classpath.
 *
 * @param commandFile path (inside the container) of the scala command file
 * @return the full spark-shell command line as a single string
 */
static String getSparkShellCommand(String commandFile) {
  return "spark-shell --jars " + HUDI_SPARK_BUNDLE
      + " --master local[2] --driver-class-path " + HADOOP_CONF_DIR
      + " --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 1G --num-executors 1 "
      + " --packages com.databricks:spark-avro_2.11:4.0.0 "
      + " -i " + commandFile;
}
@Before
public void init() throws IOException {
public void init() {
String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST;
//Assuming insecure docker engine
DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder()
@@ -104,7 +130,7 @@ public abstract class ITTestBase {
List<Container> containerList = dockerClient.listContainersCmd().exec();
for (Container c : containerList) {
if (!c.getState().equalsIgnoreCase("running")) {
System.out.println("Container : " + Arrays.toString(c.getNames())
LOG.info("Container : " + Arrays.toString(c.getNames())
+ "not in running state, Curr State :" + c.getState());
return false;
}
@@ -114,10 +140,12 @@ public abstract class ITTestBase {
return true;
}
protected TestExecStartResultCallback executeCommandInDocker(String containerName, String[] command,
boolean expectedToSucceed)
throws Exception {
LOG.info("Executing command (" + Arrays.toString(command) + ") in container " + containerName);
/** Collapses every run of whitespace (spaces, tabs, newlines) in {@code str} into one space. */
private String singleSpace(String str) {
  String collapsed = str.replaceAll("[\\s]+", " ");
  return collapsed;
}
private TestExecStartResultCallback executeCommandInDocker(String containerName, String[] command,
boolean expectedToSucceed) throws Exception {
Container sparkWorkerContainer = runningContainers.get(containerName);
ExecCreateCmd cmd = dockerClient.execCreateCmd(sparkWorkerContainer.getId())
.withCmd(command).withAttachStdout(true).withAttachStderr(true);
@@ -128,12 +156,10 @@ public abstract class ITTestBase {
dockerClient.execStartCmd(createCmdResponse.getId()).withDetach(false).withTty(false)
.exec(callback).awaitCompletion();
int exitCode = dockerClient.inspectExecCmd(createCmdResponse.getId()).exec().getExitCode();
LOG.info("Exit code for command (" + Arrays.toString(command) + ") is " + exitCode);
if (exitCode != 0) {
LOG.error("Command (" + Arrays.toString(command) + ") failed.");
LOG.error("Stdout is :" + callback.getStdout().toString());
LOG.error("Stderr is :" + callback.getStderr().toString());
}
LOG.info("Exit code for command : " + exitCode);
LOG.error("\n\n ###### Stdout #######\n" + callback.getStdout().toString());
LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());
if (expectedToSucceed) {
Assert.assertTrue("Command (" + Arrays.toString(command)
+ ") expected to succeed. Exit (" + exitCode + ")", exitCode == 0);
@@ -145,6 +171,71 @@ public abstract class ITTestBase {
return callback;
}
/**
 * Runs each command string in order inside the named container, asserting that
 * every one of them succeeds (exit code 0).
 */
void executeCommandStringsInDocker(String containerName, List<String> commands) throws Exception {
  for (int i = 0; i < commands.size(); i++) {
    executeCommandStringInDocker(containerName, commands.get(i), true);
  }
}
/**
 * Runs a single shell command string inside the named container. The command is
 * whitespace-normalized and split on spaces before being handed to docker exec,
 * so arguments themselves must not contain embedded spaces.
 *
 * @return the callback holding captured stdout/stderr
 */
TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd,
    boolean expectedToSucceed) throws Exception {
  String[] tokens = singleSpace(cmd).split(" ");
  LOG.info("\n\n#################################################################################################");
  LOG.info("Container : " + containerName + ", Running command :" + cmd);
  LOG.info("\n#################################################################################################");
  return executeCommandInDocker(containerName, tokens, expectedToSucceed);
}
/**
 * Runs one hive statement via the hive CLI on the hiveserver container and asserts success.
 *
 * @return (stdout, stderr) pair, both trimmed
 */
Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
  LOG.info("\n\n#################################################################################################");
  LOG.info("Running hive command :" + hiveCommand);
  LOG.info("\n#################################################################################################");
  TestExecStartResultCallback result =
      executeCommandInDocker(HIVESERVER, getHiveConsoleCommand(hiveCommand), true);
  String out = result.getStdout().toString().trim();
  String err = result.getStderr().toString().trim();
  return Pair.of(out, err);
}
/**
 * Runs a hive command file via beeline with no extra hive variables.
 * Delegates to {@link #executeHiveCommandFile(String, String)} with a null variable.
 */
Pair<String, String> executeHiveCommandFile(String commandFile) throws Exception {
return executeHiveCommandFile(commandFile, null);
}
/**
 * Runs a hive command file via beeline on the hiveserver container, optionally
 * passing one extra "--hivevar name=value" definition, and asserts success.
 *
 * @return (stdout, stderr) pair, both trimmed
 */
Pair<String, String> executeHiveCommandFile(String commandFile, String additionalVar) throws Exception {
  String beelineCmd = getHiveConsoleCommandFile(commandFile, additionalVar);
  TestExecStartResultCallback result = executeCommandStringInDocker(HIVESERVER, beelineCmd, true);
  String out = result.getStdout().toString().trim();
  String err = result.getStderr().toString().trim();
  return Pair.of(out, err);
}
/**
 * Runs a spark-shell command file on the adhoc-1 container.
 *
 * @param expectedToSucceed whether a non-zero exit should fail the test
 * @return (stdout, stderr) pair — NOT trimmed, callers compare padded output
 */
Pair<String, String> executeSparkSQLCommand(String commandFile, boolean expectedToSucceed) throws Exception {
  TestExecStartResultCallback result =
      executeCommandStringInDocker(ADHOC_1_CONTAINER, getSparkShellCommand(commandFile), expectedToSucceed);
  return Pair.of(result.getStdout().toString(), result.getStderr().toString());
}
/**
 * Asserts that the captured stdout contains {@code expectedOutput} exactly once.
 * Delegates to {@link #assertStdOutContains(Pair, String, int)} with times = 1.
 */
void assertStdOutContains(Pair<String, String> stdOutErr, String expectedOutput) {
assertStdOutContains(stdOutErr, expectedOutput, 1);
}
/**
 * Asserts that the captured stdout contains {@code expectedOutput} exactly {@code times}
 * times, ignoring ALL whitespace on both sides so changes in column padding and line
 * wrapping of hive/spark console tables do not affect the comparison.
 *
 * @param stdOutErr      (stdout, stderr) pair from a previously executed command
 * @param expectedOutput expected snippet (may contain newlines/padding; ignored)
 * @param times          exact number of non-overlapping occurrences expected
 */
void assertStdOutContains(Pair<String, String> stdOutErr, String expectedOutput, int times) {
  // Stripping every whitespace char in one pass is equivalent to the old
  // singleSpace(...).replaceAll(" ", "") double pass.
  String stdout = stdOutErr.getLeft().replaceAll("[\\s]+", "");
  String expected = expectedOutput.replaceAll("[\\s]+", "");
  // Guard: an empty needle would make indexOf loop forever (it matches at every offset).
  Assert.assertFalse("Expected output must not be empty/whitespace-only", expected.isEmpty());
  int count = 0;
  int index = stdout.indexOf(expected);
  while (index != -1) {
    count++;
    index = stdout.indexOf(expected, index + expected.length());
  }
  Assert.assertEquals("Did not find output the expected number of times", times, count);
}
public class TestExecStartResultCallback extends ExecStartResultCallback {
// Storing the reference in subclass to expose to clients

View File

@@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ;
import com.google.common.collect.ImmutableList;
import org.apache.hudi.common.util.collection.Pair;
import java.util.List;
import org.junit.Test;
/**
* Goes through steps described in https://hudi.incubator.apache.org/docker_demo.html
*
* To run this as a standalone test in the IDE or command line. First bring up the demo setup using
* `docker/setup_demo.sh` and then run the test class as you would do normally.
*/
public class ITTestHoodieDemo extends ITTestBase {

  // All of these are compile-time constants — declared final (they were mutable
  // static fields before, which invited accidental reassignment).

  // HDFS locations the demo input batches get staged into.
  private static final String HDFS_DATA_DIR = "/usr/hive/data/input";
  private static final String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/" + "batch_1.json";
  private static final String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/" + "batch_2.json";

  // Demo input batches shipped inside the workspace checkout.
  private static final String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
  private static final String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";

  // Target hoodie dataset base paths and hive table names (COW + MOR variants).
  private static final String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
  private static final String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
  private static final String COW_TABLE_NAME = "stock_ticks_cow";
  private static final String MOR_TABLE_NAME = "stock_ticks_mor";

  // Helper scripts and command files that drive each demo step.
  private static final String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh";
  private static final String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh";
  private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
  private static final String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands";
  private static final String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands";
  private static final String SPARKSQL_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch2.commands";
  private static final String SPARKSQL_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-incremental.commands";
  private static final String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands";
  private static final String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands";
  private static final String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands";
  private static final String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";

  // Deltastreamer flags that enable hive sync; %s slots are (partition_fields, table).
  private static final String HIVE_SYNC_CMD_FMT = " --enable-hive-sync "
      + " --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
      + " --hoodie-conf hoodie.datasource.hive_sync.username=hive "
      + " --hoodie-conf hoodie.datasource.hive_sync.password=hive "
      + " --hoodie-conf hoodie.datasource.hive_sync.partition_fields=%s "
      + " --hoodie-conf hoodie.datasource.hive_sync.database=default "
      + " --hoodie-conf hoodie.datasource.hive_sync.table=%s";

  /**
   * Runs every demo step end-to-end in one test, as the steps build on each other:
   * ingest batch 1, query; ingest batch 2, query (incl. incremental); compact, re-query.
   */
  @Test
  public void testDemo() throws Exception {
    setupDemo();

    // batch 1
    ingestFirstBatchAndHiveSync();
    testHiveAfterFirstBatch();
    testSparkSQLAfterFirstBatch();

    // batch 2
    ingestSecondBatchAndHiveSync();
    testHiveAfterSecondBatch();
    testSparkSQLAfterSecondBatch();
    testIncrementalHiveQuery();
    testIncrementalSparkSQLQuery();

    // compaction
    scheduleAndRunCompaction();
    testHiveAfterSecondBatchAfterCompaction();
    testIncrementalHiveQueryAfterCompaction();
  }

  /** Stages batch 1 into HDFS and runs the container setup script. */
  private void setupDemo() throws Exception {
    List<String> cmds = new ImmutableList.Builder<String>()
        .add("hdfs dfsadmin -safemode wait") // handle NN going into safe mode at times
        .add("hdfs dfs -mkdir -p " + HDFS_DATA_DIR)
        .add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1)
        .add("/bin/bash " + DEMO_CONTAINER_SCRIPT)
        .build();
    executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
  }

  /** Ingests batch 1 into both COW and MOR datasets via deltastreamer, with hive sync. */
  private void ingestFirstBatchAndHiveSync() throws Exception {
    List<String> cmds = new ImmutableList.Builder<String>()
        .add("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer "
            + HUDI_UTILITIES_BUNDLE + " --storage-type COPY_ON_WRITE "
            + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
            + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME
            + " --props /var/demo/config/dfs-source.properties "
            + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
            + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME))
        .add("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer "
            + HUDI_UTILITIES_BUNDLE + " --storage-type MERGE_ON_READ "
            + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
            + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME
            + " --props /var/demo/config/dfs-source.properties "
            + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
            + " --disable-compaction "
            + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME))
        .build();
    executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
  }

  /** Verifies synced tables/partitions exist and batch-1 rows are visible through hive. */
  private void testHiveAfterFirstBatch() throws Exception {
    Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_TBLCHECK_COMMANDS);
    assertStdOutContains(stdOutErrPair, "| stock_ticks_cow |");
    assertStdOutContains(stdOutErrPair, "| stock_ticks_mor |");
    assertStdOutContains(stdOutErrPair, "| stock_ticks_mor_rt |");
    assertStdOutContains(stdOutErrPair,
        "| partition |\n"
            + "+----------------+\n"
            + "| dt=2018-08-31 |\n"
            + "+----------------+\n", 3);

    stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
    assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n"
        + "+---------+----------------------+\n"
        + "| GOOG | 2018-08-31 10:29:00 |\n", 3);
    assertStdOutContains(stdOutErrPair,
        "| symbol | ts | volume | open | close |\n"
            + "+---------+----------------------+---------+------------+-----------+\n"
            + "| GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |\n"
            + "| GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |\n", 3);
  }

  /** Verifies batch-1 rows via spark datasource/Hive tables in spark-shell. */
  private void testSparkSQLAfterFirstBatch() throws Exception {
    Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH1_COMMANDS, true);
    assertStdOutContains(stdOutErrPair,
        "|default |stock_ticks_cow |false |\n"
            + "|default |stock_ticks_mor |false |\n"
            + "|default |stock_ticks_mor_rt |false |");
    assertStdOutContains(stdOutErrPair,
        "+------+-------------------+\n"
            + "|GOOG |2018-08-31 10:29:00|\n"
            + "+------+-------------------+", 3);
    assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3);
    assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|", 3);
  }

  /** Stages batch 2 and ingests it into both datasets (MOR compaction still disabled). */
  private void ingestSecondBatchAndHiveSync() throws Exception {
    List<String> cmds = new ImmutableList.Builder<String>()
        .add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH2 + " " + HDFS_BATCH_PATH2)
        .add("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer "
            + HUDI_UTILITIES_BUNDLE + " --storage-type COPY_ON_WRITE "
            + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
            + " --target-base-path " + COW_BASE_PATH + " --target-table " + COW_TABLE_NAME
            + " --props /var/demo/config/dfs-source.properties "
            + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
            + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME))
        .add("spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer "
            + HUDI_UTILITIES_BUNDLE + " --storage-type MERGE_ON_READ "
            + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
            + " --target-base-path " + MOR_BASE_PATH + " --target-table " + MOR_TABLE_NAME
            + " --props /var/demo/config/dfs-source.properties "
            + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
            + " --disable-compaction "
            + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_TABLE_NAME))
        .build();
    executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
  }

  /**
   * After batch 2: COW and MOR-realtime see the 10:59 update, while the MOR read-optimized
   * view (un-compacted) still shows batch-1's 10:29 value.
   */
  private void testHiveAfterSecondBatch() throws Exception {
    Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
    assertStdOutContains(stdOutErrPair,
        "| symbol | _c1 |\n"
            + "+---------+----------------------+\n"
            + "| GOOG | 2018-08-31 10:29:00 |\n");
    assertStdOutContains(stdOutErrPair,
        "| symbol | _c1 |\n"
            + "+---------+----------------------+\n"
            + "| GOOG | 2018-08-31 10:59:00 |\n", 2);
    assertStdOutContains(stdOutErrPair,
        "| symbol | ts | volume | open | close |\n"
            + "+---------+----------------------+---------+------------+-----------+\n"
            + "| GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |\n"
            + "| GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |\n");
    assertStdOutContains(stdOutErrPair,
        "| symbol | ts | volume | open | close |\n"
            + "+---------+----------------------+---------+------------+-----------+\n"
            + "| GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |\n"
            + "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |\n", 2);
  }

  /** After compaction, both MOR views must reflect the batch-2 (10:59) values. */
  private void testHiveAfterSecondBatchAfterCompaction() throws Exception {
    Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH2_COMMANDS);
    assertStdOutContains(stdOutErrPair,
        "| symbol | _c1 |\n"
            + "+---------+----------------------+\n"
            + "| GOOG | 2018-08-31 10:59:00 |", 2);
    assertStdOutContains(stdOutErrPair, "| symbol | ts | volume | open | close |\n"
        + "+---------+----------------------+---------+------------+-----------+\n"
        + "| GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |\n"
        + "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |", 2);
  }

  /** Spark-side mirror of {@link #testHiveAfterSecondBatch()}. */
  private void testSparkSQLAfterSecondBatch() throws Exception {
    Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH2_COMMANDS, true);
    assertStdOutContains(stdOutErrPair,
        "+------+-------------------+\n"
            + "|GOOG |2018-08-31 10:59:00|\n"
            + "+------+-------------------+", 2);
    assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3);
    assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021 |1227.1993|1227.215|", 2);
    assertStdOutContains(stdOutErrPair,
        "+------+-------------------+\n"
            + "|GOOG |2018-08-31 10:29:00|\n"
            + "+------+-------------------+");
    assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|");
  }

  /** Incremental hive query on COW starting from the first commit time. */
  private void testIncrementalHiveQuery() throws Exception {
    String minCommitTime = executeCommandStringInDocker(ADHOC_2_CONTAINER, MIN_COMMIT_TIME_SCRIPT, true)
        .getStdout().toString();
    // NOTE(review): the trailing "`" appended to the hivevar looks like it pairs with
    // quoting inside the .commands file — confirm against hive-incremental.commands.
    Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_INCREMENTAL_COMMANDS,
        "min.commit.time=" + minCommitTime + "`");
    assertStdOutContains(stdOutErrPair, "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |");
  }

  /** Same incremental hive query, re-run after compaction. */
  private void testIncrementalHiveQueryAfterCompaction() throws Exception {
    String minCommitTime = executeCommandStringInDocker(ADHOC_2_CONTAINER, MIN_COMMIT_TIME_SCRIPT, true)
        .getStdout().toString();
    // NOTE(review): see testIncrementalHiveQuery about the trailing "`".
    Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_INCREMENTAL_COMMANDS,
        "min.commit.time=" + minCommitTime + "`");
    assertStdOutContains(stdOutErrPair,
        "| symbol | ts | volume | open | close |\n"
            + "+---------+----------------------+---------+------------+-----------+\n"
            + "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |");
  }

  /** Spark datasource incremental query, materialized as derived hive tables. */
  private void testIncrementalSparkSQLQuery() throws Exception {
    Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_INCREMENTAL_COMMANDS, true);
    assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021 |1227.1993|1227.215|");
    assertStdOutContains(stdOutErrPair,
        "|default |stock_ticks_cow |false |\n"
            + "|default |stock_ticks_derived_mor |false |\n"
            + "|default |stock_ticks_derived_mor_rt|false |\n"
            + "|default |stock_ticks_mor |false |\n"
            + "|default |stock_ticks_mor_rt |false |\n"
            + "| |stock_ticks_cow_incr |true |");
    assertStdOutContains(stdOutErrPair,
        "|count(1)|\n"
            + "+--------+\n"
            + "|99 |", 2);
  }

  /** Schedules and runs MOR compaction through the hudi CLI command file. */
  private void scheduleAndRunCompaction() throws Exception {
    executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " --cmdfile " + COMPACTION_COMMANDS, true);
  }
}

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.integ;
import java.util.Arrays;
import org.apache.hudi.common.util.collection.Pair;
import org.junit.Assert;
import org.junit.Test;
@@ -33,17 +33,6 @@ public class ITTestHoodieSanity extends ITTestBase {
NON_PARTITIONED,
}
@Test
// Smoke test: runs a trivial `echo` inside the adhoc-1 container to prove the
// docker exec plumbing works, and logs what came back.
// NOTE(review): makes no assertions on the output — it fails only if the exec
// itself errors (expectedToSucceed=true checks the exit code).
public void testRunEcho() throws Exception {
String[] cmd = new String[]{"echo", "Happy Testing"};
TestExecStartResultCallback callback = executeCommandInDocker(ADHOC_1_CONTAINER,
cmd, true);
String stdout = callback.getStdout().toString();
String stderr = callback.getStderr().toString();
LOG.info("Got output for (" + Arrays.toString(cmd) + ") :" + stdout);
LOG.info("Got error output for (" + Arrays.toString(cmd) + ") :" + stderr);
}
@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with single partition key
@@ -53,6 +42,7 @@ public class ITTestHoodieSanity extends ITTestBase {
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
  // Build a single-partition-key COW table via HoodieJavaApp, then clean it up in hive.
  String tableName = "docker_hoodie_single_partition_key_cow_test";
  testRunHoodieJavaAppOnCOWTable(tableName, PartitionType.SINGLE_KEY_PARTITIONED);
  executeHiveCommand("drop table if exists " + tableName);
}
@Test
@@ -64,6 +54,7 @@ public class ITTestHoodieSanity extends ITTestBase {
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception {
  // Build a multi-partition-key COW table via HoodieJavaApp, then clean it up in hive.
  String tableName = "docker_hoodie_multi_partition_key_cow_test";
  testRunHoodieJavaAppOnCOWTable(tableName, PartitionType.MULTI_KEYS_PARTITIONED);
  executeHiveCommand("drop table if exists " + tableName);
}
@Test
@@ -75,6 +66,7 @@ public class ITTestHoodieSanity extends ITTestBase {
public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
  // Build a non-partitioned COW table via HoodieJavaApp, then clean it up in hive.
  String tableName = "docker_hoodie_non_partition_key_cow_test";
  testRunHoodieJavaAppOnCOWTable(tableName, PartitionType.NON_PARTITIONED);
  executeHiveCommand("drop table if exists " + tableName);
}
/**
@@ -89,109 +81,54 @@ public class ITTestHoodieSanity extends ITTestBase {
String hdfsUrl = "hdfs://namenode" + hdfsPath;
// Drop Table if it exists
{
String[] hiveDropCmd = getHiveConsoleCommand("drop table if exists " + hiveTableName);
executeCommandInDocker(HIVESERVER, hiveDropCmd, true);
String hiveDropCmd = "drop table if exists " + hiveTableName;
try {
executeHiveCommand(hiveDropCmd);
} catch (AssertionError ex) {
// In travis, sometimes, the hivemetastore is not ready even though we wait for the port to be up
// Workaround to sleep for 5 secs and retry
Thread.sleep(5000);
executeHiveCommand(hiveDropCmd);
}
// Ensure table does not exist
{
String[] hiveTableCheck = getHiveConsoleCommand("show tables like '" + hiveTableName + "'");
TestExecStartResultCallback callback =
executeCommandInDocker(HIVESERVER, hiveTableCheck, true);
String stderr = callback.getStderr().toString();
String stdout = callback.getStdout().toString();
LOG.info("Got output for (" + Arrays.toString(hiveTableCheck) + ") :" + stdout);
LOG.info("Got error output for (" + Arrays.toString(hiveTableCheck) + ") :" + stderr);
Assert.assertTrue("Result :" + callback.getStdout().toString(), stdout.trim().isEmpty());
}
Pair<String, String> stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
Assert.assertTrue("Dropped table " + hiveTableName + " exists!", stdOutErr.getLeft().isEmpty());
// Run Hoodie Java App
{
String[] cmd = null;
if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
cmd = new String[]{
HOODIE_JAVA_APP,
"--hive-sync",
"--table-path", hdfsUrl,
"--hive-url", HIVE_SERVER_JDBC_URL,
"--hive-table", hiveTableName
};
} else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
cmd = new String[]{
HOODIE_JAVA_APP,
"--hive-sync",
"--table-path", hdfsUrl,
"--hive-url", HIVE_SERVER_JDBC_URL,
"--use-multi-partition-keys",
"--hive-table", hiveTableName
};
} else {
cmd = new String[]{
HOODIE_JAVA_APP,
"--hive-sync",
"--table-path", hdfsUrl,
"--hive-url", HIVE_SERVER_JDBC_URL,
"--non-partitioned",
"--hive-table", hiveTableName
};
}
TestExecStartResultCallback callback = executeCommandInDocker(ADHOC_1_CONTAINER,
cmd, true);
String stdout = callback.getStdout().toString().trim();
String stderr = callback.getStderr().toString().trim();
LOG.info("Got output for (" + Arrays.toString(cmd) + ") :" + stdout);
LOG.info("Got error output for (" + Arrays.toString(cmd) + ") :" + stderr);
String cmd;
if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl
+ " --hive-url " + HIVE_SERVER_JDBC_URL + " --hive-table " + hiveTableName;
} else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl
+ " --hive-url " + HIVE_SERVER_JDBC_URL + " --hive-table " + hiveTableName
+ " --use-multi-partition-keys";
} else {
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl
+ " --hive-url " + HIVE_SERVER_JDBC_URL + " --hive-table " + hiveTableName
+ " --non-partitioned";
}
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
// Ensure table does exist
{
String[] hiveTableCheck = getHiveConsoleCommand("show tables like '" + hiveTableName + "'");
TestExecStartResultCallback callback =
executeCommandInDocker(HIVESERVER, hiveTableCheck, true);
String stderr = callback.getStderr().toString().trim();
String stdout = callback.getStdout().toString().trim();
LOG.info("Got output for (" + Arrays.toString(hiveTableCheck) + ") : (" + stdout + ")");
LOG.info("Got error output for (" + Arrays.toString(hiveTableCheck) + ") : (" + stderr + ")");
Assert.assertEquals("Table exists", hiveTableName, stdout);
}
stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
Assert.assertEquals("Table exists", hiveTableName, stdOutErr.getLeft());
// Ensure row count is 100 (without duplicates)
{
String[] hiveTableCheck = getHiveConsoleCommand("select count(1) from " + hiveTableName);
TestExecStartResultCallback callback =
executeCommandInDocker(ADHOC_1_CONTAINER, hiveTableCheck, true);
String stderr = callback.getStderr().toString().trim();
String stdout = callback.getStdout().toString().trim();
LOG.info("Got output for (" + Arrays.toString(hiveTableCheck) + ") : (" + stdout + ")");
LOG.info("Got error output for (" + Arrays.toString(hiveTableCheck) + ") : (" + stderr + ")");
Assert.assertEquals("Expecting 100 rows to be present in the new table", 100,
Integer.parseInt(stdout.trim()));
}
stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName);
Assert.assertEquals("Expecting 100 rows to be present in the new table", 100,
Integer.parseInt(stdOutErr.getLeft().trim()));
// Make the HDFS dataset non-hoodie and run the same query
// Checks for interoperability with non-hoodie tables
{
// Delete Hoodie directory to make it non-hoodie dataset
String[] cmd = new String[]{
"hadoop", "fs", "-rm", "-r", hdfsPath + "/.hoodie"
};
TestExecStartResultCallback callback =
executeCommandInDocker(ADHOC_1_CONTAINER, cmd, true);
String stderr = callback.getStderr().toString().trim();
String stdout = callback.getStdout().toString().trim();
LOG.info("Got output for (" + Arrays.toString(cmd) + ") : (" + stdout + ")");
LOG.info("Got error output for (" + Arrays.toString(cmd) + ") : (" + stderr + ")");
// Run the count query again. Without Hoodie, all versions are included. So we get a wrong count
String[] hiveTableCheck = getHiveConsoleCommand("select count(1) from " + hiveTableName);
callback = executeCommandInDocker(ADHOC_1_CONTAINER, hiveTableCheck, true);
stderr = callback.getStderr().toString().trim();
stdout = callback.getStdout().toString().trim();
LOG.info("Got output for (" + Arrays.toString(hiveTableCheck) + ") : (" + stdout + ")");
LOG.info("Got error output for (" + Arrays.toString(hiveTableCheck) + ") : (" + stderr + ")");
Assert.assertEquals("Expecting 200 rows to be present in the new table", 200,
Integer.parseInt(stdout.trim()));
}
// Delete Hoodie directory to make it non-hoodie dataset
executeCommandStringInDocker(ADHOC_1_CONTAINER, "hdfs dfs -rm -r " + hdfsPath + "/.hoodie", true);
// Run the count query again. Without Hoodie, all versions are included. So we get a wrong count
stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName);
Assert.assertEquals("Expecting 200 rows to be present in the new table", 200,
Integer.parseInt(stdOutErr.getLeft().trim()));
}
}