perf(web): 增强查询同步压缩任务存在的性能

先判断是否有同步压缩锁,有再获取运行指标
This commit is contained in:
2023-05-14 00:50:54 +08:00
parent 41ca5319d2
commit 3f0d9f11da

View File

@@ -92,48 +92,54 @@ public class TableController extends BaseController {
CompletableFuture<FlinkJob> flinkJobFuture = CompletableFuture.supplyAsync(() -> infoService.flinkJobDetail(item.getId()), EXECUTOR);
CompletableFuture<TableMeta> tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), EXECUTOR);
CompletableFuture<SyncState> syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), EXECUTOR);
CompletableFuture<Boolean> syncRunning = CompletableFuture.supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId())), EXECUTOR);
CompletableFuture<RunMeta> syncRuntime = CompletableFuture.supplyAsync(() -> {
try {
String data = zookeeperService.getData(NameHelper.syncRunningLockPath(item.getId()));
if (StrUtil.isNotBlank(data)) {
return mapper.readValue(data, RunMeta.class);
}
} catch (Exception e) {
logger.error("Parse sync runtime info for: {}", item.getId());
}
return null;
}, EXECUTOR);
CompletableFuture<Boolean> compactionRunning = CompletableFuture.supplyAsync(() -> zookeeperService.existsPath(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())), EXECUTOR);
CompletableFuture<RunMeta> compactionRuntime = CompletableFuture.supplyAsync(() -> {
try {
String data = zookeeperService.getData(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias()));
if (StrUtil.isNotBlank(data)) {
return mapper.readValue(data, RunMeta.class);
}
} catch (Exception e) {
logger.error("Parse compaction runtime info for: {}", item.getId());
}
return null;
}, EXECUTOR);
CompletableFuture<RunMeta> syncRuntime = CompletableFuture
.supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId())), EXECUTOR)
.thenApply(running -> {
if (running) {
try {
String data = zookeeperService.getData(NameHelper.syncRunningLockPath(item.getId()));
if (StrUtil.isNotBlank(data)) {
return mapper.readValue(data, RunMeta.class);
}
} catch (Exception e) {
logger.error("Parse sync runtime info for: {}", item.getId());
}
}
return null;
});
CompletableFuture<RunMeta> compactionRuntime = CompletableFuture
.supplyAsync(() -> zookeeperService.existsPath(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())), EXECUTOR)
.thenApply(running -> {
if (running) {
try {
String data = zookeeperService.getData(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias()));
if (StrUtil.isNotBlank(data)) {
return mapper.readValue(data, RunMeta.class);
}
} catch (Exception e) {
logger.error("Parse sync runtime info for: {}", item.getId());
}
}
return null;
});
try {
CompletableFuture.allOf(
flinkJobFuture,
tableMetaFuture,
syncStateFuture,
syncRunning,
syncRuntime,
compactionRunning,
compactionRuntime
).get();
RunMeta syncRunMeta = syncRuntime.get();
RunMeta compactionRunMeta = compactionRuntime.get();
return new TableVO(
flinkJobFuture.get(),
tableMetaFuture.get(),
new SyncStateVO(syncStateFuture.get()),
syncRunning.get(),
syncRuntime.get(),
compactionRunning.get(),
compactionRuntime.get()
ObjectUtil.isNotNull(syncRunMeta),
syncRunMeta,
ObjectUtil.isNotNull(compactionRunMeta),
compactionRunMeta
);
} catch (InterruptedException | ExecutionException e) {
logger.error("Something bad", e);