feat(command): 迁移service-command项目

This commit is contained in:
2024-02-29 12:10:59 +08:00
parent 474ee6173d
commit 0683068a02
33 changed files with 1881 additions and 8 deletions

54
service-command/pom.xml Normal file
View File

@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven descriptor for the service-command module: an interactive
     Spring Shell CLI on top of the shared hudi-service parent. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>hudi-service</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>service-command</artifactId>
<dependencies>
<dependency>
<!-- Internal HTTP-client module. Web server and load balancer are excluded,
     presumably because this module runs as a shell tool rather than a web
     service - TODO confirm with the service-forest owners. -->
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>service-forest</artifactId>
<version>1.0.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Interactive shell framework (note: milestone release 2.1.0-M3). -->
<dependency>
<groupId>org.springframework.shell</groupId>
<artifactId>spring-shell-starter</artifactId>
<version>2.1.0-M3</version>
</dependency>
<!-- Console progress bars for long-running batch operations. -->
<dependency>
<groupId>me.tongfei</groupId>
<artifactId>progressbar</artifactId>
<version>0.9.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<!-- Builds the executable fat jar for the CLI. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,43 @@
package com.lanyuanxiaoyao.service.command;
import com.lanyuanxiaoyao.service.configuration.SecurityConfig;
import com.lanyuanxiaoyao.service.forest.configuration.SpringCloudDiscoveryInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
/**
 * Command-line tool entry point for the service-command module.
 *
 * @author ZhangJiacheng
 * @date 2022-03-24
 */
@SpringBootApplication
@EnableConfigurationProperties
@ComponentScan(
basePackages = {"com.lanyuanxiaoyao.service"},
excludeFilters = {
// Exclude beans that are presumably web/service-only concerns (discovery
// interception, HTTP security) from the CLI context - TODO confirm.
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {
SpringCloudDiscoveryInterceptor.class,
SecurityConfig.class
}),
}
)
public class CommandApplication implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(CommandApplication.class);
public static void main(String[] args) {
SpringApplication.run(CommandApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
// Intentionally empty: no startup work yet (development placeholder).
}
}

View File

@@ -0,0 +1,28 @@
package com.lanyuanxiaoyao.service.command.commands;
import org.springframework.shell.component.ConfirmationInput;
import org.springframework.shell.standard.AbstractShellComponent;
/**
 * Base class for shell components that require an interactive yes/no
 * confirmation ("double check") before performing destructive operations.
 */
public abstract class AbstractUtilShellComponent extends AbstractShellComponent {
/**
 * Asks for confirmation unless {@code ignore} is true.
 *
 * @param message prompt shown to the user
 * @param ignore  when true, skip the prompt and treat the action as confirmed
 * @return true when confirmed (or when the prompt was skipped)
 */
protected Boolean doubleCheck(String message, Boolean ignore) {
// Boolean.TRUE.equals also tolerates a null ignore flag without an unboxing NPE.
return Boolean.TRUE.equals(ignore) || doubleCheck(message);
}
/**
 * Prompts the user with a confirmation input whose default answer is "no".
 *
 * @param message prompt shown to the user
 * @return true only when the user explicitly confirmed
 */
protected Boolean doubleCheck(String message) {
ConfirmationInput confirmationInput = new ConfirmationInput(getTerminal(), message, false);
confirmationInput.setResourceLoader(getResourceLoader());
confirmationInput.setTemplateExecutor(getTemplateExecutor());
ConfirmationInput.ConfirmationInputContext context = confirmationInput.run(ConfirmationInput.ConfirmationInputContext.empty());
// Guard against a null result so callers never hit an unboxing NPE.
return Boolean.TRUE.equals(context.getResultValue());
}
/**
 * Prompts and throws {@code throwable} when the user declines.
 *
 * @param message   prompt shown to the user
 * @param throwable exception to throw on decline
 * @throws T when the user does not confirm
 */
protected <T extends Throwable> void doubleCheck(String message, T throwable) throws T {
if (!doubleCheck(message)) {
throw throwable;
}
}
/**
 * Unchecked variant of {@link #doubleCheck(String, Throwable)}.
 * Fix: the supplied runtime exception is now thrown as-is instead of being
 * wrapped in a plain RuntimeException, so callers can still catch the
 * specific type they passed in (which is the point of the generic bound).
 *
 * @param message   prompt shown to the user
 * @param throwable runtime exception to throw on decline
 */
protected <T extends RuntimeException> void doubleCheckQuiet(String message, T throwable) {
doubleCheck(message, throwable);
}
}

View File

@@ -0,0 +1,61 @@
package com.lanyuanxiaoyao.service.command.commands;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.PulsarService;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.MutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
/**
 * Consistency checks over table metadata (Pulsar topics, field types).
 *
 * @author ZhangJiacheng
 * @date 2024-01-30
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@ShellComponent("检查相关操作")
public class CheckCommand {
private static final Logger logger = LoggerFactory.getLogger(CheckCommand.class);
private final InfoService infoService;
private final PulsarService pulsarService;
public CheckCommand(InfoService infoService, PulsarService pulsarService) {
this.infoService = infoService;
this.pulsarService = pulsarService;
}
/**
 * Verifies that every table meta has a Pulsar address and topic configured
 * and that the topic actually exists on the broker. Problems are collected
 * into a synchronized list (the scan runs in parallel) and logged sorted.
 */
@ShellMethod("检查表topic是否对应")
public void checkPulsarTopicExists() {
MutableList<String> results = Lists.mutable.<String>empty().asSynchronized();
infoService.tableMetaList()
.asParallel(ExecutorProvider.EXECUTORS, 10)
.forEach(meta -> {
logger.info("{} pulsar check...", meta.getAlias());
boolean incomplete = false;
if (StrUtil.isBlank(meta.getPulsarAddress())) {
results.add(StrUtil.format("{} pulsar url is blank", meta.getAlias()));
incomplete = true;
}
if (StrUtil.isBlank(meta.getTopic())) {
results.add(StrUtil.format("{} pulsar topic is blank", meta.getAlias()));
incomplete = true;
}
// Fix: only probe the broker when both address and topic are present;
// querying with blank values cannot succeed and may fail outright.
if (!incomplete && !pulsarService.existsTopic(meta.getPulsarAddress(), meta.getTopic())) {
results.add(StrUtil.format("{} pulsar url {} not contain topic {}", meta.getAlias(), meta.getPulsarAddress(), meta.getTopic()));
}
});
results.toSortedList().forEach(logger::info);
}
/**
 * Lists every distinct field type used across all table metas.
 *
 * @return sorted, newline-separated field type names
 */
@ShellMethod("杂项工具")
public String getAllFieldType() {
return infoService.tableMetaList()
.flatCollect(TableMeta::getFields)
.collect(TableMeta.FieldMeta::getType)
.distinct()
.toSortedList()
.makeString("\n");
}
}

View File

@@ -0,0 +1,218 @@
package com.lanyuanxiaoyao.service.command.commands;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.command.commands.jobs.*;
import com.lanyuanxiaoyao.service.command.provider.SearchTypeValueProvider;
import com.lanyuanxiaoyao.service.command.utils.SearchUtils;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.MethodParameter;
import org.springframework.shell.CompletionContext;
import org.springframework.shell.CompletionProposal;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
import org.springframework.shell.standard.ValueProviderSupport;
import org.springframework.shell.table.BorderStyle;
import org.springframework.shell.table.TableBuilder;
import org.springframework.shell.table.TableModelBuilder;
/**
 * Shell commands for viewing and creating Flink job configuration.
 *
 * @author ZhangJiacheng
 * @date 2022-03-24
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@ShellComponent("Config job")
public class JobCommand {
private static final Logger logger = LoggerFactory.getLogger(JobCommand.class);
private final InfoService infoService;
public JobCommand(InfoService infoService) {
this.infoService = infoService;
}
/**
 * Renders all Flink jobs and their tables as a shell table. Jobs whose
 * tables all fail the search filter are hidden entirely.
 *
 * @param type    search mode used to match schema+alias
 * @param pattern search value (comma-separated values allowed except REGEX)
 * @return rendered table, 200 columns wide
 */
