[HUDI-312] Make docker hdfs cluster ephemeral. This is needed to fix flakiness in integration tests. Also, fix DeltaStreamer hanging issue due to an uncaught exception
committed by Balaji Varadarajan
parent 144ea4eedf
commit a6390aefc4
@@ -21,8 +21,6 @@ services:
     image: apachehudi/hudi-hadoop_2.8.4-namenode:latest
     hostname: namenode
     container_name: namenode
-    volumes:
-      - /tmp/hadoop_name:/hadoop/dfs/name
     environment:
       - CLUSTER_NAME=hudi_hadoop284_hive232_spark231
     ports:
@@ -57,8 +55,6 @@ services:
       retries: 3
     depends_on:
       - namenode
-    volumes:
-      - /tmp/hadoop_data:/hadoop/dfs/data

   historyserver:
     image: apachehudi/hudi-hadoop_2.8.4-history:latest
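With the /tmp/hadoop_name and /tmp/hadoop_data bind mounts removed, namenode and datanode state lives only inside the containers, so it is discarded on docker-compose down and every test run starts from an empty HDFS. As a rough illustration of the flakiness this avoids (a hedged sketch, not Hudi test code; the class name and the local-directory stand-in are made up):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class FreshClusterCheckSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical local stand-in for the DFS directories that used to be
    // bind-mounted from /tmp/hadoop_name and /tmp/hadoop_data on the docker host.
    Path dataDir = Paths.get("/tmp/hadoop_data");
    if (!Files.isDirectory(dataDir)) {
      System.out.println("no persisted state on the host: cluster state is container-local");
      return;
    }
    try (Stream<Path> entries = Files.list(dataDir)) {
      long leftovers = entries.count();
      // A test that expects a freshly formatted HDFS breaks when a previous run's blocks
      // are still here; dropping the bind mounts makes every `docker-compose up` start clean.
      System.out.println(leftovers == 0 ? "clean state" : leftovers + " leftover entries from an earlier run");
    }
  }
}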
@@ -24,7 +24,7 @@ USER root
 ENV LANG C.UTF-8

 ARG HADOOP_VERSION=2.8.4
-ARG HADOOP_URL=https://www.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
+ARG HADOOP_URL=https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
 ENV HADOOP_VERSION ${HADOOP_VERSION}
 ENV HADOOP_URL ${HADOOP_URL}

@@ -18,16 +18,10 @@
 # limitations under the License.
 ################################################################################

-# Create host mount directory and copy
-mkdir -p /tmp/hadoop_name
-mkdir -p /tmp/hadoop_data
-
 WS_ROOT=`dirname $PWD`
 # restart cluster
 HUDI_WS=${WS_ROOT} docker-compose -f compose/docker-compose_hadoop284_hive233_spark231.yml down
 HUDI_WS=${WS_ROOT} docker-compose -f compose/docker-compose_hadoop284_hive233_spark231.yml pull
-rm -rf /tmp/hadoop_data/*
-rm -rf /tmp/hadoop_name/*
 sleep 5
 HUDI_WS=${WS_ROOT} docker-compose -f compose/docker-compose_hadoop284_hive233_spark231.yml up -d
 sleep 15
@@ -88,8 +88,8 @@ public class ITTestHoodieDemo extends ITTestBase {
   }

   private void setupDemo() throws Exception {
-    List<String> cmds = new ImmutableList.Builder<String>().add("hdfs dfsadmin -safemode wait") // handle NN going into
-                                                            // safe mode at times
+    List<String> cmds = new ImmutableList.Builder<String>()
+        .add("hdfs dfsadmin -safemode wait") // handle NN going into safe mode at times
         .add("hdfs dfs -mkdir -p " + HDFS_DATA_DIR)
         .add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1)
         .add("/bin/bash " + DEMO_CONTAINER_SCRIPT).build();
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -251,7 +252,9 @@ public class DeltaSync implements Serializable {
       } else {
         throw new HoodieDeltaStreamerException(
             "Unable to find previous checkpoint. Please double check if this table "
-                + "was indeed built via delta streamer ");
+                + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+                + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+                + commitMetadata.toJsonString());
       }
     }
   } else {
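The richer message is why the java.util.stream.Collectors import is added above: the timeline's instants are collected into a List so they print readably alongside the last commit and the commit metadata. A small standalone sketch of the same pattern (hypothetical class name and instant values, not Hudi code):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CheckpointMessageSketch {
  public static void main(String[] args) {
    // Hypothetical instant times standing in for commitTimelineOpt.get().getInstants().
    Stream<String> instants = Stream.of("20191008120000", "20191008130000");
    String lastCommit = "20191008130000";

    // Same pattern as the change above: fold the stream into a List so it renders as
    // readable values in the exception message rather than as a Stream object reference.
    List<String> instantList = instants.collect(Collectors.toList());
    String message = "Unable to find previous checkpoint. Please double check if this table "
        + "was indeed built via delta streamer. Last Commit :" + lastCommit
        + ", Instants :" + instantList;
    System.out.println(message);
  }
}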
@@ -117,11 +117,17 @@ public class HoodieDeltaStreamer implements Serializable {
       log.info("Delta Sync shutting down");
     } else {
       log.info("Delta Streamer running only single round");
+      try {
         deltaSyncService.getDeltaSync().syncOnce();
+      } catch (Exception ex) {
+        log.error("Got error running delta sync once. Shutting down", ex);
+        throw ex;
+      } finally {
         deltaSyncService.close();
         log.info("Shut down deltastreamer");
       }
     }
+  }

   private boolean onDeltaSyncShutdown(boolean error) {
     log.info("DeltaSync shutdown. Closing write client. Error?" + error);
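The try/catch/finally is the fix for the hang named in the commit message: in single-round mode an exception from syncOnce() previously skipped deltaSyncService.close(), and anything non-daemon still owned by the service could keep the JVM alive. The diff does not show what DeltaSyncService holds internally, so the following is only a hedged, self-contained sketch of that failure mode (ShutdownOnErrorSketch and FakeSyncService are made up):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutdownOnErrorSketch {

  // Hypothetical stand-in for deltaSyncService: it owns a non-daemon worker thread,
  // like the executors and clients a long-running ingest service holds internally.
  static class FakeSyncService implements AutoCloseable {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    FakeSyncService() {
      // Park one task so the pool's non-daemon thread stays alive until it is interrupted.
      pool.submit(() -> {
        try {
          Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException ignored) {
          // interrupted by shutdownNow() during close()
        }
      });
    }

    void syncOnce() {
      throw new RuntimeException("simulated ingest failure");
    }

    @Override
    public void close() {
      pool.shutdownNow(); // without this call the parked thread keeps the JVM alive
    }
  }

  public static void main(String[] args) {
    FakeSyncService service = new FakeSyncService();
    try {
      service.syncOnce();
    } catch (Exception ex) {
      System.err.println("Got error running delta sync once. Shutting down: " + ex.getMessage());
      throw ex;
    } finally {
      // Mirrors the change above: close() now runs even when syncOnce() throws,
      // so the process can exit instead of waiting on live non-daemon threads.
      service.close();
    }
  }
}

Removing the finally block from the sketch leaves the parked non-daemon thread running after the stack trace, so the process never exits; that is the hanging behavior this change guards against.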