feature(web): 使用 zookeeper 查询表同步和压缩是否存在
This commit is contained in:
@@ -3,14 +3,16 @@ package com.lanyuanxiaoyao.service.web.controller;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
|
||||
import com.eshore.odcp.hudi.connector.entity.RunMeta;
|
||||
import com.eshore.odcp.hudi.connector.entity.SyncState;
|
||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||
import com.eshore.odcp.hudi.connector.utils.NameHelper;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
import com.lanyuanxiaoyao.service.forest.service.YarnB1Service;
|
||||
import com.lanyuanxiaoyao.service.forest.service.YarnB5SyncService;
|
||||
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
|
||||
import com.lanyuanxiaoyao.service.web.entity.SyncStateVO;
|
||||
import com.lanyuanxiaoyao.service.web.entity.TableVO;
|
||||
import java.util.List;
|
||||
@@ -24,6 +26,7 @@ import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.eclipse.collections.api.map.MutableMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
@@ -39,16 +42,16 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
@RequestMapping("table")
|
||||
public class TableController extends BaseController {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TableController.class);
|
||||
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
|
||||
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(50);
|
||||
private final InfoService infoService;
|
||||
private final YarnB5SyncService yarnB5SyncService;
|
||||
private final YarnB1Service yarnB1Service;
|
||||
private final ZookeeperService zookeeperService;
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
public TableController(InfoService infoService, YarnB5SyncService yarnB5SyncService, YarnB1Service yarnB1Service) {
|
||||
public TableController(InfoService infoService, ZookeeperService zookeeperService, Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder) {
|
||||
this.infoService = infoService;
|
||||
this.yarnB5SyncService = yarnB5SyncService;
|
||||
this.yarnB1Service = yarnB1Service;
|
||||
this.zookeeperService = zookeeperService;
|
||||
this.mapper = jackson2ObjectMapperBuilder.build();
|
||||
}
|
||||
|
||||
@GetMapping("list")
|
||||
@@ -62,12 +65,6 @@ public class TableController extends BaseController {
|
||||
@RequestParam(value = "filter_run_mode", required = false) List<String> runMode,
|
||||
@RequestParam(value = "filter_compaction_status", required = false) List<String> compactionStatus
|
||||
) {
|
||||
|
||||
/*CompletableFuture<ImmutableList<YarnApplication>> syncAppsFuture = CompletableFuture.supplyAsync(syncYarnService::jobList, EXECUTOR);
|
||||
CompletableFuture<ImmutableList<YarnApplication>> compactionAppsFuture = CompletableFuture.supplyAsync(compactionYarnService::jobList, EXECUTOR);
|
||||
AsyncUtil.waitAll(syncAppsFuture, compactionAppsFuture);
|
||||
ImmutableList<YarnApplication> syncApps = AsyncUtil.get(syncAppsFuture);
|
||||
ImmutableList<YarnApplication> compactionApps = AsyncUtil.get(compactionAppsFuture);*/
|
||||
MutableMap<String, Object> queryMap = Maps.mutable.empty();
|
||||
queryMap.put("page", page);
|
||||
queryMap.put("count", count);
|
||||
@@ -95,10 +92,51 @@ public class TableController extends BaseController {
|
||||
CompletableFuture<FlinkJob> flinkJobFuture = CompletableFuture.supplyAsync(() -> infoService.flinkJobDetail(item.getId()), EXECUTOR);
|
||||
CompletableFuture<TableMeta> tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), EXECUTOR);
|
||||
CompletableFuture<SyncState> syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), EXECUTOR);
|
||||
CompletableFuture<Boolean> syncRunning = CompletableFuture.supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId())), EXECUTOR);
|
||||
CompletableFuture<RunMeta> syncRuntime = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
String data = zookeeperService.getData(NameHelper.syncRunningLockPath(item.getId()));
|
||||
if (StrUtil.isNotBlank(data)) {
|
||||
return mapper.readValue(data, RunMeta.class);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Parse sync runtime info for: {}", item.getId());
|
||||
}
|
||||
return null;
|
||||
}, EXECUTOR);
|
||||
CompletableFuture<Boolean> compactionRunning = CompletableFuture.supplyAsync(() -> zookeeperService.existsPath(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())), EXECUTOR);
|
||||
CompletableFuture<RunMeta> compactionRuntime = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
String data = zookeeperService.getData(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias()));
|
||||
if (StrUtil.isNotBlank(data)) {
|
||||
return mapper.readValue(data, RunMeta.class);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Parse compaction runtime info for: {}", item.getId());
|
||||
}
|
||||
return null;
|
||||
}, EXECUTOR);
|
||||
try {
|
||||
CompletableFuture.allOf(flinkJobFuture, tableMetaFuture, syncStateFuture).get();
|
||||
return new TableVO(flinkJobFuture.get(), tableMetaFuture.get(), new SyncStateVO(syncStateFuture.get()));
|
||||
CompletableFuture.allOf(
|
||||
flinkJobFuture,
|
||||
tableMetaFuture,
|
||||
syncStateFuture,
|
||||
syncRunning,
|
||||
syncRuntime,
|
||||
compactionRunning,
|
||||
compactionRuntime
|
||||
).get();
|
||||
return new TableVO(
|
||||
flinkJobFuture.get(),
|
||||
tableMetaFuture.get(),
|
||||
new SyncStateVO(syncStateFuture.get()),
|
||||
syncRunning.get(),
|
||||
syncRuntime.get(),
|
||||
compactionRunning.get(),
|
||||
compactionRuntime.get()
|
||||
);
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
logger.error("Something bad", e);
|
||||
return null;
|
||||
}
|
||||
})
|
||||
@@ -113,6 +151,6 @@ public class TableController extends BaseController {
|
||||
if (ObjectUtil.isNull(flinkJobId)) {
|
||||
throw new Exception("flink job id is null");
|
||||
}
|
||||
return responseCrudData(infoService.tableMetaList(flinkJobId).collect(meta -> new TableVO(null, meta, null)));
|
||||
return responseCrudData(infoService.tableMetaList(flinkJobId).collect(meta -> new TableVO(null, meta, null, null, null, null, null)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.web.entity;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
|
||||
import com.eshore.odcp.hudi.connector.entity.RunMeta;
|
||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||
import com.eshore.odcp.hudi.connector.utils.NameHelper;
|
||||
|
||||
@@ -16,15 +17,23 @@ public class TableVO {
|
||||
private final FlinkJob flinkJob;
|
||||
private final TableMeta tableMeta;
|
||||
private final SyncStateVO syncState;
|
||||
private final Boolean syncRunning;
|
||||
private final RunMeta syncRuntime;
|
||||
private final Boolean compactionRunning;
|
||||
private final RunMeta compactionRuntime;
|
||||
private String syncJobName;
|
||||
private YarnApplicationVO syncApp;
|
||||
private String compactionJobName;
|
||||
private YarnApplicationVO compactionApp;
|
||||
|
||||
public TableVO(FlinkJob flinkJob, TableMeta tableMeta, SyncStateVO syncState) {
|
||||
public TableVO(FlinkJob flinkJob, TableMeta tableMeta, SyncStateVO syncState, Boolean syncRunning, RunMeta syncRuntime, Boolean compactionRunning, RunMeta compactionRuntime) {
|
||||
this.flinkJobId = ObjectUtil.isNull(flinkJob) ? null : flinkJob.getId().toString();
|
||||
this.flinkJob = flinkJob;
|
||||
this.tableMeta = tableMeta;
|
||||
this.syncRunning = syncRunning;
|
||||
this.syncRuntime = syncRuntime;
|
||||
this.compactionRunning = compactionRunning;
|
||||
this.compactionRuntime = compactionRuntime;
|
||||
if (ObjectUtil.isNotNull(flinkJob) && ObjectUtil.isNotNull(tableMeta)) {
|
||||
this.syncJobName = NameHelper.syncJobName(this.flinkJob.getId(), this.flinkJob.getName());
|
||||
this.compactionJobName = NameHelper.compactionJobName(this.flinkJob.getId(), this.tableMeta.getSchema(), this.tableMeta.getAlias());
|
||||
@@ -44,6 +53,22 @@ public class TableVO {
|
||||
return tableMeta;
|
||||
}
|
||||
|
||||
public Boolean getSyncRunning() {
|
||||
return syncRunning;
|
||||
}
|
||||
|
||||
public RunMeta getSyncRuntime() {
|
||||
return syncRuntime;
|
||||
}
|
||||
|
||||
public Boolean getCompactionRunning() {
|
||||
return compactionRunning;
|
||||
}
|
||||
|
||||
public RunMeta getCompactionRuntime() {
|
||||
return compactionRuntime;
|
||||
}
|
||||
|
||||
public String getSyncJobName() {
|
||||
return syncJobName;
|
||||
}
|
||||
@@ -75,13 +100,18 @@ public class TableVO {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TableVO{" +
|
||||
"flinkJob=" + flinkJob +
|
||||
"flinkJobId='" + flinkJobId + '\'' +
|
||||
", flinkJob=" + flinkJob +
|
||||
", tableMeta=" + tableMeta +
|
||||
", syncState=" + syncState +
|
||||
", syncRunning=" + syncRunning +
|
||||
", syncRuntime=" + syncRuntime +
|
||||
", compactionRunning=" + compactionRunning +
|
||||
", compactionRuntime=" + compactionRuntime +
|
||||
", syncJobName='" + syncJobName + '\'' +
|
||||
", syncApp=" + syncApp +
|
||||
", compactionJobName='" + compactionJobName + '\'' +
|
||||
", compactionApp=" + compactionApp +
|
||||
", syncState=" + syncState +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user