@ShellMethod("显示 Flink job 配置信息")
public String jobAll(
@ShellOption(help = "搜索模式",
defaultValue = "CONTAINS",
valueProvider = SearchTypeValueProvider.class) SearchUtils.Type type,
@ShellOption(help = "搜索值 (除了 REGEX 模式, 其余可使用英文逗号分隔多个值)",
defaultValue = "") String pattern
) {
ImmutableList<JobAndMetas> jobAndMetas = infoService.allFlinkJobId()
.collect(id -> {
FlinkJob flinkJob = infoService.flinkJobDetail(id);
ImmutableList<TableMeta> metas = infoService.tableMetaList(id)
.select(meta -> SearchUtils.search(type, pattern, meta.getSchema() + meta.getAlias()));
return new JobAndMetas(flinkJob, metas);
})
.reject(job -> job.getMetas().isEmpty());
TableModelBuilder<Object> builder = new TableModelBuilder<>();
builder.addRow();
builder.addValue("#");
builder.addValue("Id");
builder.addValue("Name/Table");
builder.addValue("Bucket");
builder.addValue("Compaction seconds");
builder.addValue("Hdfs");
for (int index = 0; index < jobAndMetas.size(); index++) {
JobAndMetas job = jobAndMetas.get(index);
// Job header row: "#" is rendered as "<job>-0"; table rows follow as "<job>-<n>".
builder.addRow();
builder.addValue(StrUtil.format("{}-0", index + 1));
builder.addValue(job.getJob().getId());
builder.addValue(job.getJob().getName());
builder.addValue("");
builder.addValue("");
builder.addValue("");
// isNotEmpty already covers null, so the extra isNotNull check was redundant.
if (ObjectUtil.isNotEmpty(job.getMetas())) {
ImmutableList<TableMeta> tableMetas = job.getMetas();
for (int indexSub = 0; indexSub < tableMetas.size(); indexSub++) {
TableMeta meta = tableMetas.get(indexSub);
builder.addRow();
builder.addValue(StrUtil.format("{}-{}", index + 1, indexSub + 1));
builder.addValue("");
builder.addValue(meta.getTable());
builder.addValue(meta.getHudi().getBucketIndexNumber());
builder.addValue(meta.getHudi().getCompactionDeltaSeconds());
builder.addValue(meta.getHudi().getTargetHdfsPath());
}
}
}
return new TableBuilder(builder.build())
.addHeaderAndVerticalsBorders(BorderStyle.oldschool)
.build()
.render(200);
}
/**
 * Bulk-adds Flink job configuration from a whitespace-separated file.
 * Each non-blank line is: type database databaseType schema table hdfs pulsarAddress.
 *
 * @param source path of the configuration file
 * @throws IOException when the file cannot be read
 */
@ShellMethod("批量新增 Flink job 配置")
public void jobAddBatch(@ShellOption(help = "配置文件路径") String source) throws IOException {
Path sourcePath = Paths.get(source);
if (Files.notExists(sourcePath)) {
throw new RuntimeException(StrUtil.format("{} not found", source));
}
// Fix: Files.lines returns a stream that must be closed; readAllLines
// closes the file itself and is fine for a config-sized input.
for (String line : Files.readAllLines(sourcePath)) {
if (StrUtil.isBlank(line)) {
continue;
}
String[] fields = line.split("\\s+");
// Fix: the former switch had five identical branches, each passing the
// constant equal to `type`; TableType.valueOf already rejects unknown
// names, so a single direct call is equivalent.
jobAdd(TableType.valueOf(fields[0]), fields[1], TableMeta.SourceType.valueOf(fields[2]), fields[3], fields[4], fields[5], fields[6]);
}
}
/**
 * udal__crm_order crm_order order_balance_his SMALL hdfs://b1/apps/iap/hive pulsar://132.122.113.171:16660,132.122.113.172:16660,132.122.113.173:16660,132.122.113.174:16660,132.122.113.175:16660,132.122.113.176:16660 persistent://odcp/crm_order/ord_prod_inst_acc_num dws_crm_order dws_order_balance_his
 */
@ShellMethod("新增 Flink job 配置")
public void jobAdd(
@ShellOption(help = "表类型 (BIG 为大表, SMALL 为小表, ACCT 为 ACCT 表)",
valueProvider = TableTypeValueProvider.class) TableType type,
@ShellOption(help = "数据源") String database,
@ShellOption(help = "数据源类型",
valueProvider = DatabaseTypeValueProvider.class) TableMeta.SourceType databaseType,
@ShellOption(help = "Scheme") String schema,
@ShellOption(help = "表名") String table,
@ShellOption(help = "HDFS 路径前缀") String hdfs,
@ShellOption(help = "Pulsar Address") String pulsarAddress
) {
// Refuse to create a duplicate of an already-registered HDFS target path.
if (infoService.existsTableByHdfs(StrUtil.format("{}/dws_{}/dws_{}", hdfs, schema, table))) {
throw new RuntimeException(StrUtil.format("tb_app_collect_table_info 已存在指定的 HDFS 路径 {}", StrUtil.format("{}/dws_{}/dws_{}", hdfs, schema, table)));
}
switch (type) {
case SMALL:
new SmallTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute();
break;
case BIG:
new BigTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute();
break;
case ACCT_SMALL:
new AcctSmallTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute();
break;
case ACCT_BIG:
new AcctBigTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute();
break;
case ACCT_ITEM:
new AcctItemTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute();
break;
default:
throw new RuntimeException("不支持的表类型: " + type);
}
}
public enum TableType {
/**
 * Small table.
 */
SMALL,
/**
 * Big table.
 */
BIG,
/**
 * ACCT small table.
 */
ACCT_SMALL,
/**
 * ACCT big table.
 */
ACCT_BIG,
/**
 * ACCT_ITEM table.
 */
ACCT_ITEM,
}
public static final class TableTypeValueProvider extends ValueProviderSupport {
@Override
public List<CompletionProposal> complete(MethodParameter parameter, CompletionContext context, String[] hints) {
// Fix: proposals must mirror TableType exactly - the old list offered
// "ACCT" (not a valid constant) and omitted ACCT_SMALL/ACCT_BIG.
return ListUtil.of(
new CompletionProposal("SMALL"),
new CompletionProposal("BIG"),
new CompletionProposal("ACCT_SMALL"),
new CompletionProposal("ACCT_BIG"),
new CompletionProposal("ACCT_ITEM")
);
}
}
public static final class DatabaseTypeValueProvider extends ValueProviderSupport {
@Override
public List<CompletionProposal> complete(MethodParameter parameter, CompletionContext completionContext, String[] hints) {
return ListUtil.of(
new CompletionProposal("UDAL"),
new CompletionProposal("TELEPG")
);
}
}
}

View File

@@ -0,0 +1,304 @@
package com.lanyuanxiaoyao.service.command.commands;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.RunMeta;
import com.eshore.odcp.hudi.connector.utils.NameHelper;
import com.lanyuanxiaoyao.service.command.entity.TableMetaJob;
import com.lanyuanxiaoyao.service.command.utils.CommandLineUtils;
import com.lanyuanxiaoyao.service.command.utils.ShellTableUtils;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters;
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService;
import com.lanyuanxiaoyao.service.forest.service.launcher.LaunchersService;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import me.tongfei.progressbar.ProgressBar;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
import org.springframework.shell.table.TableModelBuilder;
/**
 * Shell commands for managing table sync jobs on YARN: list status, start
 * and stop single jobs or batches, with an interactive double-check before
 * any destructive action.
 *
 * @author ZhangJiacheng
 * @date 2022-04-22
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@ShellComponent("同步任务相关操作")
public class YarnCommand extends AbstractUtilShellComponent {
private static final Logger logger = LoggerFactory.getLogger(YarnCommand.class);
private static final String KILL_CONFIRMATION_MESSAGE = "操作将会停止上表所显示的表同步任务, 请再次确认操作";
private static final String RUN_CONFIRMATION_MESSAGE = "操作将会启动上表所显示的表同步任务, 请再次确认操作";
private final InfoService infoService;
private final LauncherService launcherService;
private final ZookeeperService zookeeperService;
private final YarnService yarnService;
private final YarnClusters yarnClusters;
public YarnCommand(InfoService infoService, LaunchersService launchersService, ZookeeperService zookeeperService, YarnService yarnService, YarnClusters yarnClusters) {
this.infoService = infoService;
this.zookeeperService = zookeeperService;
this.yarnService = yarnService;
this.yarnClusters = yarnClusters;
logger.info("Default sync cluster: {}", yarnClusters.getDefaultSyncCluster());
// All start/stop operations go through the launcher of the default sync cluster.
this.launcherService = launchersService.getService(yarnClusters.getDefaultSyncCluster());
}
/**
 * Renders the job list as a shell table; non-running jobs get empty cells
 * for all runtime columns.
 *
 * @param jobs           jobs to render
 * @param showFinishTime whether to include the "Finish time" column
 * @return rendered table text
 */
