[HUDI-3025] Add additional wait time for namenode availability during IT tests initiatialization (#4328)
- Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
@@ -18,9 +18,9 @@
|
|||||||
|
|
||||||
package org.apache.hudi.integ;
|
package org.apache.hudi.integ;
|
||||||
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import org.apache.hudi.common.util.FileIOUtils;
|
import org.apache.hudi.common.util.FileIOUtils;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
import com.github.dockerjava.api.DockerClient;
|
import com.github.dockerjava.api.DockerClient;
|
||||||
import com.github.dockerjava.api.command.DockerCmdExecFactory;
|
import com.github.dockerjava.api.command.DockerCmdExecFactory;
|
||||||
@@ -42,6 +42,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
@@ -130,10 +131,14 @@ public abstract class ITTestBase {
|
|||||||
DockerClientConfig config =
|
DockerClientConfig config =
|
||||||
DefaultDockerClientConfig.createDefaultConfigBuilder().withDockerHost(dockerHost).build();
|
DefaultDockerClientConfig.createDefaultConfigBuilder().withDockerHost(dockerHost).build();
|
||||||
// using jaxrs/jersey implementation here (netty impl is also available)
|
// using jaxrs/jersey implementation here (netty impl is also available)
|
||||||
DockerCmdExecFactory dockerCmdExecFactory = new JerseyDockerCmdExecFactory().withConnectTimeout(1000)
|
DockerCmdExecFactory dockerCmdExecFactory = new JerseyDockerCmdExecFactory().withConnectTimeout(10000)
|
||||||
.withMaxTotalConnections(100).withMaxPerRouteConnections(10);
|
.withMaxTotalConnections(100).withMaxPerRouteConnections(50);
|
||||||
dockerClient = DockerClientBuilder.getInstance(config).withDockerCmdExecFactory(dockerCmdExecFactory).build();
|
dockerClient = DockerClientBuilder.getInstance(config).withDockerCmdExecFactory(dockerCmdExecFactory).build();
|
||||||
await().atMost(60, SECONDS).until(this::servicesUp);
|
LOG.info("Start waiting for all the containers and services to be ready");
|
||||||
|
long currTs = System.currentTimeMillis();
|
||||||
|
await().atMost(300, SECONDS).until(this::servicesUp);
|
||||||
|
LOG.info(String.format("Waiting for all the containers and services finishes in %d ms",
|
||||||
|
System.currentTimeMillis() - currTs));
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean servicesUp() {
|
private boolean servicesUp() {
|
||||||
@@ -144,8 +149,29 @@ public abstract class ITTestBase {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
runningContainers = containerList.stream().map(c -> Pair.of(c.getNames()[0], c))
|
|
||||||
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
|
if (runningContainers == null) {
|
||||||
|
runningContainers = containerList.stream().map(c -> Pair.of(c.getNames()[0], c))
|
||||||
|
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
|
||||||
|
}
|
||||||
|
|
||||||
|
return checkHealth(ADHOC_1_CONTAINER, "namenode", 8020);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkHealth(String fromContainerName, String hostname, int port) {
|
||||||
|
try {
|
||||||
|
String command = String.format("nc -z -v %s %d", hostname, port);
|
||||||
|
TestExecStartResultCallback resultCallback =
|
||||||
|
executeCommandStringInDocker(fromContainerName, command, false, true);
|
||||||
|
String stderrString = resultCallback.getStderr().toString().trim();
|
||||||
|
if (!stderrString.contains("open")) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new HoodieException(String.format("Exception thrown while checking health from %s for %s:%d",
|
||||||
|
fromContainerName, hostname, port), e);
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,8 +179,13 @@ public abstract class ITTestBase {
|
|||||||
return str.replaceAll("[\\s]+", " ");
|
return str.replaceAll("[\\s]+", " ");
|
||||||
}
|
}
|
||||||
|
|
||||||
private TestExecStartResultCallback executeCommandInDocker(String containerName, String[] command,
|
private TestExecStartResultCallback executeCommandInDocker(
|
||||||
boolean expectedToSucceed) throws Exception {
|
String containerName, String[] command, boolean expectedToSucceed) throws Exception {
|
||||||
|
return executeCommandInDocker(containerName, command, true, expectedToSucceed);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TestExecStartResultCallback executeCommandInDocker(
|
||||||
|
String containerName, String[] command, boolean checkIfSucceed, boolean expectedToSucceed) throws Exception {
|
||||||
Container sparkWorkerContainer = runningContainers.get(containerName);
|
Container sparkWorkerContainer = runningContainers.get(containerName);
|
||||||
ExecCreateCmd cmd = dockerClient.execCreateCmd(sparkWorkerContainer.getId()).withCmd(command).withAttachStdout(true)
|
ExecCreateCmd cmd = dockerClient.execCreateCmd(sparkWorkerContainer.getId()).withCmd(command).withAttachStdout(true)
|
||||||
.withAttachStderr(true);
|
.withAttachStderr(true);
|
||||||
@@ -171,11 +202,11 @@ public abstract class ITTestBase {
|
|||||||
if (!completed) {
|
if (!completed) {
|
||||||
callback.getStderr().flush();
|
callback.getStderr().flush();
|
||||||
callback.getStdout().flush();
|
callback.getStdout().flush();
|
||||||
LOG.error("\n\n ###### Timed Out Command : " + Arrays.asList(command));
|
LOG.error("\n\n ###### Timed Out Command : " + Arrays.asList(command));
|
||||||
LOG.error("\n\n ###### Stderr of timed-out command #######\n" + callback.getStderr().toString());
|
LOG.error("\n\n ###### Stderr of timed-out command #######\n" + callback.getStderr().toString());
|
||||||
LOG.error("\n\n ###### stdout of timed-out command #######\n" + callback.getStdout().toString());
|
LOG.error("\n\n ###### stdout of timed-out command #######\n" + callback.getStdout().toString());
|
||||||
throw new TimeoutException("Command " + command + " has been running for more than 9 minutes. "
|
throw new TimeoutException("Command " + command + " has been running for more than 9 minutes. "
|
||||||
+ "Killing and failing !!");
|
+ "Killing and failing !!");
|
||||||
}
|
}
|
||||||
int exitCode = dockerClient.inspectExecCmd(createCmdResponse.getId()).exec().getExitCode();
|
int exitCode = dockerClient.inspectExecCmd(createCmdResponse.getId()).exec().getExitCode();
|
||||||
LOG.info("Exit code for command : " + exitCode);
|
LOG.info("Exit code for command : " + exitCode);
|
||||||
@@ -184,10 +215,12 @@ public abstract class ITTestBase {
|
|||||||
}
|
}
|
||||||
LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());
|
LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());
|
||||||
|
|
||||||
if (expectedToSucceed) {
|
if (checkIfSucceed) {
|
||||||
assertEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to succeed. Exit (" + exitCode + ")");
|
if (expectedToSucceed) {
|
||||||
} else {
|
assertEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to succeed. Exit (" + exitCode + ")");
|
||||||
assertNotEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to fail. Exit (" + exitCode + ")");
|
} else {
|
||||||
|
assertNotEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to fail. Exit (" + exitCode + ")");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cmd.close();
|
cmd.close();
|
||||||
return callback;
|
return callback;
|
||||||
@@ -199,14 +232,20 @@ public abstract class ITTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed)
|
protected TestExecStartResultCallback executeCommandStringInDocker(
|
||||||
|
String containerName, String cmd, boolean expectedToSucceed) throws Exception {
|
||||||
|
return executeCommandStringInDocker(containerName, cmd, true, expectedToSucceed);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TestExecStartResultCallback executeCommandStringInDocker(
|
||||||
|
String containerName, String cmd, boolean checkIfSucceed, boolean expectedToSucceed)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
LOG.info("\n\n#################################################################################################");
|
LOG.info("\n\n#################################################################################################");
|
||||||
LOG.info("Container : " + containerName + ", Running command :" + cmd);
|
LOG.info("Container : " + containerName + ", Running command :" + cmd);
|
||||||
LOG.info("\n#################################################################################################");
|
LOG.info("\n#################################################################################################");
|
||||||
|
|
||||||
String[] cmdSplits = singleSpace(cmd).split(" ");
|
String[] cmdSplits = singleSpace(cmd).split(" ");
|
||||||
return executeCommandInDocker(containerName, cmdSplits, expectedToSucceed);
|
return executeCommandInDocker(containerName, cmdSplits, checkIfSucceed, expectedToSucceed);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
|
protected Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
|
||||||
|
|||||||
Reference in New Issue
Block a user