feature(web): yarn 任务增加完成度查询

This commit is contained in:
2023-06-27 09:51:00 +08:00
parent 29c328a899
commit 5d77f5c29b
6 changed files with 62 additions and 22 deletions

View File

@@ -5,9 +5,12 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertexOverview;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
import com.lanyuanxiaoyao.service.forest.service.FlinkService;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO;
import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO;
@@ -44,10 +47,12 @@ public class YarnController extends BaseController {
YarnApplication::getFinishedTime
);
private final YarnService yarnService;
private final FlinkService flinkService;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public YarnController(YarnService yarnService) {
public YarnController(YarnService yarnService, FlinkService flinkService) {
this.yarnService = yarnService;
this.flinkService = flinkService;
}
@GetMapping("job_list")
@@ -61,7 +66,8 @@ public class YarnController extends BaseController {
@RequestParam(value = "filter_final_status", defaultValue = "") String filterFinalStatus,
@RequestParam(value = "search_id", defaultValue = "") String searchId,
@RequestParam(value = "search_name", defaultValue = "") String searchName,
@RequestParam(value = "precise", defaultValue = "false") Boolean precise
@RequestParam(value = "precise", defaultValue = "false") Boolean precise,
@RequestParam(value = "completion", defaultValue = "false") Boolean completion
) {
boolean isFilterState = StrUtil.isNotBlank(filterState);
boolean isFilterFinalStatus = StrUtil.isNotBlank(filterFinalStatus);
@@ -82,7 +88,36 @@ public class YarnController extends BaseController {
ImmutableList<YarnApplicationVO> result = applications
.drop(Math.max(page - 1, 0) * count)
.take(count)
.collect(YarnApplicationVO::new);
.asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(yarnApplication -> {
YarnApplicationVO vo = new YarnApplicationVO(yarnApplication);
if (!completion) {
return vo;
}
try {
if (vo.getCompactionApplication()
&& StrUtil.equals(yarnApplication.getState(), "RUNNING")
&& StrUtil.isNotBlank(yarnApplication.getTrackingUrl())) {
FlinkVertexOverview vertexOverview = flinkService.vertexOverview(yarnApplication.getTrackingUrl());
if (ObjectUtil.isNotNull(vertexOverview) && ObjectUtil.isNotEmpty(vertexOverview.getJobs())) {
Optional<FlinkVertex> vertex = vertexOverview.getJobs().getFirstOptional();
if (vertex.isPresent() && ObjectUtil.isNotNull(vertex.get().getTasks())) {
FlinkVertex.Tasks tasks = vertex.get().getTasks();
vo.setCompactionCompletionRatio(
ObjectUtil.isNotNull(tasks.getTotal()) && tasks.getTotal() != 0
? tasks.getRunning() * 1.0 / tasks.getTotal()
: 0
);
}
}
}
} catch (Throwable e) {
logger.error("Get ratio failure", e);
}
return vo;
})
.toList()
.toImmutable();
return responseCrudData(result, applications.size())
.withData("running", running)
.withData("unRunning", unRunning);

View File

@@ -28,6 +28,8 @@ public class YarnApplicationVO {
private String flinkJobName;
private String alias;
private Double compactionCompletionRatio = 0.0;
public YarnApplicationVO(YarnApplication yarnApplication) {
this.yarnApplication = yarnApplication;
@@ -164,4 +166,12 @@ public class YarnApplicationVO {
public String getAlias() {
return alias;
}
public Double getCompactionCompletionRatio() {
return compactionCompletionRatio;
}
public void setCompactionCompletionRatio(Double compactionCompletionRatio) {
this.compactionCompletionRatio = compactionCompletionRatio;
}
}