private String generateFlinkJobsTable(ImmutableList<TableMetaJob> jobs, Boolean showFinishTime) {
TableModelBuilder<Object> builder = new TableModelBuilder<>();
builder.addRow();
builder.addValue("#");
builder.addValue("Id");
builder.addValue("Name");
builder.addValue("State");
builder.addValue("Application Id");
builder.addValue("Launch time");
if (showFinishTime) {
builder.addValue("Finish time");
}
builder.addValue("Tracking url");
for (int index = 0; index < jobs.size(); index++) {
TableMetaJob job = jobs.get(index);
builder.addRow();
builder.addValue(index + 1);
builder.addValue(job.getFlinkJobId());
builder.addValue(job.getAlias());
if (job.isRunning()) {
builder.addValue(job.getState());
builder.addValue(job.getId());
LocalDateTime launchTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(job.getStartedTime()), ZoneId.systemDefault());
builder.addValue(launchTime.format(Constants.FORMATTER));
if (showFinishTime) {
// Fix: was "isNotNull(...) || finishedTime != 0", which rendered
// epoch 0 as a finish time and could NPE unboxing a null value.
if (ObjectUtil.isNotNull(job.getFinishedTime()) && job.getFinishedTime() != 0) {
LocalDateTime finishTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(job.getFinishedTime()), ZoneId.systemDefault());
builder.addValue(finishTime.format(Constants.FORMATTER));
} else {
builder.addValue("");
}
}
// Trailing space keeps the URL selectable as a whole in some terminals.
builder.addValue(job.getTrackingUrl() + " ");
} else {
builder.addValue("");
builder.addValue("");
if (showFinishTime) {
builder.addValue("");
}
builder.addValue("");
builder.addValue("");
}
}
return ShellTableUtils.renderTable(builder);
}
/**
 * Starts a single table sync job (delegates to the batch variant).
 */
@ShellMethod("启动表同步任务")
public String yarnRun(
@ShellOption(help = "Flink job id") Long flinkJobId,
@ShellOption(
help = "Ignore double check",
defaultValue = "false"
) Boolean ignoreCheck
) {
return yarnRunBatch(new Long[]{flinkJobId}, ignoreCheck);
}
/**
 * Stops every job whose running lock exists in ZooKeeper.
 */
@ShellMethod("停止所有正在运行的表同步任务")
public String yarnKillRunning(
@ShellOption(
help = "Ignore double check",
defaultValue = "false"
) Boolean ignoreCheck
) {
return yarnKillBatch(getAllJobId(true).toArray(new Long[]{}), ignoreCheck);
}
/**
 * Starts every job that has no running lock in ZooKeeper.
 */
@ShellMethod("启动所有未运行的表同步任务")
public String yarnRunUnRunning(
@ShellOption(
help = "Ignore double check",
defaultValue = "false"
) Boolean ignoreCheck
) {
return yarnRunBatch(getAllJobId(false).toArray(new Long[]{}), ignoreCheck);
}
/**
 * Selects flink job ids by running state, judged by the presence of the
 * per-job sync running lock node in ZooKeeper.
 *
 * @param isRunning true to keep running jobs, false to keep stopped ones
 */
private ImmutableList<Long> getAllJobId(Boolean isRunning) {
return infoService.allFlinkJobId()
.select(id -> {
String lockPath = NameHelper.syncRunningLockPath(id);
if (isRunning) {
return zookeeperService.existsPath(lockPath);
} else {
return !zookeeperService.existsPath(lockPath);
}
});
}
/**
 * Resolves the given flink job ids into TableMetaJob entries. NOTE: despite
 * the name, this returns every matching job - running ones are enriched
 * with their YARN application details, stopped ones are returned bare.
 */
private ImmutableList<TableMetaJob> getRunningJobs(Long[] flinkJobIds) {
ImmutableList<String> runningPaths = zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH).collect(ZookeeperNode::getPath);
return Lists.immutable.of(flinkJobIds)
.distinct()
.collect(infoService::tableMetaList)
.flatCollect(meta -> meta)
.asParallel(ExecutorProvider.EXECUTORS, 5)
.collect(meta -> {
if (runningPaths.contains(NameHelper.syncRunningLockPath(meta.getJob().getId()))) {
RunMeta runMeta = zookeeperService.getSyncRunMeta(meta.getJob().getId());
YarnApplication application = yarnService.jobDetail(yarnClusters.getDefaultSyncCluster(), runMeta.getApplicationId());
return new TableMetaJob(meta, application);
} else {
return new TableMetaJob(meta);
}
})
.toList()
.toImmutable();
}
/**
 * Stops the running jobs among the given ids after showing them and asking
 * for confirmation.
 */
@ShellMethod("批量停止表同步任务")
public String yarnKillBatch(
@ShellOption(help = "Flink job id") Long[] flinkJobIds,
@ShellOption(
help = "Ignore double check",
defaultValue = "false"
) Boolean ignoreCheck
) {
ImmutableList<TableMetaJob> targetJobs = getRunningJobs(flinkJobIds);
return CommandLineUtils.generateResultLines(
() -> {
if (ObjectUtil.isEmpty(targetJobs)) {
return "没有找到包含指定 flink job id 对应的任务";
} else {
System.out.println(generateFlinkJobsTable(targetJobs, false));
if (doubleCheck(KILL_CONFIRMATION_MESSAGE, ignoreCheck)) {
ImmutableList<Long> ids = targetJobs.select(TableMetaJob::isRunning)
.collect(TableMetaJob::getFlinkJobId)
.distinct();
try (ProgressBar pb = CommandLineUtils.progressbarBuilder("Kill jobs", ids.size()).build()) {
ids.forEach(id -> {
launcherService.syncStop(id);
pb.setExtraMessage(id.toString());
pb.step();
});
}
return Constants.OPERATION_DONE;
} else {
return Constants.OPERATION_CANCEL;
}
}
}
);
}
/**
 * Stops a single table sync job (delegates to the batch variant).
 */
@ShellMethod("停止表同步任务")
public String yarnKill(
@ShellOption(help = "Flink job id") Long flinkJobId,
@ShellOption(
help = "Ignore double check",
defaultValue = "false"
) Boolean ignoreCheck
) {
return yarnKillBatch(new Long[]{flinkJobId}, ignoreCheck);
}
/**
 * Starts the not-yet-running jobs among the given ids after showing them
 * and asking for confirmation. A 1s pause between launches avoids
 * hammering the launcher.
 */
@ShellMethod("批量启动表同步任务")
public String yarnRunBatch(
@ShellOption(help = "Flink job id") Long[] flinkJobIds,
@ShellOption(
help = "Ignore double check",
defaultValue = "false"
) Boolean ignoreCheck
) {
ImmutableList<TableMetaJob> targetJobs = getRunningJobs(flinkJobIds);
return CommandLineUtils.generateResultLines(
() -> {
if (ObjectUtil.isEmpty(targetJobs)) {
return "没有找到包含指定 flink job id 对应的任务";
} else {
System.out.println(generateFlinkJobsTable(targetJobs, false));
if (doubleCheck(RUN_CONFIRMATION_MESSAGE, ignoreCheck)) {
ImmutableList<Long> ids = targetJobs.select(TableMetaJob::isNotRunning)
.collect(TableMetaJob::getFlinkJobId)
.distinct();
try (ProgressBar pb = CommandLineUtils.progressbarBuilder("Run jobs", ids.size()).build()) {
ids.forEach(id -> {
ThreadUtil.safeSleep(1000);
launcherService.syncStart(id);
pb.setExtraMessage(id.toString());
pb.step();
});
}
return Constants.OPERATION_DONE;
} else {
return Constants.OPERATION_CANCEL;
}
}
}
);
}
/**
 * Shows the status of all table sync jobs, optionally filtered by alias
 * substring and by running state, optionally followed by the matching
 * flink job id list.
 */
