diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index 97ec316a1..ccdf4679b 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -18,9 +18,9 @@ package org.apache.hudi.integ; -import java.util.concurrent.TimeoutException; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.DockerCmdExecFactory; @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static java.util.concurrent.TimeUnit.SECONDS; @@ -130,10 +131,14 @@ public abstract class ITTestBase { DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder().withDockerHost(dockerHost).build(); // using jaxrs/jersey implementation here (netty impl is also available) - DockerCmdExecFactory dockerCmdExecFactory = new JerseyDockerCmdExecFactory().withConnectTimeout(1000) - .withMaxTotalConnections(100).withMaxPerRouteConnections(10); + DockerCmdExecFactory dockerCmdExecFactory = new JerseyDockerCmdExecFactory().withConnectTimeout(10000) + .withMaxTotalConnections(100).withMaxPerRouteConnections(50); dockerClient = DockerClientBuilder.getInstance(config).withDockerCmdExecFactory(dockerCmdExecFactory).build(); - await().atMost(60, SECONDS).until(this::servicesUp); + LOG.info("Start waiting for all the containers and services to be ready"); + long currTs = System.currentTimeMillis(); + await().atMost(300, SECONDS).until(this::servicesUp); + LOG.info(String.format("Waiting for all the containers and services finishes in %d ms", + System.currentTimeMillis() - currTs)); } private boolean servicesUp() { @@ -144,8 +149,29 @@ public abstract class ITTestBase { return false; } } - runningContainers = containerList.stream().map(c -> Pair.of(c.getNames()[0], c)) - .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + + if (runningContainers == null) { + runningContainers = containerList.stream().map(c -> Pair.of(c.getNames()[0], c)) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + } + + return checkHealth(ADHOC_1_CONTAINER, "namenode", 8020); + } + + private boolean checkHealth(String fromContainerName, String hostname, int port) { + try { + String command = String.format("nc -z -v %s %d", hostname, port); + TestExecStartResultCallback resultCallback = + executeCommandStringInDocker(fromContainerName, command, false, true); + String stderrString = resultCallback.getStderr().toString().trim(); + if (!stderrString.contains("open")) { + Thread.sleep(1000); + return false; + } + } catch (Exception e) { + throw new HoodieException(String.format("Exception thrown while checking health from %s for %s:%d", + fromContainerName, hostname, port), e); + } return true; } @@ -153,8 +179,13 @@ public abstract class ITTestBase { return str.replaceAll("[\\s]+", " "); } - private TestExecStartResultCallback executeCommandInDocker(String containerName, String[] command, - boolean expectedToSucceed) throws Exception { + private TestExecStartResultCallback executeCommandInDocker( + String containerName, String[] command, boolean expectedToSucceed) throws Exception { + return executeCommandInDocker(containerName, command, true, expectedToSucceed); + } + + private TestExecStartResultCallback executeCommandInDocker( + String containerName, String[] command, boolean checkIfSucceed, boolean expectedToSucceed) throws Exception { Container sparkWorkerContainer = runningContainers.get(containerName); ExecCreateCmd cmd = dockerClient.execCreateCmd(sparkWorkerContainer.getId()).withCmd(command).withAttachStdout(true) .withAttachStderr(true); @@ -171,11 +202,11 @@ public abstract class ITTestBase { if (!completed) { callback.getStderr().flush(); callback.getStdout().flush(); - LOG.error("\n\n ###### Timed Out Command : " + Arrays.asList(command)); + LOG.error("\n\n ###### Timed Out Command : " + Arrays.asList(command)); LOG.error("\n\n ###### Stderr of timed-out command #######\n" + callback.getStderr().toString()); LOG.error("\n\n ###### stdout of timed-out command #######\n" + callback.getStdout().toString()); - throw new TimeoutException("Command " + command + " has been running for more than 9 minutes. " - + "Killing and failing !!"); + throw new TimeoutException("Command " + command + " has been running for more than 9 minutes. " + + "Killing and failing !!"); } int exitCode = dockerClient.inspectExecCmd(createCmdResponse.getId()).exec().getExitCode(); LOG.info("Exit code for command : " + exitCode); @@ -184,10 +215,12 @@ public abstract class ITTestBase { } LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString()); - if (expectedToSucceed) { - assertEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to succeed. Exit (" + exitCode + ")"); - } else { - assertNotEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to fail. Exit (" + exitCode + ")"); + if (checkIfSucceed) { + if (expectedToSucceed) { + assertEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to succeed. Exit (" + exitCode + ")"); + } else { + assertNotEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to fail. Exit (" + exitCode + ")"); + } } cmd.close(); return callback; @@ -199,14 +232,20 @@ public abstract class ITTestBase { } } - protected TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed) + protected TestExecStartResultCallback executeCommandStringInDocker( + String containerName, String cmd, boolean expectedToSucceed) throws Exception { + return executeCommandStringInDocker(containerName, cmd, true, expectedToSucceed); + } + + protected TestExecStartResultCallback executeCommandStringInDocker( + String containerName, String cmd, boolean checkIfSucceed, boolean expectedToSucceed) throws Exception { LOG.info("\n\n#################################################################################################"); LOG.info("Container : " + containerName + ", Running command :" + cmd); LOG.info("\n#################################################################################################"); String[] cmdSplits = singleSpace(cmd).split(" "); - return executeCommandInDocker(containerName, cmdSplits, expectedToSucceed); + return executeCommandInDocker(containerName, cmdSplits, checkIfSucceed, expectedToSucceed); } protected Pair executeHiveCommand(String hiveCommand) throws Exception {