feature(web): 增加批量增加压缩调度

This commit is contained in:
2023-07-06 12:28:46 +08:00
parent 30f42b2734
commit e47de5d5c6
6 changed files with 142 additions and 5 deletions

View File

@@ -0,0 +1,9 @@
package com.lanyuanxiaoyao.service.configuration;
/**
* @author lanyuanxiaoyao
* @date 2023-07-06
*/
public interface Constants {
String SERVICE_NAME_INFO_QUERY = "info-query";
}

View File

@@ -0,0 +1,27 @@
package com.lanyuanxiaoyao.service.configuration.exception;
import cn.hutool.core.util.StrUtil;
/**
* Hudi 服务异常
*
* @author lanyuanxiaoyao
* @date 2023-07-06
*/
public class HudiServiceException extends Exception {
public HudiServiceException(String serviceName) {
super(StrUtil.format("[{}] cause exception", serviceName));
}
public HudiServiceException(String serviceName, String message) {
super(StrUtil.format("[{}] {}", serviceName, message));
}
public HudiServiceException(String serviceName, String message, Throwable cause) {
super(StrUtil.format("[{}] {}", serviceName, message), cause);
}
public HudiServiceException(String serviceName, Throwable cause) {
super(StrUtil.format("[{}] cause exception"), cause);
}
}

View File

@@ -0,0 +1,13 @@
package com.lanyuanxiaoyao.service.configuration.exception;
import cn.hutool.core.util.StrUtil;
/**
* @author lanyuanxiaoyao
* @date 2023-07-06
*/
public class TableNotFoundException extends HudiServiceException {
public TableNotFoundException(String serviceName, Long flinkJobId, String alias) {
super(serviceName, StrUtil.format("Table {} {} is not found", flinkJobId, alias));
}
}

View File

@@ -2,7 +2,10 @@ package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.Constants;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.exception.TableNotFoundException;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.collections.api.factory.Maps;
@@ -92,4 +95,10 @@ public class BaseController {
}
return queryMap;
}
protected void checkTableExists(InfoService infoService, Long flinkJobId, String alias) throws TableNotFoundException {
if (infoService.nonExistsTable(flinkJobId, alias)) {
throw new TableNotFoundException(Constants.SERVICE_NAME_INFO_QUERY, flinkJobId, alias);
}
}
}

View File

@@ -0,0 +1,62 @@
package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.util.ReUtil;
import com.dtflys.forest.annotation.Get;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.exception.TableNotFoundException;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.ScheduleService;
import java.util.regex.Pattern;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 调度
*
* @author lanyuanxiaoyao
* @date 2023-07-06
*/
@RestController
@RequestMapping("schedule")
public class ScheduleController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class);
private final ScheduleService scheduleService;
private final InfoService infoService;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public ScheduleController(ScheduleService scheduleService, InfoService infoService) {
this.scheduleService = scheduleService;
this.infoService = infoService;
}
@PostMapping("table_batch")
public void tableBatch(@RequestParam("lines") String lines) throws TableNotFoundException {
Pattern p = Pattern.compile("^(\\d+)\\s*(\\S+)$");
ImmutableList<JobIdAndAlias> jobIdAndAliases = Lists.immutable.of(lines.trim().split("\n"))
.collect(line -> {
Long flinkJobId = Long.valueOf(ReUtil.get(p, line, 1));
String alias = ReUtil.get(p, line, 2);
return new JobIdAndAlias(flinkJobId, alias);
});
for (JobIdAndAlias jobIdAndAlias : jobIdAndAliases) {
checkTableExists(infoService, jobIdAndAlias.getId(), jobIdAndAlias.getAlias());
}
jobIdAndAliases.forEach(item -> scheduleService.scheduleTable(item.getId(), item.getAlias()));
}
@Get("table")
public void table(
@RequestParam("flink_job_id") Long flinkJobId,
@RequestParam("alias") String alias
) throws TableNotFoundException {
checkTableExists(infoService, flinkJobId, alias);
scheduleService.scheduleTable(flinkJobId, alias);
}
}

View File

@@ -5,10 +5,6 @@ function toolTab() {
{
type: 'form',
title: '查询时间线',
mode: 'horizontal',
horizontal: {
leftFixed: 'md',
},
actions: [
{
type: 'submit',
@@ -54,10 +50,31 @@ function toolTab() {
name: 'hdfs',
label: '表HDFS路经',
required: true,
clearable: true,
}
]
},
{
type: 'form',
title: '批量提交压缩任务',
api: {
method: 'post',
url: '${base}/schedule/table_batch',
dataType: 'form',
},
body: [
{
name: 'lines',
type: 'textarea',
label: '表信息 (flink_job_id alias\\n)',
clearable: true,
minRows: 5,
maxRows: 5,
className: 'no-resize',
required: true,
}
]
},
{type: 'divider'},
]
}
}