@ShellMethod("查询表同步任务状态")
public String yarnAll(
@ShellOption(help = "搜索值 (包含查询)",
defaultValue = "") String pattern,
@ShellOption(help = "展示结果列表的 Flink job id",
defaultValue = "false") Boolean showFlinkJobId,
@ShellOption(help = "展示运行的任务",
defaultValue = "false") Boolean running,
@ShellOption(help = "展示非运行的任务",
defaultValue = "false") Boolean unRunning
) {
if (running && unRunning) {
throw new RuntimeException("--running 和 --un-running 参数不能同时使用");
}
ImmutableMap<String, YarnApplication> applicationMap = yarnService.jobListRunning(yarnClusters.getDefaultSyncCluster())
.toMap(YarnApplication::getName, app -> app)
.toImmutable();
ImmutableList<String> locks = zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH).collect(ZookeeperNode::getPath);
ImmutableList<TableMetaJob> jobs = infoService.tableMetaList()
.select(meta -> StrUtil.contains(meta.getAlias(), pattern))
.select(meta -> {
String lockPath = NameHelper.syncRunningLockPath(meta.getJob().getId(), meta.getAlias());
if (running && locks.contains(lockPath)) {
return true;
} else if (unRunning && !locks.contains(lockPath)) {
return true;
} else {
// Neither filter flag set: keep everything.
return !running && !unRunning;
}
})
.collect(meta -> {
String jobName = NameHelper.syncJobName(meta.getJob().getId(), meta.getJob().getName());
return new TableMetaJob(meta, applicationMap.get(jobName));
});
return CommandLineUtils.generateResultLines(
() -> generateFlinkJobsTable(jobs, false),
() -> showFlinkJobId ? CommandLineUtils.generateResultLines(
() -> "Flink ids",
() -> jobs
.collect(TableMetaJob::getFlinkJobId)
.collect(String::valueOf)
.distinct()
.makeString(",")
) : ""
);
}
@ShellMethod("test")
public void test() {
// Development placeholder; intentionally does nothing.
}
}

View File

@@ -0,0 +1,76 @@
package com.lanyuanxiaoyao.service.command.commands.jobs;
import cn.hutool.core.lang.Snowflake;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.lanyuanxiaoyao.service.configuration.entity.info.TableMetaAdd;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.eclipse.collections.api.list.ImmutableList;
/**
 * Template for "adders" that persist a Flink job plus its table metas
 * through {@link InfoService}. Subclasses only decide which jobs and
 * tables to create by implementing {@link #getJobs()}.
 *
 * @date 2022-06-29
 */
public abstract class AbstractAdder implements Adder {
// Snapshot of all known flink jobs at construction time, used for the
// duplicate-name check in execute().
protected final ImmutableList<FlinkJob> flinkJobs;
protected final InfoService infoService;
protected final String database;
protected final String type;
protected final String schema;
protected final String table;
protected final String pulsarAddress;
protected final String hdfs;
// Id generator for new job/table rows.
protected final Snowflake snowflake = new Snowflake();
public AbstractAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) {
this.infoService = infoService;
this.flinkJobs = infoService.flinkJobList();
this.database = database;
this.type = type;
this.schema = schema;
this.table = table;
this.pulsarAddress = pulsarAddress;
this.hdfs = hdfs;
}
/**
 * @return the jobs (each carrying its tables) this adder should persist
 */
protected abstract ImmutableList<Job> getJobs();
/**
 * Persists each job, then its tables.
 * <p>
 * When a job with the same name already exists: if {@code isAppended} is
 * set, the existing job's id is reused and only the tables are saved;
 * otherwise the duplicate name is an error. NOTE(review): the orElseThrow
 * message ("Found duplicate flink job name") fires when the existing job
 * cannot be re-selected, which should be unreachable given the preceding
 * noneSatisfy check - confirm intent.
 */
@Override
public void execute() {
for (Job j : getJobs()) {
if (flinkJobs.noneSatisfy(job -> j.name.equals(job.getName()))) {
infoService.saveFlinkJob(j.id, j.name, j.runMode);
} else {
if (j.isAppended) {
FlinkJob flinkJob = flinkJobs.select(job -> j.name.equals(job.getName())).getFirstOptional().orElseThrow(() -> new RuntimeException("Found duplicate flink job name"));
j.id = flinkJob.getId();
} else {
throw new RuntimeException("Found duplicate flink job name");
}
}
for (Table t : j.tables) {
infoService.saveTableMeta(new TableMetaAdd(
t.id,
t.alias,
j.id,
t.hudiJobId,
t.syncYarnJobId,
t.compactionYarnJobId,
t.srcDb,
t.srcType,
t.srcSchema,
t.srcTable,
t.srcPulsarAddr,
t.srcTopic,
t.tgtDb,
t.tgtTable,
t.tgtHdfsPath,
t.filterField,
t.filterValues,
t.filterType,
t.bucketNumber
));
}
}
}
}

View File

@@ -0,0 +1,50 @@
package com.lanyuanxiaoyao.service.command.commands.jobs;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
/**
 * Adds an ACCT "big" table: the base big-table definition is instantiated
 * once per regional suffix (gz/fs/sz/dg), all grouped into one ONE_IN_ONE job.
 *
 * @date 2022-06-29
 */
public class AcctBigTableAdder extends BigTableAdder {
public AcctBigTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) {
super(infoService, database, type, schema, table, pulsarAddress, hdfs);
}
/**
 * Builds the base tables and rewrites their names, topic and HDFS path for
 * one regional suffix.
 */
protected ImmutableList<Table> getTables(String suffix) {
ImmutableList<Table> regionTables = getTables();
regionTables.forEach(t -> {
t.alias = t.alias + "_" + suffix;
t.srcDb = t.srcDb + "_" + suffix;
t.srcSchema = t.srcSchema + "_" + suffix;
t.tgtDb = t.tgtDb + "_" + suffix;
t.srcTopic = StrUtil.format("persistent://odcp/{}/{}", t.srcSchema, t.srcTable);
// NOTE(review): paths ending in "b" get an explicit "_b" marker on the
// target directory - presumably a cluster convention, confirm.
if (StrUtil.endWith(t.tgtHdfsPath, "b")) {
t.tgtHdfsPath = StrUtil.format("{}/{}/{}_b", hdfs, t.tgtDb, t.tgtTable);
} else {
t.tgtHdfsPath = StrUtil.format("{}/{}/{}", hdfs, t.tgtDb, t.tgtTable);
}
});
return regionTables;
}
@Override
protected ImmutableList<Job> getJobs() {
Job job = new Job(
snowflake.nextId(),
StrUtil.format("{} {}", schema, table),
FlinkJob.RunMode.ONE_IN_ONE.name(),
Lists.immutable.of("gz", "fs", "sz", "dg").flatCollect(this::getTables),
true
);
return Lists.immutable.of(job);
}
}

View File

@@ -0,0 +1,79 @@
package com.lanyuanxiaoyao.service.command.commands.jobs;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
/**
 * Adds acct_item tables: one table per (area, city-code, city) mapping,
 * all grouped into a single ONE_IN_ONE job named "acct acct_item".
 *
 * @date 2022-06-29
 */
public class AcctItemTableAdder extends AbstractAdder {
// (area, city code, city short name) triples driving one table per city.
private final ImmutableList<Tuple> mappings = Lists.immutable.of(
new Tuple("fs", "757", "fs"),
new Tuple("dg", "769", "dg"),
new Tuple("dg", "668", "mm"),
new Tuple("dg", "756", "zh"),
new Tuple("dg", "759", "zj"),
new Tuple("dg", "760", "zs"),
new Tuple("dg", "763", "qy"),
new Tuple("gz", "200", "gz"),
new Tuple("gz", "662", "yj"),
new Tuple("gz", "750", "jm"),
new Tuple("gz", "752", "hz"),
new Tuple("gz", "754", "st"),
new Tuple("gz", "766", "yf"),
new Tuple("gz", "768", "cz"),
new Tuple("sz", "755", "sz"),
new Tuple("sz", "660", "sw"),
new Tuple("sz", "663", "jy"),
new Tuple("sz", "751", "sg"),
new Tuple("sz", "753", "mz"),
new Tuple("sz", "758", "zq"),
new Tuple("sz", "762", "hy"),
new Tuple("szx", "93", "qs")
);
public AcctItemTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) {
super(infoService, database, type, schema, table, pulsarAddress, hdfs);
}
/**
 * Builds one table meta for a single (area, code, city) mapping.
 */
private Table cityTable(Tuple mapping) {
String area = mapping.get(0);
String code = mapping.get(1);
String city = mapping.get(2);
return new Table(
snowflake.nextId(),
StrUtil.format("{}_{}_{}", schema, table, city),
3L,
3L,
4L,
StrUtil.format("{}_{}", database, area),
type,
StrUtil.format("{}_{}", schema, area),
table,
pulsarAddress,
StrUtil.format("persistent://odcp/{}_{}/{}_{}", schema, area, table, code),
StrUtil.format("dws_{}", schema),
StrUtil.format("dws_{}_{}", table, city),
StrUtil.format("{}/dws_{}/external_table_hudi/dws_{}_{}", hdfs, schema, table, city),
null,
null,
"NONE",
50L
);
}
@Override
protected ImmutableList<Job> getJobs() {
Job job = new Job(
snowflake.nextId(),
"acct acct_item",
FlinkJob.RunMode.ONE_IN_ONE.name(),
mappings.collect(this::cityTable),
true
);
return Lists.immutable.of(job);
}
}

