diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java index b547365..57df1c4 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java @@ -3,14 +3,16 @@ package com.lanyuanxiaoyao.service.web.controller; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.eshore.odcp.hudi.connector.entity.RunMeta; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.eshore.odcp.hudi.connector.utils.NameHelper; +import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.forest.service.InfoService; -import com.lanyuanxiaoyao.service.forest.service.YarnB1Service; -import com.lanyuanxiaoyao.service.forest.service.YarnB5SyncService; +import com.lanyuanxiaoyao.service.forest.service.ZookeeperService; import com.lanyuanxiaoyao.service.web.entity.SyncStateVO; import com.lanyuanxiaoyao.service.web.entity.TableVO; import java.util.List; @@ -24,6 +26,7 @@ import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.map.MutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -39,16 +42,16 @@ import org.springframework.web.bind.annotation.RestController; @RequestMapping("table") public class TableController extends BaseController { private static final Logger logger = LoggerFactory.getLogger(TableController.class); - private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); + private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(50); private final InfoService infoService; - private final YarnB5SyncService yarnB5SyncService; - private final YarnB1Service yarnB1Service; + private final ZookeeperService zookeeperService; + private final ObjectMapper mapper; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public TableController(InfoService infoService, YarnB5SyncService yarnB5SyncService, YarnB1Service yarnB1Service) { + public TableController(InfoService infoService, ZookeeperService zookeeperService, Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder) { this.infoService = infoService; - this.yarnB5SyncService = yarnB5SyncService; - this.yarnB1Service = yarnB1Service; + this.zookeeperService = zookeeperService; + this.mapper = jackson2ObjectMapperBuilder.build(); } @GetMapping("list") @@ -62,12 +65,6 @@ public class TableController extends BaseController { @RequestParam(value = "filter_run_mode", required = false) List runMode, @RequestParam(value = "filter_compaction_status", required = false) List compactionStatus ) { - - /*CompletableFuture> syncAppsFuture = CompletableFuture.supplyAsync(syncYarnService::jobList, EXECUTOR); - CompletableFuture> compactionAppsFuture = CompletableFuture.supplyAsync(compactionYarnService::jobList, EXECUTOR); - AsyncUtil.waitAll(syncAppsFuture, compactionAppsFuture); - ImmutableList syncApps = AsyncUtil.get(syncAppsFuture); - ImmutableList compactionApps = AsyncUtil.get(compactionAppsFuture);*/ MutableMap queryMap = Maps.mutable.empty(); queryMap.put("page", page); queryMap.put("count", count); @@ -95,10 +92,51 @@ public class TableController extends BaseController { CompletableFuture flinkJobFuture = CompletableFuture.supplyAsync(() -> infoService.flinkJobDetail(item.getId()), EXECUTOR); CompletableFuture tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), EXECUTOR); CompletableFuture syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), EXECUTOR); + CompletableFuture syncRunning = CompletableFuture.supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId())), EXECUTOR); + CompletableFuture syncRuntime = CompletableFuture.supplyAsync(() -> { + try { + String data = zookeeperService.getData(NameHelper.syncRunningLockPath(item.getId())); + if (StrUtil.isNotBlank(data)) { + return mapper.readValue(data, RunMeta.class); + } + } catch (Exception e) { + logger.error("Parse sync runtime info for: {}", item.getId()); + } + return null; + }, EXECUTOR); + CompletableFuture compactionRunning = CompletableFuture.supplyAsync(() -> zookeeperService.existsPath(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())), EXECUTOR); + CompletableFuture compactionRuntime = CompletableFuture.supplyAsync(() -> { + try { + String data = zookeeperService.getData(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())); + if (StrUtil.isNotBlank(data)) { + return mapper.readValue(data, RunMeta.class); + } + } catch (Exception e) { + logger.error("Parse compaction runtime info for: {}", item.getId()); + } + return null; + }, EXECUTOR); try { - CompletableFuture.allOf(flinkJobFuture, tableMetaFuture, syncStateFuture).get(); - return new TableVO(flinkJobFuture.get(), tableMetaFuture.get(), new SyncStateVO(syncStateFuture.get())); + CompletableFuture.allOf( + flinkJobFuture, + tableMetaFuture, + syncStateFuture, + syncRunning, + syncRuntime, + compactionRunning, + compactionRuntime + ).get(); + return new TableVO( + flinkJobFuture.get(), + tableMetaFuture.get(), + new SyncStateVO(syncStateFuture.get()), + syncRunning.get(), + syncRuntime.get(), + compactionRunning.get(), + compactionRuntime.get() + ); } catch (InterruptedException | ExecutionException e) { + logger.error("Something bad", e); return null; } }) @@ -113,6 +151,6 @@ public class TableController extends BaseController { if (ObjectUtil.isNull(flinkJobId)) { throw new Exception("flink job id is null"); } - return responseCrudData(infoService.tableMetaList(flinkJobId).collect(meta -> new TableVO(null, meta, null))); + return responseCrudData(infoService.tableMetaList(flinkJobId).collect(meta -> new TableVO(null, meta, null, null, null, null, null))); } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/TableVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/TableVO.java index 4af5eab..001a35a 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/TableVO.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/TableVO.java @@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.web.entity; import cn.hutool.core.util.ObjectUtil; import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.eshore.odcp.hudi.connector.entity.RunMeta; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.eshore.odcp.hudi.connector.utils.NameHelper; @@ -16,15 +17,23 @@ public class TableVO { private final FlinkJob flinkJob; private final TableMeta tableMeta; private final SyncStateVO syncState; + private final Boolean syncRunning; + private final RunMeta syncRuntime; + private final Boolean compactionRunning; + private final RunMeta compactionRuntime; private String syncJobName; private YarnApplicationVO syncApp; private String compactionJobName; private YarnApplicationVO compactionApp; - public TableVO(FlinkJob flinkJob, TableMeta tableMeta, SyncStateVO syncState) { + public TableVO(FlinkJob flinkJob, TableMeta tableMeta, SyncStateVO syncState, Boolean syncRunning, RunMeta syncRuntime, Boolean compactionRunning, RunMeta compactionRuntime) { this.flinkJobId = ObjectUtil.isNull(flinkJob) ? null : flinkJob.getId().toString(); this.flinkJob = flinkJob; this.tableMeta = tableMeta; + this.syncRunning = syncRunning; + this.syncRuntime = syncRuntime; + this.compactionRunning = compactionRunning; + this.compactionRuntime = compactionRuntime; if (ObjectUtil.isNotNull(flinkJob) && ObjectUtil.isNotNull(tableMeta)) { this.syncJobName = NameHelper.syncJobName(this.flinkJob.getId(), this.flinkJob.getName()); this.compactionJobName = NameHelper.compactionJobName(this.flinkJob.getId(), this.tableMeta.getSchema(), this.tableMeta.getAlias()); @@ -44,6 +53,22 @@ public class TableVO { return tableMeta; } + public Boolean getSyncRunning() { + return syncRunning; + } + + public RunMeta getSyncRuntime() { + return syncRuntime; + } + + public Boolean getCompactionRunning() { + return compactionRunning; + } + + public RunMeta getCompactionRuntime() { + return compactionRuntime; + } + public String getSyncJobName() { return syncJobName; } @@ -75,13 +100,18 @@ public class TableVO { @Override public String toString() { return "TableVO{" + - "flinkJob=" + flinkJob + + "flinkJobId='" + flinkJobId + '\'' + + ", flinkJob=" + flinkJob + ", tableMeta=" + tableMeta + + ", syncState=" + syncState + + ", syncRunning=" + syncRunning + + ", syncRuntime=" + syncRuntime + + ", compactionRunning=" + compactionRunning + + ", compactionRuntime=" + compactionRuntime + ", syncJobName='" + syncJobName + '\'' + ", syncApp=" + syncApp + ", compactionJobName='" + compactionJobName + '\'' + ", compactionApp=" + compactionApp + - ", syncState=" + syncState + '}'; } }