refactor(web): 跟随上游更新
This commit is contained in:
@@ -5,6 +5,7 @@ import com.eshore.odcp.hudi.connector.utils.NameHelper;
|
|||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
|
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
|
||||||
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
|
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
|
||||||
import com.lanyuanxiaoyao.service.web.entity.ZookeeperNodeVO;
|
import com.lanyuanxiaoyao.service.web.entity.ZookeeperNodeVO;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -43,16 +44,7 @@ public class RunningController extends BaseController {
|
|||||||
return responseCrudData(
|
return responseCrudData(
|
||||||
zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH)
|
zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH)
|
||||||
.asParallel(EXECUTOR, 1)
|
.asParallel(EXECUTOR, 1)
|
||||||
.collect(node -> {
|
.collect(this::parseRunMeta)
|
||||||
RunMeta.SyncRunMeta meta = null;
|
|
||||||
try {
|
|
||||||
String data = zookeeperService.getData(node.getPath());
|
|
||||||
meta = mapper.readValue(data, RunMeta.SyncRunMeta.class);
|
|
||||||
} catch (JsonProcessingException e) {
|
|
||||||
logger.warn("Json parse failure for" + node.getPath(), e);
|
|
||||||
}
|
|
||||||
return new ZookeeperNodeVO(node, meta);
|
|
||||||
})
|
|
||||||
.toSortedListBy(ZookeeperNodeVO::getCreateTime)
|
.toSortedListBy(ZookeeperNodeVO::getCreateTime)
|
||||||
.toReversed()
|
.toReversed()
|
||||||
.toImmutable()
|
.toImmutable()
|
||||||
@@ -64,19 +56,21 @@ public class RunningController extends BaseController {
|
|||||||
return responseCrudData(
|
return responseCrudData(
|
||||||
zookeeperService.getChildren(NameHelper.ZK_COMPACTION_RUNNING_LOCK_PATH)
|
zookeeperService.getChildren(NameHelper.ZK_COMPACTION_RUNNING_LOCK_PATH)
|
||||||
.asParallel(EXECUTOR, 1)
|
.asParallel(EXECUTOR, 1)
|
||||||
.collect(node -> {
|
.collect(this::parseRunMeta)
|
||||||
RunMeta.CompactionRunMeta meta = null;
|
|
||||||
try {
|
|
||||||
String data = zookeeperService.getData(node.getPath());
|
|
||||||
meta = mapper.readValue(data, RunMeta.CompactionRunMeta.class);
|
|
||||||
} catch (JsonProcessingException e) {
|
|
||||||
logger.warn("Json parse failure for" + node.getPath(), e);
|
|
||||||
}
|
|
||||||
return new ZookeeperNodeVO(node, meta);
|
|
||||||
})
|
|
||||||
.toSortedListBy(ZookeeperNodeVO::getCreateTime)
|
.toSortedListBy(ZookeeperNodeVO::getCreateTime)
|
||||||
.toReversed()
|
.toReversed()
|
||||||
.toImmutable()
|
.toImmutable()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ZookeeperNodeVO parseRunMeta(ZookeeperNode node) {
|
||||||
|
RunMeta meta = null;
|
||||||
|
try {
|
||||||
|
String data = zookeeperService.getData(node.getPath());
|
||||||
|
meta = mapper.readValue(data, RunMeta.class);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
logger.warn("Json parse failure for" + node.getPath(), e);
|
||||||
|
}
|
||||||
|
return new ZookeeperNodeVO(node, meta);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,11 +93,11 @@ public class TableController extends BaseController {
|
|||||||
CompletableFuture<TableMeta> tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), EXECUTOR);
|
CompletableFuture<TableMeta> tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), EXECUTOR);
|
||||||
CompletableFuture<SyncState> syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), EXECUTOR);
|
CompletableFuture<SyncState> syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), EXECUTOR);
|
||||||
CompletableFuture<RunMeta> syncRuntime = CompletableFuture
|
CompletableFuture<RunMeta> syncRuntime = CompletableFuture
|
||||||
.supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId())), EXECUTOR)
|
.supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId(), item.getAlias())), EXECUTOR)
|
||||||
.thenApply(running -> {
|
.thenApply(running -> {
|
||||||
if (running) {
|
if (running) {
|
||||||
try {
|
try {
|
||||||
String data = zookeeperService.getData(NameHelper.syncRunningLockPath(item.getId()));
|
String data = zookeeperService.getData(NameHelper.syncRunningLockPath(item.getId(), item.getAlias()));
|
||||||
if (StrUtil.isNotBlank(data)) {
|
if (StrUtil.isNotBlank(data)) {
|
||||||
return mapper.readValue(data, RunMeta.class);
|
return mapper.readValue(data, RunMeta.class);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ public class ZookeeperNodeVO {
|
|||||||
return Optional.of(runMeta).map(RunMeta::getCluster).orElse(null);
|
return Optional.of(runMeta).map(RunMeta::getCluster).orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getFlinkJobId() {
|
public Long getFlinkJobId() {
|
||||||
return Optional.of(runMeta).map(RunMeta::getFlinkJobId).orElse(null);
|
return Optional.of(runMeta).map(RunMeta::getFlinkJobId).orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user