View File

@@ -0,0 +1,46 @@
package com.lanyuanxiaoyao.service.command.commands.jobs;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
/**
 * Adds an ACCT "small" table: the base small-table definition is
 * instantiated once per regional suffix (gz/fs/sz/dg), grouped into one
 * ALL_IN_ONE_BY_TABLE job.
 *
 * @date 2022-06-29
 */
public class AcctSmallTableAdder extends SmallTableAdder {
public AcctSmallTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) {
super(infoService, database, type, schema, table, pulsarAddress, hdfs);
}
/**
 * Builds the base tables and rewrites their names, topic and HDFS path for
 * one regional suffix.
 */
private ImmutableList<Table> getTables(String suffix) {
ImmutableList<Table> regionTables = getTables();
regionTables.forEach(t -> {
t.alias = t.alias + "_" + suffix;
t.srcDb = t.srcDb + "_" + suffix;
t.srcSchema = t.srcSchema + "_" + suffix;
t.tgtDb = t.tgtDb + "_" + suffix;
t.srcTopic = StrUtil.format("persistent://odcp/{}/{}", t.srcSchema, t.srcTable);
t.tgtHdfsPath = StrUtil.format("{}/{}/external_table_hudi/{}", hdfs, t.tgtDb, t.tgtTable);
});
return regionTables;
}
@Override
protected ImmutableList<Job> getJobs() {
Job job = new Job(
snowflake.nextId(),
schema,
FlinkJob.RunMode.ALL_IN_ONE_BY_TABLE.name(),
Lists.immutable.of("gz", "fs", "sz", "dg").flatCollect(this::getTables),
true
);
return Lists.immutable.of(job);
}
}

View File

@@ -0,0 +1,68 @@
package com.lanyuanxiaoyao.service.command.commands.jobs;
import org.eclipse.collections.api.list.ImmutableList;
/**
 * Contract for persisting Flink job + table configuration, plus the plain
 * mutable data holders the adders build up before saving.
 *
 * @date 2022-06-29
 */
public interface Adder {
// Persist all jobs/tables this adder produces.
void execute();
/**
 * Mutable holder for one Flink job and its tables. Fields are public and
 * mutated in place by the adder implementations (e.g. {@code id} is
 * rewritten when appending to an existing job).
 * NOTE(review): implements Cloneable without overriding clone(); the
 * marker appears unused - confirm before relying on it.
 */
class Job implements Cloneable {
public Long id;
public String name;
public String runMode;
public ImmutableList<Table> tables;
// When true, tables may be appended to an existing job with the same name
// instead of failing on the duplicate.
public Boolean isAppended;
public Job(Long id, String name, String runMode, ImmutableList<Table> tables, Boolean isAppended) {
this.id = id;
this.name = name;
this.runMode = runMode;
this.tables = tables;
this.isAppended = isAppended;
}
}
/**
 * Mutable holder for one table meta row: source (src*) and target (tgt*)
 * coordinates plus filter and bucketing settings. Fields are public and
 * mutated in place by the adder implementations.
 */
class Table {
public Long id;
public String alias;
public Long hudiJobId;
public Long syncYarnJobId;
public Long compactionYarnJobId;
public String srcDb;
public String srcType;
public String srcSchema;
public String srcTable;
public String srcPulsarAddr;
public String srcTopic;
public String tgtDb;
public String tgtTable;
public String tgtHdfsPath;
public String filterField;
public String filterValues;
public String filterType;
public Long bucketNumber;
public Table(Long id, String alias, Long hudiJobId, Long syncYarnJobId, Long compactionYarnJobId, String srcDb, String srcType, String srcSchema, String srcTable, String srcPulsarAddr, String srcTopic, String tgtDb, String tgtTable, String tgtHdfsPath, String filterField, String filterValues, String filterType, Long bucketNumber) {
this.id = id;
this.alias = alias;
this.hudiJobId = hudiJobId;
this.syncYarnJobId = syncYarnJobId;
this.compactionYarnJobId = compactionYarnJobId;
this.srcDb = srcDb;
this.srcType = srcType;
this.srcSchema = srcSchema;
this.srcTable = srcTable;
this.srcPulsarAddr = srcPulsarAddr;
this.srcTopic = srcTopic;
this.tgtDb = tgtDb;
this.tgtTable = tgtTable;
this.tgtHdfsPath = tgtHdfsPath;
this.filterField = filterField;
this.filterValues = filterValues;
this.filterType = filterType;
this.bucketNumber = bucketNumber;
}
}
}

View File

@@ -0,0 +1,66 @@
package com.lanyuanxiaoyao.service.command.commands.jobs;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.lanyuanxiaoyao.service.command.commands.jobs.AbstractAdder;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
/**
* @date 2022-06-29
*/
public class BigTableAdder extends AbstractAdder implements TableGetter {
public BigTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) {
super(infoService, database, type, schema, table, pulsarAddress, hdfs);
}
@Override
public ImmutableList<Table> getTables() {
return Lists.immutable.of(
new Table(
snowflake.nextId(),
StrUtil.format("{}_{}", schema, table),
2L,
2L,
4L,
database,
type,
schema,
table,
pulsarAddress,
StrUtil.format("persistent://odcp/{}/{}", schema, table),
StrUtil.format("dws_{}", schema),
StrUtil.format("dws_{}", table),
StrUtil.format("{}/dws_{}/external_table_hudi/dws_{}", hdfs, schema, table),
"CITY_ID",
"200,755,757,769",
"EXCLUDE", 20L
),
new Table(
snowflake.nextId(),
StrUtil.format("{}_{}_b", schema, table),
3L,
3L,
4L,
database,
type,
schema,
table,
pulsarAddress,
StrUtil.format("persistent://odcp/{}/{}", schema, table),
StrUtil.format("dws_{}", schema),
StrUtil.format("dws_{}", table),
StrUtil.format("{}/dws_{}/external_table_hudi/dws_{}_b", hdfs, schema, table),
"CITY_ID",
"200,755,757,769",
"INCLUDE",
50L)
);
}
@Override
protected ImmutableList<Job> getJobs() {
return Lists.immutable.of(new Job(snowflake.nextId(), StrUtil.format("{} {}", schema, table), FlinkJob.RunMode.ONE_IN_ONE.name(), getTables(), false));
}
}

View File

@@ -0,0 +1,55 @@
package com.lanyuanxiaoyao.service.command.commands.jobs;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
/**
* @date 2022-06-29
*/
/**
 * Adder for small tables: a single unfiltered entry, grouped into one
 * ALL_IN_ONE job per schema.
 */
public class SmallTableAdder extends AbstractAdder implements TableGetter {

    public SmallTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) {
        super(infoService, database, type, schema, table, pulsarAddress, hdfs);
    }

    @Override
    public ImmutableList<Table> getTables() {
        Table entry = new Table(
            snowflake.nextId(),
            StrUtil.format("{}_{}", schema, table),
            1L,
            1L,
            4L,
            database,
            type,
            schema,
            table,
            pulsarAddress,
            StrUtil.format("persistent://odcp/{}/{}", schema, table),
            StrUtil.format("dws_{}", schema),
            StrUtil.format("dws_{}", table),
            StrUtil.format("{}/dws_{}/external_table_hudi/dws_{}", hdfs, schema, table),
            null,   // no filter field
            null,   // no filter values
            "NONE",
            1L);
        return Lists.immutable.of(entry);
    }

    @Override
    protected ImmutableList<Job> getJobs() {
        Job job = new Job(snowflake.nextId(), schema, FlinkJob.RunMode.ALL_IN_ONE.name(), getTables(), true);
        return Lists.immutable.of(job);
    }
}

View File

@@ -0,0 +1,10 @@
package com.lanyuanxiaoyao.service.command.commands.jobs;
import org.eclipse.collections.api.list.ImmutableList;
/**
* @date 2022-06-29
*/
public interface TableGetter {
    /** Returns the table definitions this adder would register. */
    ImmutableList<Adder.Table> getTables();
}

View File

@@ -0,0 +1,32 @@
package com.lanyuanxiaoyao.service.command.configuration;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* 服务配置
*
* @author ZhangJiacheng
* @date 2023-12-20
*/
@Configuration
@ConfigurationProperties("hudi-service")
public class ServiceConfiguration {

    // Candidate gateway base URLs, bound from the hudi-service.gateway-urls property.
    private List<String> gatewayUrls;

