feat(yarn-query): 增加运行时和非运行时任务的搜索
This commit is contained in:
@@ -34,16 +34,46 @@ public class JobController {
|
||||
return jobService.list(cluster);
|
||||
}
|
||||
|
||||
@GetMapping("list_running")
|
||||
public ImmutableList<YarnApplication> listRunning(@RequestParam("cluster") String cluster) throws JsonProcessingException {
|
||||
return jobService.listRunning(cluster);
|
||||
}
|
||||
|
||||
@GetMapping("list_not_running")
|
||||
public ImmutableList<YarnApplication> listNotRunning(@RequestParam("cluster") String cluster) throws JsonProcessingException {
|
||||
return jobService.listNotRunning(cluster);
|
||||
}
|
||||
|
||||
@GetMapping("list_equals")
|
||||
public ImmutableList<YarnApplication> listEquals(@RequestParam("cluster") String cluster, @RequestParam("name") String name) throws JsonProcessingException {
|
||||
return jobService.listEquals(cluster, name);
|
||||
}
|
||||
|
||||
@GetMapping("list_running_equals")
|
||||
public ImmutableList<YarnApplication> listRunningEquals(@RequestParam("cluster") String cluster, @RequestParam("name") String name) throws JsonProcessingException {
|
||||
return jobService.listRunningEquals(cluster, name);
|
||||
}
|
||||
|
||||
@GetMapping("list_not_running_equals")
|
||||
public ImmutableList<YarnApplication> listNotRunningEquals(@RequestParam("cluster") String cluster, @RequestParam("name") String name) throws JsonProcessingException {
|
||||
return jobService.listNotRunningEquals(cluster, name);
|
||||
}
|
||||
|
||||
@GetMapping("list_like")
|
||||
public ImmutableList<YarnApplication> listLike(@RequestParam("cluster") String cluster, @RequestParam("text") String text) throws JsonProcessingException {
|
||||
return jobService.listLike(cluster, text);
|
||||
}
|
||||
|
||||
@GetMapping("list_running_like")
|
||||
public ImmutableList<YarnApplication> listRunningLike(@RequestParam("cluster") String cluster, @RequestParam("text") String text) throws JsonProcessingException {
|
||||
return jobService.listRunningLike(cluster, text);
|
||||
}
|
||||
|
||||
@GetMapping("list_not_running_like")
|
||||
public ImmutableList<YarnApplication> listNotRunningLike(@RequestParam("cluster") String cluster, @RequestParam("text") String text) throws JsonProcessingException {
|
||||
return jobService.listNotRunningLike(cluster, text);
|
||||
}
|
||||
|
||||
@GetMapping("detail")
|
||||
public YarnApplication detail(@RequestParam("cluster") String cluster, @RequestParam("application_id") String applicationId) throws Exception {
|
||||
return jobService.detail(cluster, applicationId);
|
||||
|
||||
@@ -13,9 +13,21 @@ import org.eclipse.collections.api.list.ImmutableList;
|
||||
public interface JobService {
|
||||
ImmutableList<YarnApplication> list(String cluster) throws JsonProcessingException;
|
||||
|
||||
ImmutableList<YarnApplication> listRunning(String cluster) throws JsonProcessingException;
|
||||
|
||||
ImmutableList<YarnApplication> listNotRunning(String cluster) throws JsonProcessingException;
|
||||
|
||||
ImmutableList<YarnApplication> listEquals(String cluster, String name) throws JsonProcessingException;
|
||||
|
||||
ImmutableList<YarnApplication> listRunningEquals(String cluster, String name) throws JsonProcessingException;
|
||||
|
||||
ImmutableList<YarnApplication> listNotRunningEquals(String cluster, String name) throws JsonProcessingException;
|
||||
|
||||
ImmutableList<YarnApplication> listLike(String cluster, String text) throws JsonProcessingException;
|
||||
|
||||
ImmutableList<YarnApplication> listRunningLike(String cluster, String text) throws JsonProcessingException;
|
||||
|
||||
ImmutableList<YarnApplication> listNotRunningLike(String cluster, String text) throws JsonProcessingException;
|
||||
|
||||
YarnApplication detail(String cluster, String applicationId) throws Exception;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.lanyuanxiaoyao.service.yarn.service.impl;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.core.util.URLUtil;
|
||||
import cn.hutool.http.HttpResponse;
|
||||
@@ -11,6 +12,7 @@ import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration;
|
||||
import com.lanyuanxiaoyao.service.yarn.response.ApplicationDetailResponse;
|
||||
import com.lanyuanxiaoyao.service.yarn.response.ApplicationsListResponse;
|
||||
import com.lanyuanxiaoyao.service.yarn.service.JobService;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -30,7 +32,16 @@ import org.springframework.stereotype.Service;
|
||||
@Service("JobService")
|
||||
public class JobServiceImpl implements JobService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(JobServiceImpl.class);
|
||||
|
||||
private static final ImmutableList<String> RUNNING_STATE = Lists.immutable.of(YarnApplication.State.RUNNING);
|
||||
private static final ImmutableList<String> NOT_RUNNING_STATE = Lists.immutable.of(
|
||||
YarnApplication.State.NEW,
|
||||
YarnApplication.State.NEW_SAVING,
|
||||
YarnApplication.State.SUBMITTED,
|
||||
YarnApplication.State.ACCEPTED,
|
||||
YarnApplication.State.FINISHED,
|
||||
YarnApplication.State.FAILED,
|
||||
YarnApplication.State.KILLED
|
||||
);
|
||||
private final ObjectMapper mapper;
|
||||
private final YarnConfiguration yarnConfiguration;
|
||||
|
||||
@@ -40,13 +51,36 @@ public class JobServiceImpl implements JobService {
|
||||
this.mapper = builder.build();
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-list", sync = true)
|
||||
@Cacheable(value = "job-list", sync = true, key = "#methodName+#cluster")
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> list(String cluster) throws JsonProcessingException {
|
||||
return list(cluster, null);
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-list", sync = true, key = "#methodName+#cluster")
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> listRunning(String cluster) throws JsonProcessingException {
|
||||
return list(cluster, RUNNING_STATE);
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-list", sync = true, key = "#methodName+#cluster")
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> listNotRunning(String cluster) throws JsonProcessingException {
|
||||
return list(cluster, NOT_RUNNING_STATE);
|
||||
}
|
||||
|
||||
public ImmutableList<YarnApplication> list(String cluster, ImmutableList<String> states) throws JsonProcessingException {
|
||||
boolean isFilter = ObjectUtil.isNotEmpty(states);
|
||||
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/apps");
|
||||
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
|
||||
return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(cluster));
|
||||
return mapper.readValue(response.body(), ApplicationsListResponse.class)
|
||||
.getApps()
|
||||
.getApp()
|
||||
.tap(app -> app.setCluster(cluster))
|
||||
.select(app -> !isFilter || states.contains(app.getState()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,14 +88,50 @@ public class JobServiceImpl implements JobService {
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> listEquals(String cluster, String name) throws JsonProcessingException {
|
||||
return list(cluster).select(app -> StrUtil.equals(app.getName(), name));
|
||||
return listEquals(cluster, null, name);
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-list", sync = true, key = "#methodName+#cluster+#name")
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> listLike(String cluster, String name) throws JsonProcessingException {
|
||||
return list(cluster).select(app -> StrUtil.contains(app.getName(), name));
|
||||
public ImmutableList<YarnApplication> listRunningEquals(String cluster, String name) throws JsonProcessingException {
|
||||
return listEquals(cluster, RUNNING_STATE, name);
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-list", sync = true, key = "#methodName+#cluster+#name")
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> listNotRunningEquals(String cluster, String name) throws JsonProcessingException {
|
||||
return listEquals(cluster, NOT_RUNNING_STATE, name);
|
||||
}
|
||||
|
||||
public ImmutableList<YarnApplication> listEquals(String cluster, ImmutableList<String> states, String name) throws JsonProcessingException {
|
||||
return list(cluster, states).select(app -> StrUtil.equals(app.getName(), name));
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-list", sync = true, key = "#methodName+#cluster+#text")
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> listLike(String cluster, String text) throws JsonProcessingException {
|
||||
return listLike(cluster, null, text);
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-list", sync = true, key = "#methodName+#cluster+#text")
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> listRunningLike(String cluster, String text) throws JsonProcessingException {
|
||||
return listLike(cluster, RUNNING_STATE, text);
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-list", sync = true, key = "#methodName+#cluster+#text")
|
||||
@Retryable(Throwable.class)
|
||||
@Override
|
||||
public ImmutableList<YarnApplication> listNotRunningLike(String cluster, String text) throws JsonProcessingException {
|
||||
return listLike(cluster, NOT_RUNNING_STATE, text);
|
||||
}
|
||||
|
||||
public ImmutableList<YarnApplication> listLike(String cluster, ImmutableList<String> states, String text) throws JsonProcessingException {
|
||||
return list(cluster, states).select(app -> StrUtil.contains(app.getName(), text));
|
||||
}
|
||||
|
||||
@Cacheable(value = "job-detail", sync = true)
|
||||
|
||||
Reference in New Issue
Block a user