feature(yarn-query): 适配 yarn 队列的子队列
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package com.lanyuanxiaoyao.service.yarn.service.impl;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.core.util.URLUtil;
|
||||
import cn.hutool.http.HttpUtil;
|
||||
@@ -11,6 +12,7 @@ import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse;
|
||||
import com.lanyuanxiaoyao.service.yarn.service.QueueService;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.eclipse.collections.api.list.MutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cache.annotation.CacheConfig;
|
||||
@@ -44,7 +46,17 @@ public class QueueServiceImpl implements QueueService {
|
||||
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler");
|
||||
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
|
||||
QueueListResponse response = mapper.readValue(body, QueueListResponse.class);
|
||||
return response.getScheduler().getSchedulerInfo().getQueues().getQueue().tap(q -> q.setCluster(yarnConfiguration.getCluster()));
|
||||
return response.getScheduler().getSchedulerInfo().getQueues().getQueue()
|
||||
.flatCollect(q -> {
|
||||
MutableList<YarnQueue> queues = Lists.mutable.of(q);
|
||||
if (ObjectUtil.isNotNull(q.getQueues())) {
|
||||
q.getQueues().getQueue()
|
||||
.tap(q1 -> q1.setQueueName(StrUtil.format("{}.{}", q.getQueueName(), q1.getQueueName())))
|
||||
.forEach(queues::add);
|
||||
}
|
||||
return queues;
|
||||
})
|
||||
.tap(q -> q.setCluster(yarnConfiguration.getCluster()));
|
||||
}
|
||||
|
||||
@Cacheable(value = "queue-detail", sync = true, key = "#name")
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.lanyuanxiaoyao.service.yarn;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.http.HttpUtil;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
|
||||
import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration;
|
||||
import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse;
|
||||
import com.lanyuanxiaoyao.service.yarn.service.impl.QueueServiceImpl;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-05-29
|
||||
*/
|
||||
public class QueueResponseParseTest {
|
||||
public static void main(String[] args) throws JsonProcessingException {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.registerModule(new EclipseCollectionsModule());
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
mapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
|
||||
/*String response = HttpUtil.get("http://b5m2.hdp.dc:8088/ws/v1/cluster/scheduler");
|
||||
QueueListResponse queueListResponse = mapper.readValue(response, QueueListResponse.class);
|
||||
queueListResponse.getScheduler().getSchedulerInfo().getQueues().getQueue().forEach(q -> {
|
||||
System.out.println(q.getQueueName());
|
||||
if (ObjectUtil.isNotNull(q.getQueues())) {
|
||||
q.getQueues().getQueue().forEach(q1 -> System.out.println(q1.getQueueName()));
|
||||
}
|
||||
});*/
|
||||
|
||||
YarnConfiguration configuration = new YarnConfiguration();
|
||||
configuration.setCluster("b5");
|
||||
configuration.setWebUrl("http://b5m2.hdp.dc:8088");
|
||||
QueueServiceImpl service = new QueueServiceImpl(mapper, configuration);
|
||||
for (YarnQueue yarnQueue : service.list()) {
|
||||
System.out.println(yarnQueue.getQueueName());
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user