    public List<String> getGatewayUrls() {
        return gatewayUrls;
    }

    public void setGatewayUrls(List<String> gatewayUrls) {
        this.gatewayUrls = gatewayUrls;
    }

    @Override
    public String toString() {
        return new StringBuilder("ServiceConfiguration{")
            .append("gatewayUrls=").append(gatewayUrls)
            .append('}')
            .toString();
    }
}

View File

@@ -0,0 +1,19 @@
package com.lanyuanxiaoyao.service.command.configuration;
import org.jline.utils.AttributedString;
import org.springframework.context.annotation.Configuration;
import org.springframework.shell.jline.PromptProvider;
/**
* Spring Shell 配置
*
* @author ZhangJiacheng
* @date 2022-04-27
*/
@Configuration
public class ShellConfiguration implements PromptProvider {

    /** Text shown at the start of each interactive shell line. */
    private static final String PROMPT_TEXT = "odcp-utils:->";

    @Override
    public AttributedString getPrompt() {
        return new AttributedString(PROMPT_TEXT);
    }
}

View File

@@ -0,0 +1,67 @@
package com.lanyuanxiaoyao.service.command.configuration;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.dtflys.forest.auth.BasicAuth;
import com.dtflys.forest.http.ForestAddress;
import com.dtflys.forest.http.ForestRequest;
import com.dtflys.forest.interceptor.Interceptor;
import com.eshore.odcp.hudi.connector.Constants;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Component;
/**
* @author lanyuanxiaoyao
* @date 2023-04-24
*/
@Component
public class SpringCloudDiscoveryInterceptor implements Interceptor<Object> {

    private static final Logger logger = LoggerFactory.getLogger(SpringCloudDiscoveryInterceptor.class);

    private final DiscoveryClient discoveryClient;

    public SpringCloudDiscoveryInterceptor(DiscoveryClient discoveryClient) {
        logger.info("Services: {}", discoveryClient.getServices());
        this.discoveryClient = discoveryClient;
    }

    /**
     * Runs before every Forest request: treats the request URL's host as a
     * service id, resolves it through the discovery client, rewrites the
     * request address to one randomly-chosen instance, and attaches basic
     * auth credentials.
     */
    @Override
    public boolean beforeExecute(ForestRequest request) {
        // Load
        // NOTE(review): the host is assumed to be a registered service name;
        // if no instances are found the request keeps its original address.
        URL url = URLUtil.url(request.getUrl());
        String host = url.getHost();
        if (StrUtil.isNotBlank(host)) {
            ImmutableList<String> urls = Lists.immutable.ofAll(discoveryClient.getInstances(host))
                .collect(instance -> instance.getUri().toString());
            if (ObjectUtil.isNotEmpty(urls)) {
                // Naive load balancing: pick one instance uniformly at random.
                @SuppressWarnings("DataFlowIssue")
                String targetUrl = urls.get(RandomUtil.randomInt(urls.size()));
                URI uri = URI.create(targetUrl);
                request.setAddress(new ForestAddress(uri.getScheme(), uri.getHost(), uri.getPort()));
            }
        }
        // Basic auth
        BasicAuth basicAuth = new BasicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN);
        basicAuth.enhanceAuthorization(request);
        return Interceptor.super.beforeExecute(request);
    }
}

View File

@@ -0,0 +1,123 @@
package com.lanyuanxiaoyao.service.command.entity;
import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
/**
* @author ZhangJiacheng
* @date 2023-12-21
*/
public class TableMetaJob {

    // Table metadata; always present.
    private final TableMeta meta;
    // Matching YARN application; null when the table's job is not running.
    private final YarnApplication application;

    public TableMetaJob(TableMeta meta, YarnApplication application) {
        this.meta = meta;
        this.application = application;
    }

    /** Convenience constructor for a table with no running YARN application. */
    public TableMetaJob(TableMeta meta) {
        this(meta, null);
    }

    /** True when a YARN application was found for this table. */
    public Boolean isRunning() {
        return ObjectUtil.isNotNull(application);
    }

    public Boolean isNotRunning() {
        return ObjectUtil.isNull(application);
    }

    // --- Delegates to the table metadata (always safe) ---

    public Long getFlinkJobId() {
        return meta.getJob().getId();
    }

    public String getFlinkJobName() {
        return meta.getJob().getName();
    }

    public String getAlias() {
        return meta.getAlias();
    }

    // --- Delegates to the YARN application ---
    // NOTE(review): every getter below dereferences `application` without a
    // null check and will throw NullPointerException when isNotRunning() is
    // true; callers appear expected to check isRunning() first — confirm.

    public String getCluster() {
        return application.getCluster();
    }

    public String getId() {
        return application.getId();
    }

    public String getUser() {
        return application.getUser();
    }

    public String getQueue() {
        return application.getQueue();
    }

    public String getName() {
        return application.getName();
    }

    public String getState() {
        return application.getState();
    }

    public String getFinalStatus() {
        return application.getFinalStatus();
    }

    public Float getProgress() {
        return application.getProgress();
    }

    public String getDiagnostics() {
        return application.getDiagnostics();
    }

    public String getApplicationType() {
        return application.getApplicationType();
    }

    public Long getStartedTime() {
        return application.getStartedTime();
    }

    public Long getLaunchTime() {
        return application.getLaunchTime();
    }

    public Long getFinishedTime() {
        return application.getFinishedTime();
    }

    public Long getElapsedTime() {
        return application.getElapsedTime();
    }

    public String getTrackingUrl() {
        return application.getTrackingUrl();
    }

    public Long getAllocatedMB() {
        return application.getAllocatedMB();
    }

    public Long getAllocatedVCores() {
        return application.getAllocatedVCores();
    }

    public Integer getRunningContainers() {
        return application.getRunningContainers();
    }

    public Float getQueueUsagePercentage() {
        return application.getQueueUsagePercentage();
    }

    public Float getClusterUsagePercentage() {
        return application.getClusterUsagePercentage();
    }
}

View File

@@ -0,0 +1,36 @@
package com.lanyuanxiaoyao.service.command.provider;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.util.List;
import java.util.Optional;
import org.springframework.core.MethodParameter;
import org.springframework.shell.CompletionContext;
import org.springframework.shell.CompletionProposal;
import org.springframework.stereotype.Component;
/**
* Table name 待选项
*
* @author ZhangJiacheng
* @date 2022-05-30
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
public class AliasProvider extends BaseValueProvider {

    private final InfoService infoService;

    public AliasProvider(InfoService infoService) {
        this.infoService = infoService;
    }

    /**
     * Completes table aliases. When a {@code --flink-job-id} value is already
     * present on the line, only aliases belonging to that job are proposed;
     * otherwise every known alias is offered.
     */
    @Override
    public List<CompletionProposal> complete(MethodParameter parameter, CompletionContext completionContext, String[] hints) {
        Optional<String> flinkJobId = getField(completionContext.getWords(), "--flink-job-id");
        if (flinkJobId.isPresent()) {
            return infoService.simpleTableMetas(Long.parseLong(flinkJobId.get()))
                .collect(info -> new CompletionProposal(info.getAlias()).description(info.getSchema()))
                .toList();
        }
        return infoService.allAlias()
            .collect(CompletionProposal::new)
            .toList();
    }
}

View File

@@ -0,0 +1,22 @@
package com.lanyuanxiaoyao.service.command.provider;
import java.util.List;
import java.util.Optional;
import org.springframework.shell.standard.ValueProviderSupport;
/**
* @author ZhangJiacheng
* @date 2022-05-30
*/
public abstract class BaseValueProvider extends ValueProviderSupport {

    /**
     * Returns the word following {@code fieldName} on the command line, e.g.
     * for words ["--table", "foo"] and fieldName "--table" returns "foo".
     *
     * @param words     the words typed so far
     * @param fieldName the exact option name to look for
     * @return the value after the option, or empty if the option is absent or
     *         is the last word
     */
    protected Optional<String> getField(List<String> words, String fieldName) {
        // Bug fix: the old code pre-checked with word.contains(fieldName) but then
        // looked up the position with indexOf (exact equality). A word that merely
        // contained the flag (e.g. "--table=x") passed the check, indexOf returned
        // -1, and the method wrongly handed back words.get(0).
        int index = words.indexOf(fieldName);
        if (index >= 0 && index + 1 < words.size()) {
            return Optional.of(words.get(index + 1));
        }
        return Optional.empty();
    }
}

View File

