feature(web): 增加一个直接通过 zookeeper 查询运行时信息的页面
This commit is contained in:
@@ -0,0 +1,50 @@
|
||||
package com.lanyuanxiaoyao.service.configuration.entity.zookeeper;
|
||||
|
||||
/**
|
||||
* Zookeeper 节点
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-05-23
|
||||
*/
|
||||
public class ZookeeperNode {
|
||||
private String path;
|
||||
private String label;
|
||||
private Long createTime;
|
||||
private Long modifiedTime;
|
||||
|
||||
public ZookeeperNode() {
|
||||
}
|
||||
|
||||
public ZookeeperNode(String path, String label, Long createTime, Long modifiedTime) {
|
||||
this.path = path;
|
||||
this.label = label;
|
||||
this.createTime = createTime;
|
||||
this.modifiedTime = modifiedTime;
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public String getLabel() {
|
||||
return label;
|
||||
}
|
||||
|
||||
public Long getCreateTime() {
|
||||
return createTime;
|
||||
}
|
||||
|
||||
public Long getModifiedTime() {
|
||||
return modifiedTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ZookeeperNode{" +
|
||||
"path='" + path + '\'' +
|
||||
", label='" + label + '\'' +
|
||||
", createTime=" + createTime +
|
||||
", modifiedTime=" + modifiedTime +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,8 @@ package com.lanyuanxiaoyao.service.forest.service;
|
||||
import com.dtflys.forest.annotation.BaseRequest;
|
||||
import com.dtflys.forest.annotation.Get;
|
||||
import com.dtflys.forest.annotation.Query;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
|
||||
/**
|
||||
* Zookeeper 查询
|
||||
@@ -20,4 +22,7 @@ public interface ZookeeperService {
|
||||
|
||||
@Get("/get_data")
|
||||
String getData(@Query("path") String path);
|
||||
|
||||
@Get("/get_children")
|
||||
ImmutableList<ZookeeperNode> getChildren(@Query("path") String path);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
package com.lanyuanxiaoyao.service.web.controller;
|
||||
|
||||
import com.eshore.odcp.hudi.connector.entity.RunMeta;
|
||||
import com.eshore.odcp.hudi.connector.utils.NameHelper;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
|
||||
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
|
||||
import com.lanyuanxiaoyao.service.web.entity.ZookeeperNodeVO;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* 运行时监控
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-05-23
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("running")
|
||||
public class RunningController extends BaseController {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RunningController.class);
|
||||
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
|
||||
|
||||
private final ZookeeperService zookeeperService;
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
public RunningController(ZookeeperService zookeeperService, Jackson2ObjectMapperBuilder builder) {
|
||||
this.zookeeperService = zookeeperService;
|
||||
this.mapper = builder.build();
|
||||
}
|
||||
|
||||
@GetMapping("sync")
|
||||
public AmisResponse sync() {
|
||||
return responseCrudData(
|
||||
zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH)
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.collect(node -> {
|
||||
RunMeta.SyncRunMeta meta = null;
|
||||
try {
|
||||
String data = zookeeperService.getData(node.getPath());
|
||||
meta = mapper.readValue(data, RunMeta.SyncRunMeta.class);
|
||||
} catch (JsonProcessingException e) {
|
||||
logger.warn("Json parse failure for" + node.getPath(), e);
|
||||
}
|
||||
return new ZookeeperNodeVO(node, meta);
|
||||
})
|
||||
.toSortedListBy(ZookeeperNodeVO::getCreateTime)
|
||||
.toReversed()
|
||||
.toImmutable()
|
||||
);
|
||||
}
|
||||
|
||||
@GetMapping("compaction")
|
||||
public AmisResponse compaction() {
|
||||
return responseCrudData(
|
||||
zookeeperService.getChildren(NameHelper.ZK_COMPACTION_RUNNING_LOCK_PATH)
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.collect(node -> {
|
||||
RunMeta.CompactionRunMeta meta = null;
|
||||
try {
|
||||
String data = zookeeperService.getData(node.getPath());
|
||||
meta = mapper.readValue(data, RunMeta.CompactionRunMeta.class);
|
||||
} catch (JsonProcessingException e) {
|
||||
logger.warn("Json parse failure for" + node.getPath(), e);
|
||||
}
|
||||
return new ZookeeperNodeVO(node, meta);
|
||||
})
|
||||
.toSortedListBy(ZookeeperNodeVO::getCreateTime)
|
||||
.toReversed()
|
||||
.toImmutable()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,114 @@
|
||||
package com.lanyuanxiaoyao.service.web.entity;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.eshore.odcp.hudi.connector.entity.RunMeta;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
|
||||
import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Zookeeper 节点
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-05-23
|
||||
*/
|
||||
public class ZookeeperNodeVO {
|
||||
@JsonIgnore
|
||||
private final ZookeeperNode node;
|
||||
private final RunMeta runMeta;
|
||||
private String createTimeFromNow;
|
||||
private String modifiedTimeFromNow;
|
||||
|
||||
public ZookeeperNodeVO(ZookeeperNode node, RunMeta runMeta) {
|
||||
this.node = node;
|
||||
this.runMeta = runMeta;
|
||||
|
||||
long now = Instant.now().toEpochMilli();
|
||||
if (ObjectUtil.isNotNull(node.getCreateTime()) && node.getCreateTime() != 0) {
|
||||
this.createTimeFromNow = DatetimeUtil.fromNow(now, node.getCreateTime());
|
||||
}
|
||||
if (ObjectUtil.isNotNull(node.getModifiedTime()) && node.getModifiedTime() != 0) {
|
||||
this.modifiedTimeFromNow = DatetimeUtil.fromNow(now, node.getModifiedTime());
|
||||
}
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return node.getPath();
|
||||
}
|
||||
|
||||
public String getLabel() {
|
||||
return node.getLabel();
|
||||
}
|
||||
|
||||
public Long getCreateTime() {
|
||||
return node.getCreateTime();
|
||||
}
|
||||
|
||||
public Long getModifiedTime() {
|
||||
return node.getModifiedTime();
|
||||
}
|
||||
|
||||
public String getCluster() {
|
||||
return Optional.of(runMeta).map(RunMeta::getCluster).orElse(null);
|
||||
}
|
||||
|
||||
public String getFlinkJobId() {
|
||||
return Optional.of(runMeta).map(RunMeta::getFlinkJobId).orElse(null);
|
||||
}
|
||||
|
||||
public String getFlinkJobName() {
|
||||
return Optional.of(runMeta).map(RunMeta::getFlinkJobName).orElse(null);
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return Optional.of(runMeta).map(RunMeta::getHost).orElse(null);
|
||||
}
|
||||
|
||||
public String getApplicationId() {
|
||||
return Optional.of(runMeta).map(RunMeta::getApplicationId).orElse(null);
|
||||
}
|
||||
|
||||
public String getContainerId() {
|
||||
return Optional.of(runMeta).map(RunMeta::getContainerId).orElse(null);
|
||||
}
|
||||
|
||||
public String getContainerPath() {
|
||||
return Optional.of(runMeta).map(RunMeta::getContainerPath).orElse(null);
|
||||
}
|
||||
|
||||
public String getRunType() {
|
||||
return Optional.of(runMeta).map(RunMeta::getRunType).orElse(null);
|
||||
}
|
||||
|
||||
public String getExecutorVersion() {
|
||||
return Optional.of(runMeta).map(RunMeta::getExecutorVersion).orElse(null);
|
||||
}
|
||||
|
||||
public String getJvmPid() {
|
||||
return Optional.of(runMeta).map(RunMeta::getJvmPid).orElse(null);
|
||||
}
|
||||
|
||||
public String getApplicationProxy() {
|
||||
return Optional.of(runMeta).map(RunMeta::getApplicationProxy).orElse(null);
|
||||
}
|
||||
|
||||
public String getCreateTimeFromNow() {
|
||||
return createTimeFromNow;
|
||||
}
|
||||
|
||||
public String getModifiedTimeFromNow() {
|
||||
return modifiedTimeFromNow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ZookeeperNodeVO{" +
|
||||
"node=" + node +
|
||||
", createTimeFormNow='" + createTimeFromNow + '\'' +
|
||||
", modifiedTimeFormNow='" + modifiedTimeFromNow + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,9 @@
|
||||
function timeAndFrom(field, fromNow, emptyText) {
|
||||
if (!emptyText) {
|
||||
emptyText = '未停止'
|
||||
function timeAndFrom(field, fromNow, emptyText = '未停止', showSource = true) {
|
||||
let tpl = "<span class='font-bold'>${" + fromNow + "}</span>"
|
||||
if (showSource) {
|
||||
tpl = tpl + "<br> ${IF(" + field + " === 0, '(" + emptyText + ")', CONCATENATE('(',DATETOSTR(DATE(" + field + ")),')'))}"
|
||||
}
|
||||
// language=TEXT
|
||||
return "<span class='font-bold'>${" + fromNow + "}</span><br> ${IF(" + field + " === 0, '(" + emptyText + ")', CONCATENATE('(',DATETOSTR(DATE(" + field + ")),')'))}"
|
||||
return tpl
|
||||
}
|
||||
|
||||
function yarnCrudColumns() {
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
function runningTable(name) {
|
||||
return {
|
||||
type: 'table',
|
||||
title: `${name}`,
|
||||
itemAction: {
|
||||
type: 'button',
|
||||
actionType: 'dialog',
|
||||
dialog: {
|
||||
title: 'Path',
|
||||
body: '${path}',
|
||||
actions: [],
|
||||
}
|
||||
},
|
||||
columns: [
|
||||
{name: 'label', label: '名称'},
|
||||
{
|
||||
name: 'createTime',
|
||||
label: '创建时间',
|
||||
type: 'tpl',
|
||||
tpl: timeAndFrom('createTime', 'createTimeFromNow', undefined, false),
|
||||
align: 'center',
|
||||
width: 70,
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
function runningTab() {
|
||||
return {
|
||||
title: '运行中',
|
||||
tab: [
|
||||
{
|
||||
type: 'grid',
|
||||
columns: [
|
||||
{
|
||||
body: [
|
||||
{
|
||||
type: 'service',
|
||||
api: '${base}/running/sync',
|
||||
body: runningTable('同步')
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
body: [
|
||||
{
|
||||
type: 'service',
|
||||
api: '${base}/running/compaction',
|
||||
body: runningTable('压缩')
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
}
|
||||
}
|
||||
@@ -39,6 +39,7 @@
|
||||
<script src="components/yarn-tab.js"></script>
|
||||
<script src="components/cloud-tab.js"></script>
|
||||
<script src="components/queue-tab.js"></script>
|
||||
<script src="components/running-tab.js"></script>
|
||||
<script type="text/javascript">
|
||||
(function () {
|
||||
let amis = amisRequire('amis/embed')
|
||||
@@ -52,6 +53,7 @@
|
||||
unmountOnExit: true,
|
||||
tabsMode: 'strong',
|
||||
tabs: [
|
||||
// runningTab(),
|
||||
tableTab(),
|
||||
queueTab(),
|
||||
yarnTab('b5-sync', '同步 b5', undefined, 'Sync'),
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
package com.lanyuanxiaoyao.service.zookeeper;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.recipes.locks.InterProcessLock;
|
||||
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.eclipse.collections.api.list.MutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
@@ -12,6 +16,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Zookeeper 查询
|
||||
*
|
||||
@@ -44,6 +50,20 @@ public class ZookeeperController {
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("get_children")
|
||||
public ImmutableList<ZookeeperNode> children(@RequestParam("path") String path) throws Exception {
|
||||
if (existsPath(path)) {
|
||||
MutableList<ZookeeperNode> nodes = Lists.mutable.empty();
|
||||
for (String label : client.getChildren().forPath(path)) {
|
||||
String childPath = path + "/" + label;
|
||||
Stat stat = client.checkExists().forPath(childPath);
|
||||
nodes.add(new ZookeeperNode(childPath, label, stat.getCtime(), stat.getMtime()));
|
||||
}
|
||||
return nodes.toImmutable();
|
||||
}
|
||||
return Lists.immutable.empty();
|
||||
}
|
||||
|
||||
@GetMapping("exists_lock")
|
||||
public Boolean existsLock(@RequestParam("path") String path) throws Exception {
|
||||
InterProcessLock lock = new InterProcessMutex(client, path);
|
||||
|
||||
Reference in New Issue
Block a user