feature(yarn-query): 优化 yarn-query 为集群共享查询

原本一个 yarn-query 对应一个集群,集群多了之后,比较浪费,改为一个组件可以查询任意集群的信息,减少部署的复杂性
This commit is contained in:
2023-06-02 09:09:26 +08:00
parent 7768b427aa
commit cd676367c6
22 changed files with 112 additions and 491 deletions

View File

@@ -1,5 +1,8 @@
package com.lanyuanxiaoyao.service.yarn.configuration;
import java.util.Map;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.map.ImmutableMap;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@@ -12,30 +15,27 @@ import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("yarn")
public class YarnConfiguration {
private String cluster;
private String webUrl;
private ImmutableMap<String, String> webUrls;
public String getCluster() {
return cluster;
public ImmutableMap<String, String> getWebUrls() {
return webUrls;
}
public void setCluster(String cluster) {
this.cluster = cluster;
public String getWebUrl(String cluster) {
if (!webUrls.containsKey(cluster)) {
throw new RuntimeException("Cluster not found");
}
return webUrls.get(cluster);
}
public String getWebUrl() {
return webUrl;
}
public void setWebUrl(String webUrl) {
this.webUrl = webUrl;
public void setWebUrls(Map<String, String> webUrls) {
this.webUrls = Maps.immutable.ofAll(webUrls);
}
@Override
public String toString() {
return "YarnConfiguration{" +
"cluster='" + cluster + '\'' +
", webUrl='" + webUrl + '\'' +
"clusters=" + webUrls +
'}';
}
}

View File

@@ -7,6 +7,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@@ -27,7 +28,7 @@ public class ClusterController {
}
@GetMapping("info")
public YarnRootQueue info() throws JsonProcessingException {
return clusterService.info();
public YarnRootQueue info(@RequestParam("cluster") String cluster) throws JsonProcessingException {
return clusterService.info(cluster);
}
}

View File

@@ -30,27 +30,22 @@ public class JobController {
}
@GetMapping("list")
public ImmutableList<YarnApplication> list() throws JsonProcessingException {
return jobService.list();
public ImmutableList<YarnApplication> list(@RequestParam("cluster") String cluster) throws JsonProcessingException {
return jobService.list(cluster);
}
@GetMapping("list_equals")
public ImmutableList<YarnApplication> listEquals(@RequestParam("name") String name) throws JsonProcessingException {
return jobService.listEquals(name);
public ImmutableList<YarnApplication> listEquals(@RequestParam("cluster") String cluster, @RequestParam("name") String name) throws JsonProcessingException {
return jobService.listEquals(cluster, name);
}
@GetMapping("list_like")
public ImmutableList<YarnApplication> listLike(@RequestParam("text") String text) throws JsonProcessingException {
return jobService.listLike(text);
}
@GetMapping("kill")
public void kill(@RequestParam("application_id") String applicationId) {
jobService.kill(applicationId);
public ImmutableList<YarnApplication> listLike(@RequestParam("cluster") String cluster, @RequestParam("text") String text) throws JsonProcessingException {
return jobService.listLike(cluster, text);
}
@GetMapping("detail")
public YarnApplication detail(@RequestParam("application_id") String applicationId) throws Exception {
return jobService.detail(applicationId);
public YarnApplication detail(@RequestParam("cluster") String cluster, @RequestParam("application_id") String applicationId) throws Exception {
return jobService.detail(cluster, applicationId);
}
}

View File

@@ -30,12 +30,12 @@ public class QueueController {
}
@GetMapping("list")
public ImmutableList<YarnQueue> list() throws JsonProcessingException {
return queueService.list();
public ImmutableList<YarnQueue> list(@RequestParam("cluster") String cluster) throws JsonProcessingException {
return queueService.list(cluster);
}
@GetMapping("detail")
public YarnQueue detail(@RequestParam("name") String name) throws Exception {
return queueService.detail(name);
public YarnQueue detail(@RequestParam("cluster") String cluster, @RequestParam("name") String name) throws Exception {
return queueService.detail(cluster, name);
}
}

View File

@@ -10,5 +10,5 @@ import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
* @date 2023-04-23
*/
public interface ClusterService {
YarnRootQueue info() throws JsonProcessingException;
YarnRootQueue info(String name) throws JsonProcessingException;
}

View File

@@ -11,13 +11,11 @@ import org.eclipse.collections.api.list.ImmutableList;
* @date 2023-04-23
*/
public interface JobService {
ImmutableList<YarnApplication> list() throws JsonProcessingException;
ImmutableList<YarnApplication> list(String cluster) throws JsonProcessingException;
ImmutableList<YarnApplication> listEquals(String name) throws JsonProcessingException;
ImmutableList<YarnApplication> listEquals(String cluster, String name) throws JsonProcessingException;
ImmutableList<YarnApplication> listLike(String text) throws JsonProcessingException;
ImmutableList<YarnApplication> listLike(String cluster, String text) throws JsonProcessingException;
void kill(String applicationId);
YarnApplication detail(String applicationId) throws Exception;
YarnApplication detail(String cluster, String applicationId) throws Exception;
}

View File

@@ -18,9 +18,9 @@ import org.eclipse.collections.api.list.MutableList;
* @date 2023-04-23
*/
public interface QueueService {
ImmutableList<YarnQueue> list() throws JsonProcessingException;
ImmutableList<YarnQueue> list(String cluster) throws JsonProcessingException;
YarnQueue detail(String name) throws Exception;
YarnQueue detail(String cluster, String name) throws Exception;
default Function<YarnQueue, Iterable<YarnQueue>> flatChildren() {
return queue -> {

View File

@@ -39,11 +39,12 @@ public class ClusterServiceImpl implements ClusterService {
@Cacheable(value = "cluster-info", sync = true)
@Retryable(Throwable.class)
@Override
public YarnRootQueue info() throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler");
public YarnRootQueue info(String cluster) throws JsonProcessingException {
String webUrl = yarnConfiguration.getWebUrl(cluster);
String queryUrl = URLUtil.completeUrl(webUrl, "/ws/v1/cluster/scheduler");
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
return mapper.readValue(body, ClusterInfoResponse.class).getScheduler().getSchedulerInfo()
.setCluster(yarnConfiguration.getCluster())
.setWebUrl(URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/cluster/scheduler"));
.setCluster(cluster)
.setWebUrl(URLUtil.completeUrl(webUrl, "/cluster/scheduler"));
}
}

View File

@@ -1,74 +0,0 @@
package com.lanyuanxiaoyao.service.yarn.service.impl;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration;
import com.lanyuanxiaoyao.service.yarn.response.ApplicationsListResponse;
import com.lanyuanxiaoyao.service.yarn.service.JobService;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* Job
*
* @author lanyuanxiaoyao
* @date 2023-04-23
*/
@Service("JobAutoRefreshService")
public class JobAutoRefreshServiceImpl implements JobService {
private static final Logger logger = LoggerFactory.getLogger(JobAutoRefreshServiceImpl.class);
private final ObjectMapper mapper;
private final YarnConfiguration yarnConfiguration;
public JobAutoRefreshServiceImpl( YarnConfiguration yarnConfiguration, Jackson2ObjectMapperBuilder builder) {
this.yarnConfiguration = yarnConfiguration;
this.mapper = builder.build();
}
private static final AtomicReference<ImmutableList<YarnApplication>> CACHE = new AtomicReference<>(Lists.immutable.empty());
@Scheduled(fixedRate = 50000)
public void refresh() throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps");
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
ImmutableList<YarnApplication> apps = mapper.readValue(body, ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(yarnConfiguration.getCluster()));
CACHE.set(apps);
}
@Override
public ImmutableList<YarnApplication> list() {
return CACHE.get();
}
@Override
public ImmutableList<YarnApplication> listEquals(String name) {
return list().select(app -> StrUtil.equals(app.getName(), name));
}
@Override
public ImmutableList<YarnApplication> listLike(String name) {
return list().select(app -> StrUtil.contains(app.getName(), name));
}
@Override
public void kill(String applicationId) {
}
@Override
public YarnApplication detail(String applicationId) throws Exception {
return list().select(app -> StrUtil.equals(app.getId(), applicationId))
.getFirstOptional().orElseThrow(() -> new Exception("cannot found " + applicationId));
}
}

View File

@@ -40,41 +40,37 @@ public class JobServiceImpl implements JobService {
this.mapper = builder.build();
}
@Cacheable(value = "job-list", sync = true)
@Cacheable(value = "job-list", sync = true, key = "#cluster")
@Retryable(Throwable.class)
@Override
public ImmutableList<YarnApplication> list() throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps");
public ImmutableList<YarnApplication> list(String cluster) throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/apps");
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(yarnConfiguration.getCluster()));
return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(cluster));
}
}
@Cacheable(value = "job-list", sync = true, key = "#{methodName+name}")
@Cacheable(value = "job-list", sync = true, key = "#{methodName+cluster+name}")
@Retryable(Throwable.class)
@Override
public ImmutableList<YarnApplication> listEquals(String name) throws JsonProcessingException {
return list().select(app -> StrUtil.equals(app.getName(), name));
public ImmutableList<YarnApplication> listEquals(String cluster, String name) throws JsonProcessingException {
return list(cluster).select(app -> StrUtil.equals(app.getName(), name));
}
@Cacheable(value = "job-list", sync = true, key = "#{methodName+name}")
@Cacheable(value = "job-list", sync = true, key = "#{methodName+cluster+name}")
@Retryable(Throwable.class)
@Override
public ImmutableList<YarnApplication> listLike(String name) throws JsonProcessingException {
return list().select(app -> StrUtil.contains(app.getName(), name));
public ImmutableList<YarnApplication> listLike(String cluster, String name) throws JsonProcessingException {
return list(cluster).select(app -> StrUtil.contains(app.getName(), name));
}
@Override
public void kill(String applicationId) {
}
@Cacheable(value = "job-detail", sync = true, key = "#applicationId")
@Cacheable(value = "job-detail", sync = true)
@Retryable(Throwable.class)
@Override
public YarnApplication detail(String applicationId) throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps/" + applicationId);
public YarnApplication detail(String cluster, String applicationId) throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/apps/" + applicationId);
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp().setCluster(yarnConfiguration.getCluster());
return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp().setCluster(cluster);
}
}
}

View File

@@ -1,67 +0,0 @@
package com.lanyuanxiaoyao.service.yarn.service.impl;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration;
import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse;
import com.lanyuanxiaoyao.service.yarn.service.QueueService;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* Queue
*
* @author lanyuanxiaoyao
* @date 2023-04-23
*/
@Service("QueueAutoRefreshService")
public class QueueAutoRefreshServiceImpl implements QueueService {
private static final Logger logger = LoggerFactory.getLogger(QueueAutoRefreshServiceImpl.class);
private final ObjectMapper mapper;
private final YarnConfiguration yarnConfiguration;
public QueueAutoRefreshServiceImpl(YarnConfiguration yarnConfiguration, Jackson2ObjectMapperBuilder builder) {
this.yarnConfiguration = yarnConfiguration;
this.mapper = builder.build();
}
private static final AtomicReference<ImmutableList<YarnQueue>> CACHE = new AtomicReference<>(Lists.immutable.empty());
@Scheduled(fixedRate = 50000)
public void refresh() throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler");
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
QueueListResponse response = mapper.readValue(body, QueueListResponse.class);
ImmutableList<YarnQueue> queues = response.getScheduler().getSchedulerInfo().getQueues().getQueue()
.flatCollect(flatChildren())
.tap(q -> q.setCluster(yarnConfiguration.getCluster()));
CACHE.set(queues);
}
@Override
public ImmutableList<YarnQueue> list() throws JsonProcessingException {
return CACHE.get();
}
@Override
public YarnQueue detail(String name) throws Exception {
return list()
.select(q -> StrUtil.equals(q.getQueueName(), name))
.getFirstOptional()
.orElseThrow(() -> new Exception("cannot found " + name));
}
}

View File

@@ -41,23 +41,23 @@ public class QueueServiceImpl implements QueueService {
@Cacheable(value = "queue-list", sync = true)
@Retryable(Throwable.class)
@Override
public ImmutableList<YarnQueue> list() throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler");
public ImmutableList<YarnQueue> list(String cluster) throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/scheduler");
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
QueueListResponse response = mapper.readValue(body, QueueListResponse.class);
return response.getScheduler().getSchedulerInfo().getQueues().getQueue()
.flatCollect(flatChildren())
.tap(q -> q.setCluster(yarnConfiguration.getCluster()));
.tap(q -> q.setCluster(cluster));
}
@Cacheable(value = "queue-detail", sync = true, key = "#name")
@Cacheable(value = "queue-detail", sync = true)
@Retryable(Throwable.class)
@Override
public YarnQueue detail(String name) throws Exception {
return list()
public YarnQueue detail(String cluster, String name) throws Exception {
return list(cluster)
.select(q -> StrUtil.equals(q.getQueueName(), name))
.getFirstOptional()
.orElseThrow(() -> new Exception("cannot found " + name))
.setCluster(yarnConfiguration.getCluster());
.setCluster(cluster);
}
}

View File

@@ -3,3 +3,10 @@ spring:
name: service-yarn-query
profiles:
include: random-port,common,eureka,metrics
yarn:
web-urls:
a4: http://132.121.107.91:8088
b1: http://132.122.98.13:8088
b4: http://132.122.112.30:8088
b5: http://132.122.116.12:8088
b5-sync: http://132.122.116.143:8088