@@ -0,0 +1,37 @@
package com.lanyuanxiaoyao.service.command.provider;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.util.List;
import java.util.Optional;
import org.springframework.core.MethodParameter;
import org.springframework.shell.CompletionContext;
import org.springframework.shell.CompletionProposal;
import org.springframework.stereotype.Component;
/**
* Flink Job Id 待选项
*
* @author ZhangJiacheng
* @date 2022-05-30
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
public class FlinkJobIdProvider extends BaseValueProvider {

    private final InfoService infoService;

    public FlinkJobIdProvider(InfoService infoService) {
        this.infoService = infoService;
    }

    /**
     * Completes Flink job ids. When a {@code --table} value is already present
     * on the line, only the jobs containing that table are proposed; otherwise
     * every known Flink job id is offered.
     */
    @Override
    public List<CompletionProposal> complete(MethodParameter parameter, CompletionContext completionContext, String[] hints) {
        Optional<String> field = getField(completionContext.getWords(), "--table");
        // Idiom fix: use the lambda parameter instead of re-calling field.get()
        // inside map() (same behavior, but no redundant Optional unwrap).
        return field.map(alias -> infoService.simpleTableMetas(alias)
                .collect(info -> new CompletionProposal(info.getFlinkJobId().toString()).description(info.getSchema()))
                .toList())
            .orElseGet(() -> infoService.allFlinkJobId()
                .collect(String::valueOf)
                .collect(CompletionProposal::new)
                .toList());
    }
}

View File

@@ -0,0 +1,22 @@
package com.lanyuanxiaoyao.service.command.provider;
import com.lanyuanxiaoyao.service.command.utils.SearchUtils;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.core.MethodParameter;
import org.springframework.shell.CompletionContext;
import org.springframework.shell.CompletionProposal;
/**
* @author ZhangJiacheng
* @date 2022-04-29
*/
/**
 * Offers every {@link SearchUtils.Type} name as a completion proposal.
 */
public class SearchTypeValueProvider extends BaseValueProvider {

    @Override
    public List<CompletionProposal> complete(MethodParameter parameter, CompletionContext completionContext, String[] hints) {
        return Arrays.stream(SearchUtils.Type.values())
            .map(SearchUtils.Type::name)
            .map(CompletionProposal::new)
            .collect(Collectors.toList());
    }
}

View File

@@ -0,0 +1,42 @@
package com.lanyuanxiaoyao.service.command.utils;
import cn.hutool.core.util.StrUtil;
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import me.tongfei.progressbar.ConsoleProgressBarConsumer;
import me.tongfei.progressbar.DelegatingProgressBarConsumer;
import me.tongfei.progressbar.ProgressBarBuilder;
import me.tongfei.progressbar.ProgressBarStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 命令行工具类
*
* @author ZhangJiacheng
* @date 2022-04-27
*/
public class CommandLineUtils {

    private CommandLineUtils() {
        // Static utility holder; not instantiable. (Also removed an unused
        // private logger field that was never referenced.)
    }

    /**
     * Evaluates each supplier and joins the non-blank results with newlines.
     *
     * @param suppliers lazily-evaluated line producers
     * @return the joined, non-blank lines (empty string when none)
     */
    @SafeVarargs
    public static String generateResultLines(Supplier<String>... suppliers) {
        return Arrays.stream(suppliers)
            .map(Supplier::get)
            .filter(StrUtil::isNotBlank)
            .collect(Collectors.joining("\n"));
    }

    /** Convenience overload that widens the max to a long. */
    public static ProgressBarBuilder progressbarBuilder(String name, Integer max) {
        return progressbarBuilder(name, max.longValue());
    }

    /**
     * Pre-configures an ASCII console progress bar with the given task name
     * and maximum count.
     */
    public static ProgressBarBuilder progressbarBuilder(String name, Long max) {
        return new ProgressBarBuilder()
            .setConsumer(new ConsoleProgressBarConsumer(System.out))
            .setInitialMax(max)
            .setStyle(ProgressBarStyle.ASCII)
            .setTaskName(name);
    }
}

View File

@@ -0,0 +1,64 @@
package com.lanyuanxiaoyao.service.command.utils;
import cn.hutool.core.util.StrUtil;
import java.util.Arrays;
import java.util.stream.Stream;
/**
* 过滤使用
*
* @author ZhangJiacheng
* @date 2022-04-29
*/
/**
 * Value filtering helpers for list-style shell commands.
 */
public class SearchUtils {

    /** Match strategies; non-regex types take a comma-separated pattern list. */
    public enum Type {
        /**
         * 什么都不操作 (no filtering; everything matches)
         */
        NONE,
        /**
         * 正则表达式 (regular expression)
         */
        REGEX,
        /**
         * 包含 (contains)
         */
        CONTAINS,
        /**
         * 不包含 (does not contain)
         */
        UN_CONTAINS,
        /**
         * 等于 (equals)
         */
        EQUALS,
        /**
         * 不等于 (not equals)
         */
        UN_EQUALS
    }

    private SearchUtils() {
        // Static utility holder; not instantiable.
    }

    /**
     * Tests {@code value} against {@code patternInput} using the given strategy.
     * For the non-regex types, {@code patternInput} is a comma-separated list of
     * patterns and the result is an any-match (or none-match for the UN_ types).
     *
     * @param type         match strategy
     * @param patternInput the pattern(s); may be null only for {@link Type#NONE}
     * @param value        the value under test
     * @return whether the value matches
     * @throws IllegalArgumentException for a blank regex or an unknown type
     */
    public static Boolean search(Type type, String patternInput, String value) {
        if (type == Type.NONE) {
            // Short-circuit before trim() so NONE tolerates a null pattern
            // (previously this threw NullPointerException).
            return true;
        }
        String pattern = patternInput.trim();
        if (type == Type.REGEX) {
            // pattern is already trimmed, so blank == empty here.
            if (pattern.isEmpty()) {
                throw new IllegalArgumentException("Regex pattern cannot be blank");
            }
            return value.matches(pattern);
        }
        Stream<String> patterns = Arrays.stream(pattern.split(","));
        switch (type) {
            case CONTAINS:
                return patterns.anyMatch(value::contains);
            case UN_CONTAINS:
                return patterns.noneMatch(value::contains);
            case EQUALS:
                return patterns.anyMatch(value::equals);
            case UN_EQUALS:
                return patterns.noneMatch(value::equals);
            default:
                throw new IllegalArgumentException("Unsupported search type: " + type);
        }
    }
}

View File

@@ -0,0 +1,35 @@
package com.lanyuanxiaoyao.service.command.utils;
import org.springframework.shell.table.BorderSpecification;
import org.springframework.shell.table.BorderStyle;
import org.springframework.shell.table.TableBuilder;
import org.springframework.shell.table.TableModelBuilder;
/**
* Shell 表显示工具
*
* @author ZhangJiacheng
* @date 2022-05-23
*/
public class ShellTableUtils {

    // NOTE(review): this marker renders as an empty string; it may originally
    // have been a check-mark glyph lost in encoding — confirm against VCS.
    public static final String CHECK_TRUE = "";
    // Maximum render width in characters.
    public static final int TABLE_WIDTH = 500;
    public static final BorderStyle INNER_STYLE = BorderStyle.air;
    public static final int INNER_SPECIFICATION = BorderSpecification.INNER_VERTICAL;

    private ShellTableUtils() {
        // Static utility holder; not instantiable.
    }

    /** Renders a model as a bordered table with header and vertical separators. */
    public static String renderTable(TableModelBuilder<?> builder) {
        return new TableBuilder(builder.build())
            .addHeaderAndVerticalsBorders(BorderStyle.oldschool)
            .build()
            .render(TABLE_WIDTH);
    }

    /** Renders a nested (inner) table using only light vertical separators. */
    public static String renderInnerTable(TableModelBuilder<?> builder) {
        return new TableBuilder(builder.build())
            .paintBorder(INNER_STYLE, INNER_SPECIFICATION)
            .fromTopLeft()
            .toBottomRight()
            .build()
            .render(TABLE_WIDTH);
    }
}

View File

@@ -0,0 +1,132 @@
package com.lanyuanxiaoyao.service.command.utils;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.shell.table.TableModelBuilder;
/**
* 表相关工具
*
* @author ZhangJiacheng
* @date 2022-05-23
*/
public class TableUtils {

    private TableUtils() {
        // Static utility holder; not instantiable.
    }

