refactor(yarn-query): 适配yarn配置改动
This commit is contained in:
@@ -49,6 +49,17 @@ public class YarnClusters {
|
|||||||
return clusters;
|
return clusters;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Cluster getCluster(String name) {
|
||||||
|
return clusters.get(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getWebUrl(String name) {
|
||||||
|
if (clusters.containsKey(name)) {
|
||||||
|
return getCluster(name).getWebUrl();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
public ImmutableList<String> getActiveClusters() {
|
public ImmutableList<String> getActiveClusters() {
|
||||||
return activeClusters;
|
return activeClusters;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,41 +0,0 @@
|
|||||||
package com.lanyuanxiaoyao.service.yarn.configuration;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import org.eclipse.collections.api.factory.Maps;
|
|
||||||
import org.eclipse.collections.api.map.ImmutableMap;
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Yarn 配置
|
|
||||||
*
|
|
||||||
* @author lanyuanxiaoyao
|
|
||||||
* @date 2023-04-23
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
@ConfigurationProperties("yarn")
|
|
||||||
public class YarnConfiguration {
|
|
||||||
private ImmutableMap<String, String> webUrls;
|
|
||||||
|
|
||||||
public ImmutableMap<String, String> getWebUrls() {
|
|
||||||
return webUrls;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getWebUrl(String cluster) {
|
|
||||||
if (!webUrls.containsKey(cluster)) {
|
|
||||||
throw new RuntimeException("Cluster not found");
|
|
||||||
}
|
|
||||||
return webUrls.get(cluster);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setWebUrls(Map<String, String> webUrls) {
|
|
||||||
this.webUrls = Maps.immutable.ofAll(webUrls);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "YarnConfiguration{" +
|
|
||||||
"clusters=" + webUrls +
|
|
||||||
'}';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -4,8 +4,8 @@ import cn.hutool.core.util.URLUtil;
|
|||||||
import cn.hutool.http.HttpUtil;
|
import cn.hutool.http.HttpUtil;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
|
||||||
import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration;
|
|
||||||
import com.lanyuanxiaoyao.service.yarn.response.ClusterInfoResponse;
|
import com.lanyuanxiaoyao.service.yarn.response.ClusterInfoResponse;
|
||||||
import com.lanyuanxiaoyao.service.yarn.service.ClusterService;
|
import com.lanyuanxiaoyao.service.yarn.service.ClusterService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -28,10 +28,10 @@ public class ClusterServiceImpl implements ClusterService {
|
|||||||
private static final Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class);
|
||||||
|
|
||||||
private final ObjectMapper mapper;
|
private final ObjectMapper mapper;
|
||||||
private final YarnConfiguration yarnConfiguration;
|
private final YarnClusters yarnClusters;
|
||||||
|
|
||||||
public ClusterServiceImpl(YarnConfiguration yarnConfiguration, Jackson2ObjectMapperBuilder builder) {
|
public ClusterServiceImpl(YarnClusters yarnClusters, Jackson2ObjectMapperBuilder builder) {
|
||||||
this.yarnConfiguration = yarnConfiguration;
|
this.yarnClusters = yarnClusters;
|
||||||
|
|
||||||
this.mapper = builder.build();
|
this.mapper = builder.build();
|
||||||
}
|
}
|
||||||
@@ -40,7 +40,7 @@ public class ClusterServiceImpl implements ClusterService {
|
|||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
@Override
|
@Override
|
||||||
public YarnRootQueue info(String cluster) throws JsonProcessingException {
|
public YarnRootQueue info(String cluster) throws JsonProcessingException {
|
||||||
String webUrl = yarnConfiguration.getWebUrl(cluster);
|
String webUrl = yarnClusters.getWebUrl(cluster);
|
||||||
String queryUrl = URLUtil.completeUrl(webUrl, "/ws/v1/cluster/scheduler");
|
String queryUrl = URLUtil.completeUrl(webUrl, "/ws/v1/cluster/scheduler");
|
||||||
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
|
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
|
||||||
return mapper.readValue(body, ClusterInfoResponse.class).getScheduler().getSchedulerInfo()
|
return mapper.readValue(body, ClusterInfoResponse.class).getScheduler().getSchedulerInfo()
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import cn.hutool.http.HttpUtil;
|
|||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
|
||||||
import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration;
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters;
|
||||||
import com.lanyuanxiaoyao.service.yarn.response.ApplicationDetailResponse;
|
import com.lanyuanxiaoyao.service.yarn.response.ApplicationDetailResponse;
|
||||||
import com.lanyuanxiaoyao.service.yarn.response.ApplicationsListResponse;
|
import com.lanyuanxiaoyao.service.yarn.response.ApplicationsListResponse;
|
||||||
import com.lanyuanxiaoyao.service.yarn.service.JobService;
|
import com.lanyuanxiaoyao.service.yarn.service.JobService;
|
||||||
@@ -42,11 +42,11 @@ public class JobServiceImpl implements JobService {
|
|||||||
YarnApplication.State.FAILED,
|
YarnApplication.State.FAILED,
|
||||||
YarnApplication.State.KILLED
|
YarnApplication.State.KILLED
|
||||||
);
|
);
|
||||||
|
private final YarnClusters yarnClusters;
|
||||||
private final ObjectMapper mapper;
|
private final ObjectMapper mapper;
|
||||||
private final YarnConfiguration yarnConfiguration;
|
|
||||||
|
|
||||||
public JobServiceImpl(YarnConfiguration yarnConfiguration, Jackson2ObjectMapperBuilder builder) {
|
public JobServiceImpl(YarnClusters yarnClusters, Jackson2ObjectMapperBuilder builder) {
|
||||||
this.yarnConfiguration = yarnConfiguration;
|
this.yarnClusters = yarnClusters;
|
||||||
|
|
||||||
this.mapper = builder.build();
|
this.mapper = builder.build();
|
||||||
}
|
}
|
||||||
@@ -74,7 +74,7 @@ public class JobServiceImpl implements JobService {
|
|||||||
|
|
||||||
public ImmutableList<YarnApplication> list(String cluster, ImmutableList<String> states) throws JsonProcessingException {
|
public ImmutableList<YarnApplication> list(String cluster, ImmutableList<String> states) throws JsonProcessingException {
|
||||||
boolean isFilter = ObjectUtil.isNotEmpty(states);
|
boolean isFilter = ObjectUtil.isNotEmpty(states);
|
||||||
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/apps");
|
String queryUrl = URLUtil.completeUrl(yarnClusters.getWebUrl(cluster), "/ws/v1/cluster/apps");
|
||||||
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
|
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
|
||||||
return mapper.readValue(response.body(), ApplicationsListResponse.class)
|
return mapper.readValue(response.body(), ApplicationsListResponse.class)
|
||||||
.getApps()
|
.getApps()
|
||||||
@@ -138,7 +138,7 @@ public class JobServiceImpl implements JobService {
|
|||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
@Override
|
@Override
|
||||||
public YarnApplication detail(String cluster, String applicationId) throws JsonProcessingException {
|
public YarnApplication detail(String cluster, String applicationId) throws JsonProcessingException {
|
||||||
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/apps/" + applicationId);
|
String queryUrl = URLUtil.completeUrl(yarnClusters.getWebUrl(cluster), "/ws/v1/cluster/apps/" + applicationId);
|
||||||
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
|
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
|
||||||
return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp().setCluster(cluster);
|
return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp().setCluster(cluster);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ import cn.hutool.core.util.URLUtil;
|
|||||||
import cn.hutool.http.HttpUtil;
|
import cn.hutool.http.HttpUtil;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
|
||||||
import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration;
|
|
||||||
import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse;
|
import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse;
|
||||||
import com.lanyuanxiaoyao.service.yarn.service.QueueService;
|
import com.lanyuanxiaoyao.service.yarn.service.QueueService;
|
||||||
import org.eclipse.collections.api.list.ImmutableList;
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
@@ -29,11 +29,11 @@ import org.springframework.stereotype.Service;
|
|||||||
public class QueueServiceImpl implements QueueService {
|
public class QueueServiceImpl implements QueueService {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(QueueServiceImpl.class);
|
private static final Logger logger = LoggerFactory.getLogger(QueueServiceImpl.class);
|
||||||
|
|
||||||
|
private final YarnClusters yarnClusters;
|
||||||
private final ObjectMapper mapper;
|
private final ObjectMapper mapper;
|
||||||
private final YarnConfiguration yarnConfiguration;
|
|
||||||
|
|
||||||
public QueueServiceImpl(YarnConfiguration yarnConfiguration, Jackson2ObjectMapperBuilder builder) {
|
public QueueServiceImpl(YarnClusters yarnClusters, Jackson2ObjectMapperBuilder builder) {
|
||||||
this.yarnConfiguration = yarnConfiguration;
|
this.yarnClusters = yarnClusters;
|
||||||
|
|
||||||
this.mapper = builder.build();
|
this.mapper = builder.build();
|
||||||
}
|
}
|
||||||
@@ -42,7 +42,7 @@ public class QueueServiceImpl implements QueueService {
|
|||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
@Override
|
@Override
|
||||||
public ImmutableList<YarnQueue> list(String cluster) throws JsonProcessingException {
|
public ImmutableList<YarnQueue> list(String cluster) throws JsonProcessingException {
|
||||||
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/scheduler");
|
String queryUrl = URLUtil.completeUrl(yarnClusters.getWebUrl(cluster), "/ws/v1/cluster/scheduler");
|
||||||
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
|
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
|
||||||
QueueListResponse response = mapper.readValue(body, QueueListResponse.class);
|
QueueListResponse response = mapper.readValue(body, QueueListResponse.class);
|
||||||
return response.getScheduler().getSchedulerInfo().getQueues().getQueue()
|
return response.getScheduler().getSchedulerInfo().getQueues().getQueue()
|
||||||
|
|||||||
@@ -1,16 +1,10 @@
|
|||||||
package com.lanyuanxiaoyao.service.yarn;
|
package com.lanyuanxiaoyao.service.yarn;
|
||||||
|
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
|
||||||
import cn.hutool.http.HttpUtil;
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule;
|
import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
|
|
||||||
import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration;
|
|
||||||
import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse;
|
|
||||||
import com.lanyuanxiaoyao.service.yarn.service.impl.QueueServiceImpl;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author lanyuanxiaoyao
|
* @author lanyuanxiaoyao
|
||||||
|
|||||||
Reference in New Issue
Block a user