From bf1a33d72ea5fcfdca258ce1b618a430d79c0701 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Mon, 29 May 2023 17:58:41 +0800 Subject: [PATCH] =?UTF-8?q?feature(yarn-query):=20=E9=80=82=E9=85=8D=20yar?= =?UTF-8?q?n=20=E9=98=9F=E5=88=97=E7=9A=84=E5=AD=90=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../configuration/entity/yarn/YarnQueue.java | 28 ++++++++++++ .../yarn/service/impl/QueueServiceImpl.java | 14 +++++- .../service/yarn/QueueResponseParseTest.java | 43 +++++++++++++++++++ 3 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 service-yarn-query/src/test/java/com/lanyuanxiaoyao/service/yarn/QueueResponseParseTest.java diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnQueue.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnQueue.java index 058b3d7..e8d673e 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnQueue.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnQueue.java @@ -1,5 +1,7 @@ package com.lanyuanxiaoyao.service.configuration.entity.yarn; +import org.eclipse.collections.api.list.ImmutableList; + /** * Yarn 队列简单实体类 * @@ -38,6 +40,7 @@ public final class YarnQueue { private ResourcesUsed AMResourceLimit; private ResourcesUsed usedAMResource; private ResourcesUsed userAMResourceLimit; + private Queues queues; public String getCluster() { return cluster; @@ -52,6 +55,11 @@ public final class YarnQueue { return queueName; } + public YarnQueue setQueueName(String queueName) { + this.queueName = queueName; + return this; + } + public String getState() { return state; } @@ -168,6 +176,10 @@ public final class YarnQueue { return userAMResourceLimit; } + public Queues getQueues() { + return queues; + } + @Override public String toString() { return "YarnQueue{" + @@ -202,6 +214,7 @@ public final class YarnQueue { ", AMResourceLimit=" + AMResourceLimit + ", usedAMResource=" + usedAMResource + ", userAMResourceLimit=" + userAMResourceLimit + + ", queues=" + queues + '}'; } @@ -225,4 +238,19 @@ public final class YarnQueue { '}'; } } + + public static final class Queues { + private ImmutableList queue; + + public ImmutableList getQueue() { + return queue; + } + + @Override + public String toString() { + return "Queues{" + + "queue=" + queue + + '}'; + } + } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java index ce907ce..5834ad4 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.yarn.service.impl; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.URLUtil; import cn.hutool.http.HttpUtil; @@ -11,6 +12,7 @@ import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse; import com.lanyuanxiaoyao.service.yarn.service.QueueService; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.list.MutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.CacheConfig; @@ -44,7 +46,17 @@ public class QueueServiceImpl implements QueueService { String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler"); String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); QueueListResponse response = mapper.readValue(body, QueueListResponse.class); - return response.getScheduler().getSchedulerInfo().getQueues().getQueue().tap(q -> q.setCluster(yarnConfiguration.getCluster())); + return response.getScheduler().getSchedulerInfo().getQueues().getQueue() + .flatCollect(q -> { + MutableList queues = Lists.mutable.of(q); + if (ObjectUtil.isNotNull(q.getQueues())) { + q.getQueues().getQueue() + .tap(q1 -> q1.setQueueName(StrUtil.format("{}.{}", q.getQueueName(), q1.getQueueName()))) + .forEach(queues::add); + } + return queues; + }) + .tap(q -> q.setCluster(yarnConfiguration.getCluster())); } @Cacheable(value = "queue-detail", sync = true, key = "#name") diff --git a/service-yarn-query/src/test/java/com/lanyuanxiaoyao/service/yarn/QueueResponseParseTest.java b/service-yarn-query/src/test/java/com/lanyuanxiaoyao/service/yarn/QueueResponseParseTest.java new file mode 100644 index 0000000..f010a2d --- /dev/null +++ b/service-yarn-query/src/test/java/com/lanyuanxiaoyao/service/yarn/QueueResponseParseTest.java @@ -0,0 +1,43 @@ +package com.lanyuanxiaoyao.service.yarn; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.http.HttpUtil; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; +import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration; +import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse; +import com.lanyuanxiaoyao.service.yarn.service.impl.QueueServiceImpl; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-29 + */ +public class QueueResponseParseTest { + public static void main(String[] args) throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new EclipseCollectionsModule()); + mapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL); + /*String response = HttpUtil.get("http://b5m2.hdp.dc:8088/ws/v1/cluster/scheduler"); + QueueListResponse queueListResponse = mapper.readValue(response, QueueListResponse.class); + queueListResponse.getScheduler().getSchedulerInfo().getQueues().getQueue().forEach(q -> { + System.out.println(q.getQueueName()); + if (ObjectUtil.isNotNull(q.getQueues())) { + q.getQueues().getQueue().forEach(q1 -> System.out.println(q1.getQueueName())); + } + });*/ + + YarnConfiguration configuration = new YarnConfiguration(); + configuration.setCluster("b5"); + configuration.setWebUrl("http://b5m2.hdp.dc:8088"); + QueueServiceImpl service = new QueueServiceImpl(mapper, configuration); + for (YarnQueue yarnQueue : service.list()) { + System.out.println(yarnQueue.getQueueName()); + } + } +}