    /**
     * Renders a full, human-readable description of one table's metadata as a
     * shell table: identity rows, the field list, the Hudi configuration, and
     * the sync/compaction YARN settings (the latter three as nested tables).
     */
    public static String makeTableMeta(TableMeta meta) {
        TableModelBuilder<Object> tableMetaModelBuilder = new TableModelBuilder<>();
        // Identity / source rows.
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Alias");
        tableMetaModelBuilder.addValue(meta.getAlias());
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Source database");
        tableMetaModelBuilder.addValue(meta.getSource());
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Source Type");
        tableMetaModelBuilder.addValue(meta.getSourceType());
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Source schema");
        tableMetaModelBuilder.addValue(meta.getSchema());
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Source table");
        tableMetaModelBuilder.addValue(meta.getTable());
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Table type");
        tableMetaModelBuilder.addValue(meta.getType());
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Primary keys");
        tableMetaModelBuilder.addValue(meta.getPrimaryKeys().stream().map(TableMeta.FieldMeta::getName).collect(Collectors.joining(",")));
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Partition keys");
        tableMetaModelBuilder.addValue(meta.getPartitionKeys().stream().map(TableMeta.FieldMeta::getName).collect(Collectors.joining(",")));
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Hudi partition field");
        tableMetaModelBuilder.addValue(meta.getPartitionField());
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Tags");
        tableMetaModelBuilder.addValue(String.join(", ", meta.getTags()));
        // Nested table: one row per field, ordered by sequence number.
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Fields");
        TableModelBuilder<Object> fieldsModelBuilder = new TableModelBuilder<>();
        fieldsModelBuilder.addRow();
        fieldsModelBuilder.addValue("#");
        fieldsModelBuilder.addValue("Name");
        fieldsModelBuilder.addValue("Type");
        fieldsModelBuilder.addValue("Length");
        fieldsModelBuilder.addValue("Primary key");
        fieldsModelBuilder.addValue("Partition key");
        List<TableMeta.FieldMeta> fields = meta.getFields()
            .stream()
            .sorted(Comparator.comparingInt(TableMeta.FieldMeta::getSequence))
            .collect(Collectors.toList());
        for (TableMeta.FieldMeta field : fields) {
            fieldsModelBuilder.addRow();
            fieldsModelBuilder.addValue(field.getSequence());
            fieldsModelBuilder.addValue(field.getName());
            fieldsModelBuilder.addValue(field.getType());
            fieldsModelBuilder.addValue(field.getLength());
            fieldsModelBuilder.addValue(field.getPrimaryKey() ? ShellTableUtils.CHECK_TRUE : "");
            fieldsModelBuilder.addValue(field.getPartitionKey() ? ShellTableUtils.CHECK_TRUE : "");
        }
        tableMetaModelBuilder.addValue(ShellTableUtils.renderInnerTable(fieldsModelBuilder));
        // Nested table: Hudi write/compaction configuration.
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Hudi config");
        TableModelBuilder<Object> hudiModelBuilder = new TableModelBuilder<>();
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Write parallelism");
        hudiModelBuilder.addValue(meta.getHudi().getWriteTasks());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Write task memory");
        hudiModelBuilder.addValue(meta.getHudi().getWriteTaskMaxMemory());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Write batch size");
        hudiModelBuilder.addValue(meta.getHudi().getWriteBatchSize());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Write rate limit");
        hudiModelBuilder.addValue(meta.getHudi().getWriteRateLimit());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Bucket number");
        hudiModelBuilder.addValue(meta.getHudi().getBucketIndexNumber());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Compaction parallelism");
        hudiModelBuilder.addValue(meta.getHudi().getCompactionTasks());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Compaction strategy");
        hudiModelBuilder.addValue(meta.getHudi().getCompactionStrategy());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Compaction delta commits");
        hudiModelBuilder.addValue(meta.getHudi().getCompactionDeltaCommits());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Compaction delta seconds");
        hudiModelBuilder.addValue(meta.getHudi().getCompactionDeltaSeconds());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Keep file version number");
        hudiModelBuilder.addValue(meta.getHudi().getKeepFileVersion());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Keep commit version number");
        hudiModelBuilder.addValue(meta.getHudi().getKeepCommitVersion());
        hudiModelBuilder.addRow();
        hudiModelBuilder.addValue("Target HDFS path");
        hudiModelBuilder.addValue(meta.getHudi().getTargetHdfsPath());
        tableMetaModelBuilder.addValue(ShellTableUtils.renderInnerTable(hudiModelBuilder));
        // Nested tables: YARN settings, shared shape for sync and compaction.
        Function<TableMeta.YarnMeta, TableModelBuilder<Object>> generateYarnJob = yarnMeta -> {
            TableModelBuilder<Object> builder = new TableModelBuilder<>();
            builder.addRow();
            builder.addValue("Job manager memory");
            builder.addValue(yarnMeta.getJobManagerMemory());
            builder.addRow();
            builder.addValue("Task manager memory");
            builder.addValue(yarnMeta.getTaskManagerMemory());
            return builder;
        };
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Sync yarn");
        tableMetaModelBuilder.addValue(ShellTableUtils.renderInnerTable(generateYarnJob.apply(meta.getSyncYarn())));
        tableMetaModelBuilder.addRow();
        tableMetaModelBuilder.addValue("Compaction yarn");
        tableMetaModelBuilder.addValue(ShellTableUtils.renderInnerTable(generateYarnJob.apply(meta.getCompactionYarn())));
        return ShellTableUtils.renderTable(tableMetaModelBuilder);
    }
}

View File

@@ -0,0 +1,50 @@
hudi-service:
gateway-urls:
- http://b12s10.hdp.dc:35690
- http://b12s11.hdp.dc:35690
- http://b12s12.hdp.dc:35690
- http://b12s13.hdp.dc:35690
- http://b12s14.hdp.dc:35690
- http://b12s15.hdp.dc:35690
- http://b12s16.hdp.dc:35690
- http://b12s17.hdp.dc:35690
- http://b12s18.hdp.dc:35690
- http://b12s19.hdp.dc:35690
- http://b12s20.hdp.dc:35690
- http://b12s21.hdp.dc:35690
- http://b12s22.hdp.dc:35690
- http://b12s23.hdp.dc:35690
- http://b12s24.hdp.dc:35690
- http://b12s25.hdp.dc:35690
- http://b12s26.hdp.dc:35690
- http://b12s27.hdp.dc:35690
- http://b12s28.hdp.dc:35690
- http://b12s29.hdp.dc:35690
- http://b12s30.hdp.dc:35690
- http://b12s31.hdp.dc:35690
- http://b12s32.hdp.dc:35690
- http://b12s33.hdp.dc:35690
- http://b12s34.hdp.dc:35690
- http://b12s35.hdp.dc:35690
- http://b12s36.hdp.dc:35690
- http://b12s37.hdp.dc:35690
- http://b12s38.hdp.dc:35690
- http://b12s39.hdp.dc:35690
- http://b12s4.hdp.dc:35690
- http://b12s40.hdp.dc:35690
- http://b12s41.hdp.dc:35690
- http://b12s42.hdp.dc:35690
- http://b12s43.hdp.dc:35690
- http://b12s44.hdp.dc:35690
- http://b12s45.hdp.dc:35690
- http://b12s46.hdp.dc:35690
- http://b12s47.hdp.dc:35690
- http://b12s48.hdp.dc:35690
- http://b12s49.hdp.dc:35690
- http://b12s5.hdp.dc:35690
- http://b12s50.hdp.dc:35690
- http://b12s51.hdp.dc:35690
- http://b12s6.hdp.dc:35690
- http://b12s7.hdp.dc:35690
- http://b12s8.hdp.dc:35690
- http://b12s9.hdp.dc:35690

View File

@@ -0,0 +1,17 @@
hudi-service:
gateway-urls:
- http://b5s119.hdp.dc:35690
- http://b5s120.hdp.dc:35690
- http://b5s121.hdp.dc:35690
- http://b5s122.hdp.dc:35690
- http://b5s123.hdp.dc:35690
- http://b5s124.hdp.dc:35690
- http://b5s125.hdp.dc:35690
- http://b5s126.hdp.dc:35690
- http://b5s127.hdp.dc:35690
- http://b5s128.hdp.dc:35690
- http://b5s129.hdp.dc:35690
- http://b5s130.hdp.dc:35690
- http://b5s131.hdp.dc:35690
- http://b5s132.hdp.dc:35690
- http://b5s133.hdp.dc:35690

View File

@@ -0,0 +1,18 @@
spring:
application:
name: service-command
profiles:
include: common,discovery
shell:
interactive:
enabled: true
command:
script:
enabled: false
history:
enabled: false
forest:
backend: httpclient
timeout: 120000
log-enabled: false
interceptors: com.lanyuanxiaoyao.service.command.configuration.SpringCloudDiscoveryInterceptor