diff --git a/bin/build-command.sh b/bin/build-command.sh new file mode 100755 index 0000000..5969316 --- /dev/null +++ b/bin/build-command.sh @@ -0,0 +1,4 @@ +#!/bin/bash +mvn -pl service-dependencies,service-configuration,service-forest clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml +mvn -pl service-command clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml +ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-command/target/service-command-1.0.0-SNAPSHOT.jar \ No newline at end of file diff --git a/pom.xml b/pom.xml index 48e000f..7e8e5b3 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,7 @@ service-api service-scheduler service-launcher + service-command diff --git a/service-cli/service-cli-runner/src/main/resources/template/command/cli.ftl b/service-cli/service-cli-runner/src/main/resources/template/command/cli.ftl index bf72b9c..5c9c40c 100644 --- a/service-cli/service-cli-runner/src/main/resources/template/command/cli.ftl +++ b/service-cli/service-cli-runner/src/main/resources/template/command/cli.ftl @@ -1,10 +1,4 @@ #!/bin/bash - -datetime=`date +%Y%m%d%H%M%S` -log_path='${runtime.logPath}' -loki_url='${runtime.loki.servicePushUrl}' - mkdir -p ${runtime.jarPath} - export JASYPT_ENCRYPTOR_PASSWORD='r#(R,P\"Dp^A47>WSn:Wn].gs/+\"v:q_Q*An~zF*g-@j@jtSTv5H/,S-3:R?r9R}.' -${runtime.jdkPath} <#noparse>-Ddatetime=${datetime} -Dhostname=${HOSTNAME} -Dlogging.parent=${log_path} -Dloki.url=${loki_url} -Dspring.profiles.include=default,b12 -jar ${runtime.jarPath}/service-command.jar<#if directly> $@ +${runtime.jdkPath} <#noparse>-Ddatetime=$(date +%Y%m%d%H%M%S) -Dhostname=$(ssh $host 'hostname') -Dlogging.parent=${runtime.logPath} -Dloki.url=${runtime.loki.servicePushUrl} -Dspring.cloud.zookeeper.connect-string=${runtime.zkUrl} -Dyarn-cluster.sync-clusters=${runtime.yarn.syncClusters} -Dyarn-cluster.compaction-clusters=${runtime.yarn.compactionClusters} -Dspring.profiles.include=default,b12 -jar ${runtime.jarPath}/service-command.jar<#if directly> $@ diff --git a/service-cli/service-cli-runner/src/main/resources/template/command/update.ftl b/service-cli/service-cli-runner/src/main/resources/template/command/update.ftl index 40ab693..62abfb9 100644 --- a/service-cli/service-cli-runner/src/main/resources/template/command/update.ftl +++ b/service-cli/service-cli-runner/src/main/resources/template/command/update.ftl @@ -1,3 +1,3 @@ #!/bin/bash -curl ftp://yyy:QeY\!68\)4nH1@132.121.122.15:2222/command-1.0.0-SNAPSHOT.jar -o ${runtime.jarPath}/service-command.jar \ No newline at end of file +curl ftp://yyy:QeY\!68\)4nH1@132.121.122.15:2222/service-command-1.0.0-SNAPSHOT.jar -o ${runtime.jarPath}/service-command.jar \ No newline at end of file diff --git a/service-command/pom.xml b/service-command/pom.xml new file mode 100644 index 0000000..e183cf4 --- /dev/null +++ b/service-command/pom.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + + com.lanyuanxiaoyao + hudi-service + 1.0.0-SNAPSHOT + + + service-command + + + + com.lanyuanxiaoyao + service-forest + 1.0.0-SNAPSHOT + + + org.springframework.cloud + spring-cloud-starter-loadbalancer + + + org.springframework.boot + spring-boot-starter-web + + + + + org.springframework.shell + spring-shell-starter + 2.1.0-M3 + + + me.tongfei + progressbar + 0.9.3 + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/CommandApplication.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/CommandApplication.java new file mode 100644 index 0000000..a03f045 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/CommandApplication.java @@ -0,0 +1,43 @@ +package com.lanyuanxiaoyao.service.command; + +import com.lanyuanxiaoyao.service.configuration.SecurityConfig; +import com.lanyuanxiaoyao.service.forest.configuration.SpringCloudDiscoveryInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.FilterType; + +/** + * 命令行工具入口 + * + * @author ZhangJiacheng + * @date 2022-03-24 + */ +@SpringBootApplication +@EnableConfigurationProperties +@ComponentScan( + basePackages = {"com.lanyuanxiaoyao.service"}, + excludeFilters = { + @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = { + SpringCloudDiscoveryInterceptor.class, + SecurityConfig.class + }), + } +) +public class CommandApplication implements ApplicationRunner { + private static final Logger logger = LoggerFactory.getLogger(CommandApplication.class); + + public static void main(String[] args) { + SpringApplication.run(CommandApplication.class, args); + } + + @Override + public void run(ApplicationArguments args) throws Exception { + // Test + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/AbstractUtilShellComponent.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/AbstractUtilShellComponent.java new file mode 100644 index 0000000..c26e291 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/AbstractUtilShellComponent.java @@ -0,0 +1,28 @@ +package com.lanyuanxiaoyao.service.command.commands; + +import org.springframework.shell.component.ConfirmationInput; +import org.springframework.shell.standard.AbstractShellComponent; + +public abstract class AbstractUtilShellComponent extends AbstractShellComponent { + protected Boolean doubleCheck(String message, Boolean ignore) { + return ignore || doubleCheck(message); + } + + protected Boolean doubleCheck(String message) { + ConfirmationInput confirmationInput = new ConfirmationInput(getTerminal(), message, false); + confirmationInput.setResourceLoader(getResourceLoader()); + confirmationInput.setTemplateExecutor(getTemplateExecutor()); + ConfirmationInput.ConfirmationInputContext context = confirmationInput.run(ConfirmationInput.ConfirmationInputContext.empty()); + return context.getResultValue(); + } + + protected void doubleCheck(String message, T throwable) throws T { + if (!doubleCheck(message)) { + throw throwable; + } + } + + protected void doubleCheckQuiet(String message, T throwable) { + doubleCheck(message, new RuntimeException(throwable)); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/CheckCommand.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/CheckCommand.java new file mode 100644 index 0000000..9ddf9ee --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/CheckCommand.java @@ -0,0 +1,61 @@ +package com.lanyuanxiaoyao.service.command.commands; + +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import com.lanyuanxiaoyao.service.forest.service.PulsarService; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.MutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.shell.standard.ShellComponent; +import org.springframework.shell.standard.ShellMethod; + +/** + * @author ZhangJiacheng + * @date 2024-01-30 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@ShellComponent("检查相关操作") +public class CheckCommand { + private static final Logger logger = LoggerFactory.getLogger(CheckCommand.class); + + private final InfoService infoService; + private final PulsarService pulsarService; + + public CheckCommand(InfoService infoService, PulsarService pulsarService) { + this.infoService = infoService; + this.pulsarService = pulsarService; + } + + @ShellMethod("检查表topic是否对应") + public void checkPulsarTopicExists() { + MutableList results = Lists.mutable.empty().asSynchronized(); + infoService.tableMetaList() + .asParallel(ExecutorProvider.EXECUTORS, 10) + .forEach(meta -> { + logger.info("{} pulsar check...", meta.getAlias()); + if (StrUtil.isBlank(meta.getPulsarAddress())) { + results.add(StrUtil.format("{} pulsar url is blank", meta.getAlias())); + } + if (StrUtil.isBlank(meta.getTopic())) { + results.add(StrUtil.format("{} pulsar topic is blank", meta.getAlias())); + } + if (!pulsarService.existsTopic(meta.getPulsarAddress(), meta.getTopic())) { + results.add(StrUtil.format("{} pulsar url {} not contain topic {}", meta.getAlias(), meta.getPulsarAddress(), meta.getTopic())); + } + }); + results.toSortedList().forEach(logger::info); + } + + @ShellMethod("杂项工具") + public String getAllFieldType() { + return infoService.tableMetaList() + .flatCollect(TableMeta::getFields) + .collect(TableMeta.FieldMeta::getType) + .distinct() + .toSortedList() + .makeString("\n"); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/JobCommand.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/JobCommand.java new file mode 100644 index 0000000..853c02c --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/JobCommand.java @@ -0,0 +1,218 @@ +package com.lanyuanxiaoyao.service.command.commands; + +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.lanyuanxiaoyao.service.command.commands.jobs.*; +import com.lanyuanxiaoyao.service.command.provider.SearchTypeValueProvider; +import com.lanyuanxiaoyao.service.command.utils.SearchUtils; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.MethodParameter; +import org.springframework.shell.CompletionContext; +import org.springframework.shell.CompletionProposal; +import org.springframework.shell.standard.ShellComponent; +import org.springframework.shell.standard.ShellMethod; +import org.springframework.shell.standard.ShellOption; +import org.springframework.shell.standard.ValueProviderSupport; +import org.springframework.shell.table.BorderStyle; +import org.springframework.shell.table.TableBuilder; +import org.springframework.shell.table.TableModelBuilder; + +/** + * 任务操作 + * + * @author ZhangJiacheng + * @date 2022-03-24 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@ShellComponent("Config job") +public class JobCommand { + private static final Logger logger = LoggerFactory.getLogger(JobCommand.class); + + private final InfoService infoService; + + public JobCommand(InfoService infoService) { + this.infoService = infoService; + } + + @ShellMethod("显示 Flink job 配置信息") + public String jobAll( + @ShellOption(help = "搜索模式", + defaultValue = "CONTAINS", + valueProvider = SearchTypeValueProvider.class) SearchUtils.Type type, + @ShellOption(help = "搜索值 (除了 REGEX 模式, 其余可使用英文逗号分隔多个值)", + defaultValue = "") String pattern + ) { + ImmutableList jobAndMetas = infoService.allFlinkJobId() + .collect(id -> { + FlinkJob flinkJob = infoService.flinkJobDetail(id); + ImmutableList metas = infoService.tableMetaList(id) + .select(meta -> SearchUtils.search(type, pattern, meta.getSchema() + meta.getAlias())); + return new JobAndMetas(flinkJob, metas); + }) + .reject(job -> job.getMetas().isEmpty()); + TableModelBuilder builder = new TableModelBuilder<>(); + builder.addRow(); + builder.addValue("#"); + builder.addValue("Id"); + builder.addValue("Name/Table"); + builder.addValue("Bucket"); + builder.addValue("Compaction seconds"); + builder.addValue("Hdfs"); + for (int index = 0; index < jobAndMetas.size(); index++) { + JobAndMetas job = jobAndMetas.get(index); + builder.addRow(); + builder.addValue(StrUtil.format("{}-0", index + 1)); + builder.addValue(job.getJob().getId()); + builder.addValue(job.getJob().getName()); + builder.addValue(""); + builder.addValue(""); + builder.addValue(""); + if (ObjectUtil.isNotNull(job.getMetas()) && ObjectUtil.isNotEmpty(job.getMetas())) { + ImmutableList tableMetas = job.getMetas(); + for (int indexSub = 0; indexSub < tableMetas.size(); indexSub++) { + TableMeta meta = tableMetas.get(indexSub); + builder.addRow(); + builder.addValue(StrUtil.format("{}-{}", index + 1, indexSub + 1)); + builder.addValue(""); + builder.addValue(meta.getTable()); + builder.addValue(meta.getHudi().getBucketIndexNumber()); + builder.addValue(meta.getHudi().getCompactionDeltaSeconds()); + builder.addValue(meta.getHudi().getTargetHdfsPath()); + } + } + } + return new TableBuilder(builder.build()) + .addHeaderAndVerticalsBorders(BorderStyle.oldschool) + .build() + .render(200); + } + + @ShellMethod("批量新增 Flink job 配置") + public void jobAddBatch(@ShellOption(help = "配置文件路径") String source) throws IOException { + Path sourcePath = Paths.get(source); + if (Files.notExists(sourcePath)) { + throw new RuntimeException(StrUtil.format("{} not found", source)); + } + Files.lines(sourcePath) + .filter(StrUtil::isNotBlank) + .map(line -> line.split("\\s+")) + .forEach(fields -> { + TableType type = TableType.valueOf(fields[0]); + switch (type) { + case SMALL: + jobAdd(TableType.SMALL, fields[1], TableMeta.SourceType.valueOf(fields[2]), fields[3], fields[4], fields[5], fields[6]); + break; + case BIG: + jobAdd(TableType.BIG, fields[1], TableMeta.SourceType.valueOf(fields[2]), fields[3], fields[4], fields[5], fields[6]); + break; + case ACCT_SMALL: + jobAdd(TableType.ACCT_SMALL, fields[1], TableMeta.SourceType.valueOf(fields[2]), fields[3], fields[4], fields[5], fields[6]); + break; + case ACCT_BIG: + jobAdd(TableType.ACCT_BIG, fields[1], TableMeta.SourceType.valueOf(fields[2]), fields[3], fields[4], fields[5], fields[6]); + break; + case ACCT_ITEM: + jobAdd(TableType.ACCT_ITEM, fields[1], TableMeta.SourceType.valueOf(fields[2]), fields[3], fields[4], fields[5], fields[6]); + break; + default: + throw new RuntimeException("不支持的表类型: " + type); + } + }); + } + + /** + * udal__crm_order crm_order order_balance_his SMALL hdfs://b1/apps/iap/hive pulsar://132.122.113.171:16660,132.122.113.172:16660,132.122.113.173:16660,132.122.113.174:16660,132.122.113.175:16660,132.122.113.176:16660 persistent://odcp/crm_order/ord_prod_inst_acc_num dws_crm_order dws_order_balance_his + */ + @ShellMethod("新增 Flink job 配置") + public void jobAdd( + @ShellOption(help = "表类型 (BIG 为大表, SMALL 为小表, ACCT 为 ACCT 表)", + valueProvider = TableTypeValueProvider.class) TableType type, + @ShellOption(help = "数据源") String database, + @ShellOption(help = "数据源类型", + valueProvider = DatabaseTypeValueProvider.class) TableMeta.SourceType databaseType, + @ShellOption(help = "Scheme") String schema, + @ShellOption(help = "表名") String table, + @ShellOption(help = "HDFS 路径前缀") String hdfs, + @ShellOption(help = "Pulsar Address") String pulsarAddress + ) { + if (infoService.existsTableByHdfs(StrUtil.format("{}/dws_{}/dws_{}", hdfs, schema, table))) { + throw new RuntimeException(StrUtil.format("tb_app_collect_table_info 已存在指定的 HDFS 路径 {}", StrUtil.format("{}/dws_{}/dws_{}", hdfs, schema, table))); + } + switch (type) { + case SMALL: + new SmallTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute(); + break; + case BIG: + new BigTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute(); + break; + case ACCT_SMALL: + new AcctSmallTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute(); + break; + case ACCT_BIG: + new AcctBigTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute(); + break; + case ACCT_ITEM: + new AcctItemTableAdder(infoService, database, databaseType.name(), schema, table, pulsarAddress, hdfs).execute(); + break; + default: + throw new RuntimeException("不支持的表类型: " + type); + } + } + + public enum TableType { + /** + * 小表 + */ + SMALL, + /** + * 大表 + */ + BIG, + /** + * ACCT 小表 + */ + ACCT_SMALL, + /** + * ACCT 大表 + */ + ACCT_BIG, + /** + * ACCT_ITEM + */ + ACCT_ITEM, + } + + public static final class TableTypeValueProvider extends ValueProviderSupport { + @Override + public List complete(MethodParameter parameter, CompletionContext context, String[] hints) { + return ListUtil.of( + new CompletionProposal("SMALL"), + new CompletionProposal("BIG"), + new CompletionProposal("ACCT"), + new CompletionProposal("ACCT_ITEM") + ); + } + } + + public static final class DatabaseTypeValueProvider extends ValueProviderSupport { + @Override + public List complete(MethodParameter parameter, CompletionContext completionContext, String[] hints) { + return ListUtil.of( + new CompletionProposal("UDAL"), + new CompletionProposal("TELEPG") + ); + } + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/YarnCommand.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/YarnCommand.java new file mode 100644 index 0000000..7eaeb31 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/YarnCommand.java @@ -0,0 +1,304 @@ +package com.lanyuanxiaoyao.service.command.commands; + +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.Constants; +import com.eshore.odcp.hudi.connector.entity.RunMeta; +import com.eshore.odcp.hudi.connector.utils.NameHelper; +import com.lanyuanxiaoyao.service.command.entity.TableMetaJob; +import com.lanyuanxiaoyao.service.command.utils.CommandLineUtils; +import com.lanyuanxiaoyao.service.command.utils.ShellTableUtils; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnClusters; +import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import com.lanyuanxiaoyao.service.forest.service.YarnService; +import com.lanyuanxiaoyao.service.forest.service.ZookeeperService; +import com.lanyuanxiaoyao.service.forest.service.launcher.LauncherService; +import com.lanyuanxiaoyao.service.forest.service.launcher.LaunchersService; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import me.tongfei.progressbar.ProgressBar; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.shell.standard.ShellComponent; +import org.springframework.shell.standard.ShellMethod; +import org.springframework.shell.standard.ShellOption; +import org.springframework.shell.table.TableModelBuilder; + +/** + * 表同步任务 description + * + * @author ZhangJiacheng + * @date 2022-04-22 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@ShellComponent("同步任务相关操作") +public class YarnCommand extends AbstractUtilShellComponent { + private static final Logger logger = LoggerFactory.getLogger(YarnCommand.class); + private final static String KILL_CONFIRMATION_MESSAGE = "操作将会停止上表所显示的表同步任务, 请再次确认操作"; + private final static String RUN_CONFIRMATION_MESSAGE = "操作将会启动上表所显示的表同步任务, 请再次确认操作"; + private final InfoService infoService; + private final LauncherService launcherService; + private final ZookeeperService zookeeperService; + private final YarnService yarnService; + private final YarnClusters yarnClusters; + + public YarnCommand(InfoService infoService, LaunchersService launchersService, ZookeeperService zookeeperService, YarnService yarnService, YarnClusters yarnClusters) { + this.infoService = infoService; + this.zookeeperService = zookeeperService; + this.yarnService = yarnService; + this.yarnClusters = yarnClusters; + logger.info("Default sync cluster: {}", yarnClusters.getDefaultSyncCluster()); + this.launcherService = launchersService.getService(yarnClusters.getDefaultSyncCluster()); + } + + private String generateFlinkJobsTable(ImmutableList jobs, Boolean showFinishTime) { + TableModelBuilder builder = new TableModelBuilder<>(); + builder.addRow(); + builder.addValue("#"); + builder.addValue("Id"); + builder.addValue("Name"); + builder.addValue("State"); + builder.addValue("Application Id"); + builder.addValue("Launch time"); + if (showFinishTime) { + builder.addValue("Finish time"); + } + builder.addValue("Tracking url"); + for (int index = 0; index < jobs.size(); index++) { + TableMetaJob job = jobs.get(index); + builder.addRow(); + builder.addValue(index + 1); + builder.addValue(job.getFlinkJobId()); + builder.addValue(job.getAlias()); + if (job.isRunning()) { + builder.addValue(job.getState()); + builder.addValue(job.getId()); + LocalDateTime launchTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(job.getStartedTime()), ZoneId.systemDefault()); + builder.addValue(launchTime.format(Constants.FORMATTER)); + if (showFinishTime) { + if (ObjectUtil.isNotNull(job.getFinishedTime()) || job.getFinishedTime() != 0) { + LocalDateTime finishTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(job.getFinishedTime()), ZoneId.systemDefault()); + builder.addValue(finishTime.format(Constants.FORMATTER)); + } else { + builder.addValue(""); + } + } + builder.addValue(job.getTrackingUrl() + " "); + } else { + builder.addValue(""); + builder.addValue(""); + if (showFinishTime) { + builder.addValue(""); + } + builder.addValue(""); + builder.addValue(""); + } + } + return ShellTableUtils.renderTable(builder); + } + + @ShellMethod("启动表同步任务") + public String yarnRun( + @ShellOption(help = "Flink job id") Long flinkJobId, + @ShellOption( + help = "Ignore double check", + defaultValue = "false" + ) Boolean ignoreCheck + ) { + return yarnRunBatch(new Long[]{flinkJobId}, ignoreCheck); + } + + @ShellMethod("停止所有正在运行的表同步任务") + public String yarnKillRunning( + @ShellOption( + help = "Ignore double check", + defaultValue = "false" + ) Boolean ignoreCheck + ) { + return yarnKillBatch(getAllJobId(true).toArray(new Long[]{}), ignoreCheck); + } + + @ShellMethod("启动所有未运行的表同步任务") + public String yarnRunUnRunning( + @ShellOption( + help = "Ignore double check", + defaultValue = "false" + ) Boolean ignoreCheck + ) { + return yarnRunBatch(getAllJobId(false).toArray(new Long[]{}), ignoreCheck); + } + + private ImmutableList getAllJobId(Boolean isRunning) { + return infoService.allFlinkJobId() + .select(id -> { + String lockPath = NameHelper.syncRunningLockPath(id); + if (isRunning) { + return zookeeperService.existsPath(lockPath); + } else { + return !zookeeperService.existsPath(lockPath); + } + }); + } + + private ImmutableList getRunningJobs(Long[] flinkJobIds) { + ImmutableList runningPaths = zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH).collect(ZookeeperNode::getPath); + return Lists.immutable.of(flinkJobIds) + .distinct() + .collect(infoService::tableMetaList) + .flatCollect(meta -> meta) + .asParallel(ExecutorProvider.EXECUTORS, 5) + .collect(meta -> { + if (runningPaths.contains(NameHelper.syncRunningLockPath(meta.getJob().getId()))) { + RunMeta runMeta = zookeeperService.getSyncRunMeta(meta.getJob().getId()); + YarnApplication application = yarnService.jobDetail(yarnClusters.getDefaultSyncCluster(), runMeta.getApplicationId()); + return new TableMetaJob(meta, application); + } else { + return new TableMetaJob(meta); + } + }) + .toList() + .toImmutable(); + } + + @ShellMethod("批量停止表同步任务") + public String yarnKillBatch( + @ShellOption(help = "Flink job id") Long[] flinkJobIds, + @ShellOption( + help = "Ignore double check", + defaultValue = "false" + ) Boolean ignoreCheck + ) { + ImmutableList targetJobs = getRunningJobs(flinkJobIds); + return CommandLineUtils.generateResultLines( + () -> { + if (ObjectUtil.isEmpty(targetJobs)) { + return "没有找到包含指定 flink job id 对应的任务"; + } else { + System.out.println(generateFlinkJobsTable(targetJobs, false)); + if (doubleCheck(KILL_CONFIRMATION_MESSAGE, ignoreCheck)) { + ImmutableList ids = targetJobs.select(TableMetaJob::isRunning) + .collect(TableMetaJob::getFlinkJobId) + .distinct(); + try (ProgressBar pb = CommandLineUtils.progressbarBuilder("Kill jobs", ids.size()).build()) { + ids.forEach(id -> { + launcherService.syncStop(id); + pb.setExtraMessage(id.toString()); + pb.step(); + }); + } + return Constants.OPERATION_DONE; + } else { + return Constants.OPERATION_CANCEL; + } + } + } + ); + } + + + @ShellMethod("停止表同步任务") + public String yarnKill( + @ShellOption(help = "Flink job id") Long flinkJobId, + @ShellOption( + help = "Ignore double check", + defaultValue = "false" + ) Boolean ignoreCheck + ) { + return yarnKillBatch(new Long[]{flinkJobId}, ignoreCheck); + } + + @ShellMethod("批量启动表同步任务") + public String yarnRunBatch( + @ShellOption(help = "Flink job id") Long[] flinkJobIds, + @ShellOption( + help = "Ignore double check", + defaultValue = "false" + ) Boolean ignoreCheck + ) { + ImmutableList targetJobs = getRunningJobs(flinkJobIds); + return CommandLineUtils.generateResultLines( + () -> { + if (ObjectUtil.isEmpty(targetJobs)) { + return "没有找到包含指定 flink job id 对应的任务"; + } else { + System.out.println(generateFlinkJobsTable(targetJobs, false)); + if (doubleCheck(RUN_CONFIRMATION_MESSAGE, ignoreCheck)) { + ImmutableList ids = targetJobs.select(TableMetaJob::isNotRunning) + .collect(TableMetaJob::getFlinkJobId) + .distinct(); + try (ProgressBar pb = CommandLineUtils.progressbarBuilder("Run jobs", ids.size()).build()) { + ids.forEach(id -> { + ThreadUtil.safeSleep(1000); + launcherService.syncStart(id); + pb.setExtraMessage(id.toString()); + pb.step(); + }); + } + return Constants.OPERATION_DONE; + } else { + return Constants.OPERATION_CANCEL; + } + } + } + ); + } + + @ShellMethod("查询表同步任务状态") + public String yarnAll( + @ShellOption(help = "搜索值 (包含查询)", + defaultValue = "") String pattern, + @ShellOption(help = "展示结果列表的 Flink job id", + defaultValue = "false") Boolean showFlinkJobId, + @ShellOption(help = "展示运行的任务", + defaultValue = "false") Boolean running, + @ShellOption(help = "展示非运行的任务", + defaultValue = "false") Boolean unRunning + ) { + if (running && unRunning) { + throw new RuntimeException("--running 和 --un-running 参数不能同时使用"); + } + ImmutableMap applicationMap = yarnService.jobListRunning(yarnClusters.getDefaultSyncCluster()) + .toMap(YarnApplication::getName, app -> app) + .toImmutable(); + ImmutableList locks = zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH).collect(ZookeeperNode::getPath); + ImmutableList jobs = infoService.tableMetaList() + .select(meta -> StrUtil.contains(meta.getAlias(), pattern)) + .select(meta -> { + String lockPath = NameHelper.syncRunningLockPath(meta.getJob().getId(), meta.getAlias()); + if (running && locks.contains(lockPath)) { + return true; + } else if (unRunning && !locks.contains(lockPath)) { + return true; + } else { + return !running && !unRunning; + } + }) + .collect(meta -> { + String jobName = NameHelper.syncJobName(meta.getJob().getId(), meta.getJob().getName()); + return new TableMetaJob(meta, applicationMap.get(jobName)); + }); + return CommandLineUtils.generateResultLines( + () -> generateFlinkJobsTable(jobs, false), + () -> showFlinkJobId ? CommandLineUtils.generateResultLines( + () -> "Flink ids", + () -> jobs + .collect(TableMetaJob::getFlinkJobId) + .collect(String::valueOf) + .distinct() + .makeString(",") + ) : "" + ); + } + + @ShellMethod("test") + public void test() { + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AbstractAdder.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AbstractAdder.java new file mode 100644 index 0000000..d9dc2f5 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AbstractAdder.java @@ -0,0 +1,76 @@ +package com.lanyuanxiaoyao.service.command.commands.jobs; + +import cn.hutool.core.lang.Snowflake; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.lanyuanxiaoyao.service.configuration.entity.info.TableMetaAdd; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @date 2022-06-29 + */ +public abstract class AbstractAdder implements Adder { + protected final ImmutableList flinkJobs; + protected final InfoService infoService; + protected final String database; + protected final String type; + protected final String schema; + protected final String table; + protected final String pulsarAddress; + protected final String hdfs; + + protected final Snowflake snowflake = new Snowflake(); + + public AbstractAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) { + this.infoService = infoService; + this.flinkJobs = infoService.flinkJobList(); + + this.database = database; + this.type = type; + this.schema = schema; + this.table = table; + this.pulsarAddress = pulsarAddress; + this.hdfs = hdfs; + } + + protected abstract ImmutableList getJobs(); + + @Override + public void execute() { + for (Job j : getJobs()) { + if (flinkJobs.noneSatisfy(job -> j.name.equals(job.getName()))) { + infoService.saveFlinkJob(j.id, j.name, j.runMode); + } else { + if (j.isAppended) { + FlinkJob flinkJob = flinkJobs.select(job -> j.name.equals(job.getName())).getFirstOptional().orElseThrow(() -> new RuntimeException("Found duplicate flink job name")); + j.id = flinkJob.getId(); + } else { + throw new RuntimeException("Found duplicate flink job name"); + } + } + for (Table t : j.tables) { + infoService.saveTableMeta(new TableMetaAdd( + t.id, + t.alias, + j.id, + t.hudiJobId, + t.syncYarnJobId, + t.compactionYarnJobId, + t.srcDb, + t.srcType, + t.srcSchema, + t.srcTable, + t.srcPulsarAddr, + t.srcTopic, + t.tgtDb, + t.tgtTable, + t.tgtHdfsPath, + t.filterField, + t.filterValues, + t.filterType, + t.bucketNumber + )); + } + } + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctBigTableAdder.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctBigTableAdder.java new file mode 100644 index 0000000..f7ea974 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctBigTableAdder.java @@ -0,0 +1,50 @@ +package com.lanyuanxiaoyao.service.command.commands.jobs; + +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @date 2022-06-29 + */ +public class AcctBigTableAdder extends BigTableAdder { + public AcctBigTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) { + super(infoService, database, type, schema, table, pulsarAddress, hdfs); + } + + protected ImmutableList getTables(String suffix) { + return getTables() + .tap(table -> { + table.alias = table.alias + "_" + suffix; + table.srcDb = table.srcDb + "_" + suffix; + table.srcSchema = table.srcSchema + "_" + suffix; + table.tgtDb = table.tgtDb + "_" + suffix; + table.srcTopic = StrUtil.format("persistent://odcp/{}/{}", table.srcSchema, table.srcTable); + if (StrUtil.endWith(table.tgtHdfsPath, "b")) { + table.tgtHdfsPath = StrUtil.format("{}/{}/{}_b", hdfs, table.tgtDb, table.tgtTable); + } else { + table.tgtHdfsPath = StrUtil.format("{}/{}/{}", hdfs, table.tgtDb, table.tgtTable); + } + }); + } + + @Override + protected ImmutableList getJobs() { + return Lists.immutable.of( + new Job( + snowflake.nextId(), + StrUtil.format("{} {}", schema, table), + FlinkJob.RunMode.ONE_IN_ONE.name(), + Lists.immutable.of( + getTables("gz"), + getTables("fs"), + getTables("sz"), + getTables("dg") + ).flatCollect(list -> list), + true + ) + ); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctItemTableAdder.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctItemTableAdder.java new file mode 100644 index 0000000..07d8b85 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctItemTableAdder.java @@ -0,0 +1,79 @@ +package com.lanyuanxiaoyao.service.command.commands.jobs; + +import cn.hutool.core.lang.Tuple; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @date 2022-06-29 + */ +public class AcctItemTableAdder extends AbstractAdder { + private final ImmutableList mappings = Lists.immutable.of( + new Tuple("fs", "757", "fs"), + new Tuple("dg", "769", "dg"), + new Tuple("dg", "668", "mm"), + new Tuple("dg", "756", "zh"), + new Tuple("dg", "759", "zj"), + new Tuple("dg", "760", "zs"), + new Tuple("dg", "763", "qy"), + new Tuple("gz", "200", "gz"), + new Tuple("gz", "662", "yj"), + new Tuple("gz", "750", "jm"), + new Tuple("gz", "752", "hz"), + new Tuple("gz", "754", "st"), + new Tuple("gz", "766", "yf"), + new Tuple("gz", "768", "cz"), + new Tuple("sz", "755", "sz"), + new Tuple("sz", "660", "sw"), + new Tuple("sz", "663", "jy"), + new Tuple("sz", "751", "sg"), + new Tuple("sz", "753", "mz"), + new Tuple("sz", "758", "zq"), + new Tuple("sz", "762", "hy"), + new Tuple("szx", "93", "qs") + ); + + public AcctItemTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) { + super(infoService, database, type, schema, table, pulsarAddress, hdfs); + } + + @Override + protected ImmutableList getJobs() { + return Lists.immutable.of( + new Job( + snowflake.nextId(), + "acct acct_item", + FlinkJob.RunMode.ONE_IN_ONE.name(), + mappings.collect(tuple -> { + String area = tuple.get(0); + String code = tuple.get(1); + String city = tuple.get(2); + return new Table( + snowflake.nextId(), + StrUtil.format("{}_{}_{}", schema, table, city), + 3L, + 3L, + 4L, + StrUtil.format("{}_{}", database, area), + type, + StrUtil.format("{}_{}", schema, area), + table, + pulsarAddress, + StrUtil.format("persistent://odcp/{}_{}/{}_{}", schema, area, table, code), + StrUtil.format("dws_{}", schema), + StrUtil.format("dws_{}_{}", table, city), + StrUtil.format("{}/dws_{}/external_table_hudi/dws_{}_{}", hdfs, schema, table, city), + null, + null, + "NONE", + 50L + ); + }), + true + ) + ); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctSmallTableAdder.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctSmallTableAdder.java new file mode 100644 index 0000000..9079296 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/AcctSmallTableAdder.java @@ -0,0 +1,46 @@ +package com.lanyuanxiaoyao.service.command.commands.jobs; + +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @date 2022-06-29 + */ +public class AcctSmallTableAdder extends SmallTableAdder { + public AcctSmallTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) { + super(infoService, database, type, schema, table, pulsarAddress, hdfs); + } + + private ImmutableList
getTables(String suffix) { + return getTables() + .tap(table -> { + table.alias = table.alias + "_" + suffix; + table.srcDb = table.srcDb + "_" + suffix; + table.srcSchema = table.srcSchema + "_" + suffix; + table.tgtDb = table.tgtDb + "_" + suffix; + table.srcTopic = StrUtil.format("persistent://odcp/{}/{}", table.srcSchema, table.srcTable); + table.tgtHdfsPath = StrUtil.format("{}/{}/external_table_hudi/{}", hdfs, table.tgtDb, table.tgtTable); + }); + } + + @Override + protected ImmutableList getJobs() { + return Lists.immutable.of( + new Job( + snowflake.nextId(), + schema, + FlinkJob.RunMode.ALL_IN_ONE_BY_TABLE.name(), + Lists.immutable.of( + getTables("gz"), + getTables("fs"), + getTables("sz"), + getTables("dg") + ).flatCollect(list -> list), + true + ) + ); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/Adder.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/Adder.java new file mode 100644 index 0000000..12fca8d --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/Adder.java @@ -0,0 +1,68 @@ +package com.lanyuanxiaoyao.service.command.commands.jobs; + +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @date 2022-06-29 + */ +public interface Adder { + void execute(); + + class Job implements Cloneable { + public Long id; + public String name; + public String runMode; + public ImmutableList
tables; + public Boolean isAppended; + + public Job(Long id, String name, String runMode, ImmutableList
tables, Boolean isAppended) { + this.id = id; + this.name = name; + this.runMode = runMode; + this.tables = tables; + this.isAppended = isAppended; + } + } + + class Table { + public Long id; + public String alias; + public Long hudiJobId; + public Long syncYarnJobId; + public Long compactionYarnJobId; + public String srcDb; + public String srcType; + public String srcSchema; + public String srcTable; + public String srcPulsarAddr; + public String srcTopic; + public String tgtDb; + public String tgtTable; + public String tgtHdfsPath; + public String filterField; + public String filterValues; + public String filterType; + public Long bucketNumber; + + public Table(Long id, String alias, Long hudiJobId, Long syncYarnJobId, Long compactionYarnJobId, String srcDb, String srcType, String srcSchema, String srcTable, String srcPulsarAddr, String srcTopic, String tgtDb, String tgtTable, String tgtHdfsPath, String filterField, String filterValues, String filterType, Long bucketNumber) { + this.id = id; + this.alias = alias; + this.hudiJobId = hudiJobId; + this.syncYarnJobId = syncYarnJobId; + this.compactionYarnJobId = compactionYarnJobId; + this.srcDb = srcDb; + this.srcType = srcType; + this.srcSchema = srcSchema; + this.srcTable = srcTable; + this.srcPulsarAddr = srcPulsarAddr; + this.srcTopic = srcTopic; + this.tgtDb = tgtDb; + this.tgtTable = tgtTable; + this.tgtHdfsPath = tgtHdfsPath; + this.filterField = filterField; + this.filterValues = filterValues; + this.filterType = filterType; + this.bucketNumber = bucketNumber; + } + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/BigTableAdder.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/BigTableAdder.java new file mode 100644 index 0000000..3b08847 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/BigTableAdder.java @@ -0,0 +1,66 @@ +package com.lanyuanxiaoyao.service.command.commands.jobs; + +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.lanyuanxiaoyao.service.command.commands.jobs.AbstractAdder; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @date 2022-06-29 + */ +public class BigTableAdder extends AbstractAdder implements TableGetter { + public BigTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) { + super(infoService, database, type, schema, table, pulsarAddress, hdfs); + } + + @Override + public ImmutableList
getTables() { + return Lists.immutable.of( + new Table( + snowflake.nextId(), + StrUtil.format("{}_{}", schema, table), + 2L, + 2L, + 4L, + database, + type, + schema, + table, + pulsarAddress, + StrUtil.format("persistent://odcp/{}/{}", schema, table), + StrUtil.format("dws_{}", schema), + StrUtil.format("dws_{}", table), + StrUtil.format("{}/dws_{}/external_table_hudi/dws_{}", hdfs, schema, table), + "CITY_ID", + "200,755,757,769", + "EXCLUDE", 20L + ), + new Table( + snowflake.nextId(), + StrUtil.format("{}_{}_b", schema, table), + 3L, + 3L, + 4L, + database, + type, + schema, + table, + pulsarAddress, + StrUtil.format("persistent://odcp/{}/{}", schema, table), + StrUtil.format("dws_{}", schema), + StrUtil.format("dws_{}", table), + StrUtil.format("{}/dws_{}/external_table_hudi/dws_{}_b", hdfs, schema, table), + "CITY_ID", + "200,755,757,769", + "INCLUDE", + 50L) + ); + } + + @Override + protected ImmutableList getJobs() { + return Lists.immutable.of(new Job(snowflake.nextId(), StrUtil.format("{} {}", schema, table), FlinkJob.RunMode.ONE_IN_ONE.name(), getTables(), false)); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/SmallTableAdder.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/SmallTableAdder.java new file mode 100644 index 0000000..05ea5a1 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/SmallTableAdder.java @@ -0,0 +1,55 @@ +package com.lanyuanxiaoyao.service.command.commands.jobs; + +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @date 2022-06-29 + */ +public class SmallTableAdder extends AbstractAdder implements TableGetter { + public SmallTableAdder(InfoService infoService, String database, String type, String schema, String table, String pulsarAddress, String hdfs) { + super(infoService, database, type, schema, table, pulsarAddress, hdfs); + } + + @Override + public ImmutableList
getTables() { + return Lists.immutable.of( + new Table( + snowflake.nextId(), + StrUtil.format("{}_{}", schema, table), + 1L, + 1L, + 4L, + database, + type, + schema, + table, + pulsarAddress, + StrUtil.format("persistent://odcp/{}/{}", schema, table), + StrUtil.format("dws_{}", schema), + StrUtil.format("dws_{}", table), + StrUtil.format("{}/dws_{}/external_table_hudi/dws_{}", hdfs, schema, table), + null, + null, + "NONE", + 1L + ) + ); + } + + @Override + protected ImmutableList getJobs() { + return Lists.immutable.of( + new Job( + snowflake.nextId(), + schema, + FlinkJob.RunMode.ALL_IN_ONE.name(), + getTables(), + true + ) + ); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/TableGetter.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/TableGetter.java new file mode 100644 index 0000000..f2f48ff --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/commands/jobs/TableGetter.java @@ -0,0 +1,10 @@ +package com.lanyuanxiaoyao.service.command.commands.jobs; + +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @date 2022-06-29 + */ +public interface TableGetter { + ImmutableList getTables(); +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ServiceConfiguration.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ServiceConfiguration.java new file mode 100644 index 0000000..9afd324 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ServiceConfiguration.java @@ -0,0 +1,32 @@ +package com.lanyuanxiaoyao.service.command.configuration; + +import java.util.List; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * 服务配置 + * + * @author ZhangJiacheng + * @date 2023-12-20 + */ +@Configuration +@ConfigurationProperties("hudi-service") +public class ServiceConfiguration { + private List gatewayUrls; + + public List getGatewayUrls() { + return gatewayUrls; + } + + public void setGatewayUrls(List gatewayUrls) { + this.gatewayUrls = gatewayUrls; + } + + @Override + public String toString() { + return "ServiceConfiguration{" + + "gatewayUrls=" + gatewayUrls + + '}'; + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ShellConfiguration.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ShellConfiguration.java new file mode 100644 index 0000000..631a56d --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/ShellConfiguration.java @@ -0,0 +1,19 @@ +package com.lanyuanxiaoyao.service.command.configuration; + +import org.jline.utils.AttributedString; +import org.springframework.context.annotation.Configuration; +import org.springframework.shell.jline.PromptProvider; + +/** + * Spring Shell 配置 + * + * @author ZhangJiacheng + * @date 2022-04-27 + */ +@Configuration +public class ShellConfiguration implements PromptProvider { + @Override + public AttributedString getPrompt() { + return new AttributedString("odcp-utils:->"); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/SpringCloudDiscoveryInterceptor.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/SpringCloudDiscoveryInterceptor.java new file mode 100644 index 0000000..7d152bc --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/configuration/SpringCloudDiscoveryInterceptor.java @@ -0,0 +1,67 @@ +package com.lanyuanxiaoyao.service.command.configuration; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.URLUtil; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONUtil; +import com.dtflys.forest.auth.BasicAuth; +import com.dtflys.forest.http.ForestAddress; +import com.dtflys.forest.http.ForestRequest; +import com.dtflys.forest.interceptor.Interceptor; +import com.eshore.odcp.hudi.connector.Constants; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.net.URI; +import java.net.URL; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.stereotype.Component; + +/** + * @author lanyuanxiaoyao + * @date 2023-04-24 + */ +@Component +public class SpringCloudDiscoveryInterceptor implements Interceptor { + private static final Logger logger = LoggerFactory.getLogger(SpringCloudDiscoveryInterceptor.class); + + private final DiscoveryClient discoveryClient; + + public SpringCloudDiscoveryInterceptor(DiscoveryClient discoveryClient) { + logger.info("Services: {}", discoveryClient.getServices()); + this.discoveryClient = discoveryClient; + } + + @Override + public boolean beforeExecute(ForestRequest request) { + // Load + URL url = URLUtil.url(request.getUrl()); + String host = url.getHost(); + if (StrUtil.isNotBlank(host)) { + ImmutableList urls = Lists.immutable.ofAll(discoveryClient.getInstances(host)) + .collect(instance -> instance.getUri().toString()); + if (ObjectUtil.isNotEmpty(urls)) { + @SuppressWarnings("DataFlowIssue") + String targetUrl = urls.get(RandomUtil.randomInt(urls.size())); + URI uri = URI.create(targetUrl); + request.setAddress(new ForestAddress(uri.getScheme(), uri.getHost(), uri.getPort())); + } + } + + // Basic auth + BasicAuth basicAuth = new BasicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN); + basicAuth.enhanceAuthorization(request); + + return Interceptor.super.beforeExecute(request); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/entity/TableMetaJob.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/entity/TableMetaJob.java new file mode 100644 index 0000000..796c125 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/entity/TableMetaJob.java @@ -0,0 +1,123 @@ +package com.lanyuanxiaoyao.service.command.entity; + +import cn.hutool.core.util.ObjectUtil; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; + +/** + * @author ZhangJiacheng + * @date 2023-12-21 + */ +public class TableMetaJob { + private final TableMeta meta; + private final YarnApplication application; + + public TableMetaJob(TableMeta meta, YarnApplication application) { + this.meta = meta; + this.application = application; + } + + public TableMetaJob(TableMeta meta) { + this(meta, null); + } + + public Boolean isRunning() { + return ObjectUtil.isNotNull(application); + } + + public Boolean isNotRunning() { + return ObjectUtil.isNull(application); + } + + public Long getFlinkJobId() { + return meta.getJob().getId(); + } + + public String getFlinkJobName() { + return meta.getJob().getName(); + } + + public String getAlias() { + return meta.getAlias(); + } + + public String getCluster() { + return application.getCluster(); + } + + public String getId() { + return application.getId(); + } + + public String getUser() { + return application.getUser(); + } + + public String getQueue() { + return application.getQueue(); + } + + public String getName() { + return application.getName(); + } + + public String getState() { + return application.getState(); + } + + public String getFinalStatus() { + return application.getFinalStatus(); + } + + public Float getProgress() { + return application.getProgress(); + } + + public String getDiagnostics() { + return application.getDiagnostics(); + } + + public String getApplicationType() { + return application.getApplicationType(); + } + + public Long getStartedTime() { + return application.getStartedTime(); + } + + public Long getLaunchTime() { + return application.getLaunchTime(); + } + + public Long getFinishedTime() { + return application.getFinishedTime(); + } + + public Long getElapsedTime() { + return application.getElapsedTime(); + } + + public String getTrackingUrl() { + return application.getTrackingUrl(); + } + + public Long getAllocatedMB() { + return application.getAllocatedMB(); + } + + public Long getAllocatedVCores() { + return application.getAllocatedVCores(); + } + + public Integer getRunningContainers() { + return application.getRunningContainers(); + } + + public Float getQueueUsagePercentage() { + return application.getQueueUsagePercentage(); + } + + public Float getClusterUsagePercentage() { + return application.getClusterUsagePercentage(); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/AliasProvider.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/AliasProvider.java new file mode 100644 index 0000000..1304b1b --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/AliasProvider.java @@ -0,0 +1,36 @@ +package com.lanyuanxiaoyao.service.command.provider; + +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import java.util.List; +import java.util.Optional; +import org.springframework.core.MethodParameter; +import org.springframework.shell.CompletionContext; +import org.springframework.shell.CompletionProposal; +import org.springframework.stereotype.Component; + +/** + * Table name 待选项 + * + * @author ZhangJiacheng + * @date 2022-05-30 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Component +public class AliasProvider extends BaseValueProvider { + private final InfoService infoService; + + public AliasProvider(InfoService infoService) { + this.infoService = infoService; + } + + @Override + public List complete(MethodParameter parameter, CompletionContext completionContext, String[] hints) { + Optional field = getField(completionContext.getWords(), "--flink-job-id"); + return field.map(id -> infoService.simpleTableMetas(Long.parseLong(id)) + .collect(info -> new CompletionProposal(info.getAlias()).description(info.getSchema())) + .toList()) + .orElseGet(() -> infoService.allAlias() + .collect(CompletionProposal::new) + .toList()); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/BaseValueProvider.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/BaseValueProvider.java new file mode 100644 index 0000000..a45e8b8 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/BaseValueProvider.java @@ -0,0 +1,22 @@ +package com.lanyuanxiaoyao.service.command.provider; + +import java.util.List; +import java.util.Optional; +import org.springframework.shell.standard.ValueProviderSupport; + +/** + * @author ZhangJiacheng + * @date 2022-05-30 + */ +public abstract class BaseValueProvider extends ValueProviderSupport { + protected Optional getField(List words, String fieldName) { + boolean hasField = words.stream().anyMatch(word -> word.contains(fieldName)); + if (hasField) { + int index = words.indexOf(fieldName) + 1; + if (index < words.size()) { + return Optional.of(words.get(index)); + } + } + return Optional.empty(); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/FlinkJobIdProvider.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/FlinkJobIdProvider.java new file mode 100644 index 0000000..c293691 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/FlinkJobIdProvider.java @@ -0,0 +1,37 @@ +package com.lanyuanxiaoyao.service.command.provider; + +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import java.util.List; +import java.util.Optional; +import org.springframework.core.MethodParameter; +import org.springframework.shell.CompletionContext; +import org.springframework.shell.CompletionProposal; +import org.springframework.stereotype.Component; + +/** + * Flink Job Id 待选项 + * + * @author ZhangJiacheng + * @date 2022-05-30 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Component +public class FlinkJobIdProvider extends BaseValueProvider { + private final InfoService infoService; + + public FlinkJobIdProvider(InfoService infoService) { + this.infoService = infoService; + } + + @Override + public List complete(MethodParameter parameter, CompletionContext completionContext, String[] hints) { + Optional field = getField(completionContext.getWords(), "--table"); + return field.map(id -> infoService.simpleTableMetas(field.get()) + .collect(info -> new CompletionProposal(info.getFlinkJobId().toString()).description(info.getSchema())) + .toList()) + .orElseGet(() -> infoService.allFlinkJobId() + .collect(String::valueOf) + .collect(CompletionProposal::new) + .toList()); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/SearchTypeValueProvider.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/SearchTypeValueProvider.java new file mode 100644 index 0000000..24f5e70 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/provider/SearchTypeValueProvider.java @@ -0,0 +1,22 @@ +package com.lanyuanxiaoyao.service.command.provider; + +import com.lanyuanxiaoyao.service.command.utils.SearchUtils; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.springframework.core.MethodParameter; +import org.springframework.shell.CompletionContext; +import org.springframework.shell.CompletionProposal; + +/** + * @author ZhangJiacheng + * @date 2022-04-29 + */ +public class SearchTypeValueProvider extends BaseValueProvider { + @Override + public List complete(MethodParameter parameter, CompletionContext completionContext, String[] hints) { + return Arrays.stream(SearchUtils.Type.values()) + .map(type -> new CompletionProposal(type.name())) + .collect(Collectors.toList()); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/CommandLineUtils.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/CommandLineUtils.java new file mode 100644 index 0000000..3916378 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/CommandLineUtils.java @@ -0,0 +1,42 @@ +package com.lanyuanxiaoyao.service.command.utils; + +import cn.hutool.core.util.StrUtil; +import java.util.Arrays; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import me.tongfei.progressbar.ConsoleProgressBarConsumer; +import me.tongfei.progressbar.DelegatingProgressBarConsumer; +import me.tongfei.progressbar.ProgressBarBuilder; +import me.tongfei.progressbar.ProgressBarStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 命令行工具类 + * + * @author ZhangJiacheng + * @date 2022-04-27 + */ +public class CommandLineUtils { + private static final Logger logger = LoggerFactory.getLogger(CommandLineUtils.class); + + @SafeVarargs + public static String generateResultLines(Supplier... suppliers) { + return Arrays.stream(suppliers) + .map(Supplier::get) + .filter(StrUtil::isNotBlank) + .collect(Collectors.joining("\n")); + } + + public static ProgressBarBuilder progressbarBuilder(String name, Integer max) { + return progressbarBuilder(name, max.longValue()); + } + + public static ProgressBarBuilder progressbarBuilder(String name, Long max) { + return new ProgressBarBuilder() + .setConsumer(new ConsoleProgressBarConsumer(System.out)) + .setInitialMax(max) + .setStyle(ProgressBarStyle.ASCII) + .setTaskName(name); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/SearchUtils.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/SearchUtils.java new file mode 100644 index 0000000..48b4eb8 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/SearchUtils.java @@ -0,0 +1,64 @@ +package com.lanyuanxiaoyao.service.command.utils; + +import cn.hutool.core.util.StrUtil; +import java.util.Arrays; +import java.util.stream.Stream; + +/** + * 过滤使用 + * + * @author ZhangJiacheng + * @date 2022-04-29 + */ +public class SearchUtils { + public enum Type { + /** + * 什么都不操作 + */ + NONE, + /** + * 正则表达式 + */ + REGEX, + /** + * 包含 + */ + CONTAINS, + /** + * 不包含 + */ + UN_CONTAINS, + /** + * 等于 + */ + EQUALS, + /** + * 不等于 + */ + UN_EQUALS + } + + public static Boolean search(Type type, String patternInput, String value) { + String pattern = patternInput.trim(); + Stream patterns = Arrays.stream(pattern.split(",")); + switch (type) { + case REGEX: + if (StrUtil.isBlank(pattern)) { + throw new RuntimeException("Regex pattern cannot be blank"); + } + return value.matches(pattern); + case CONTAINS: + return patterns.anyMatch(value::contains); + case UN_CONTAINS: + return patterns.noneMatch(value::contains); + case EQUALS: + return patterns.anyMatch(value::equals); + case UN_EQUALS: + return patterns.noneMatch(value::equals); + case NONE: + return true; + default: + throw new RuntimeException("Unsupported search type"); + } + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/ShellTableUtils.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/ShellTableUtils.java new file mode 100644 index 0000000..8487578 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/ShellTableUtils.java @@ -0,0 +1,35 @@ +package com.lanyuanxiaoyao.service.command.utils; + +import org.springframework.shell.table.BorderSpecification; +import org.springframework.shell.table.BorderStyle; +import org.springframework.shell.table.TableBuilder; +import org.springframework.shell.table.TableModelBuilder; + +/** + * Shell 表显示工具 + * + * @author ZhangJiacheng + * @date 2022-05-23 + */ +public class ShellTableUtils { + public static final String CHECK_TRUE = "√"; + public static final int TABLE_WIDTH = 500; + public static final BorderStyle INNER_STYLE = BorderStyle.air; + public static final int INNER_SPECIFICATION = BorderSpecification.INNER_VERTICAL; + + public static String renderTable(TableModelBuilder builder) { + return new TableBuilder(builder.build()) + .addHeaderAndVerticalsBorders(BorderStyle.oldschool) + .build() + .render(TABLE_WIDTH); + } + + public static String renderInnerTable(TableModelBuilder builder) { + return new TableBuilder(builder.build()) + .paintBorder(INNER_STYLE, INNER_SPECIFICATION) + .fromTopLeft() + .toBottomRight() + .build() + .render(TABLE_WIDTH); + } +} diff --git a/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/TableUtils.java b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/TableUtils.java new file mode 100644 index 0000000..f588a16 --- /dev/null +++ b/service-command/src/main/java/com/lanyuanxiaoyao/service/command/utils/TableUtils.java @@ -0,0 +1,132 @@ +package com.lanyuanxiaoyao.service.command.utils; + +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.springframework.shell.table.TableModelBuilder; + +/** + * 表相关工具 + * + * @author ZhangJiacheng + * @date 2022-05-23 + */ +public class TableUtils { + public static String makeTableMeta(TableMeta meta) { + TableModelBuilder tableMetaModelBuilder = new TableModelBuilder<>(); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Alias"); + tableMetaModelBuilder.addValue(meta.getAlias()); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Source database"); + tableMetaModelBuilder.addValue(meta.getSource()); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Source Type"); + tableMetaModelBuilder.addValue(meta.getSourceType()); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Source schema"); + tableMetaModelBuilder.addValue(meta.getSchema()); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Source table"); + tableMetaModelBuilder.addValue(meta.getTable()); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Table type"); + tableMetaModelBuilder.addValue(meta.getType()); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Primary keys"); + tableMetaModelBuilder.addValue(meta.getPrimaryKeys().stream().map(TableMeta.FieldMeta::getName).collect(Collectors.joining(","))); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Partition keys"); + tableMetaModelBuilder.addValue(meta.getPartitionKeys().stream().map(TableMeta.FieldMeta::getName).collect(Collectors.joining(","))); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Hudi partition field"); + tableMetaModelBuilder.addValue(meta.getPartitionField()); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Tags"); + tableMetaModelBuilder.addValue(String.join(", ", meta.getTags())); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Fields"); + TableModelBuilder fieldsModelBuilder = new TableModelBuilder<>(); + fieldsModelBuilder.addRow(); + fieldsModelBuilder.addValue("#"); + fieldsModelBuilder.addValue("Name"); + fieldsModelBuilder.addValue("Type"); + fieldsModelBuilder.addValue("Length"); + fieldsModelBuilder.addValue("Primary key"); + fieldsModelBuilder.addValue("Partition key"); + List fields = meta.getFields() + .stream() + .sorted(Comparator.comparingInt(TableMeta.FieldMeta::getSequence)) + .collect(Collectors.toList()); + for (TableMeta.FieldMeta field : fields) { + fieldsModelBuilder.addRow(); + fieldsModelBuilder.addValue(field.getSequence()); + fieldsModelBuilder.addValue(field.getName()); + fieldsModelBuilder.addValue(field.getType()); + fieldsModelBuilder.addValue(field.getLength()); + fieldsModelBuilder.addValue(field.getPrimaryKey() ? ShellTableUtils.CHECK_TRUE : ""); + fieldsModelBuilder.addValue(field.getPartitionKey() ? ShellTableUtils.CHECK_TRUE : ""); + } + tableMetaModelBuilder.addValue(ShellTableUtils.renderInnerTable(fieldsModelBuilder)); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Hudi config"); + TableModelBuilder hudiModelBuilder = new TableModelBuilder<>(); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Write parallelism"); + hudiModelBuilder.addValue(meta.getHudi().getWriteTasks()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Write task memory"); + hudiModelBuilder.addValue(meta.getHudi().getWriteTaskMaxMemory()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Write batch size"); + hudiModelBuilder.addValue(meta.getHudi().getWriteBatchSize()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Write rate limit"); + hudiModelBuilder.addValue(meta.getHudi().getWriteRateLimit()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Bucket number"); + hudiModelBuilder.addValue(meta.getHudi().getBucketIndexNumber()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Compaction parallelism"); + hudiModelBuilder.addValue(meta.getHudi().getCompactionTasks()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Compaction strategy"); + hudiModelBuilder.addValue(meta.getHudi().getCompactionStrategy()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Compaction delta commits"); + hudiModelBuilder.addValue(meta.getHudi().getCompactionDeltaCommits()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Compaction delta seconds"); + hudiModelBuilder.addValue(meta.getHudi().getCompactionDeltaSeconds()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Keep file version number"); + hudiModelBuilder.addValue(meta.getHudi().getKeepFileVersion()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Keep commit version number"); + hudiModelBuilder.addValue(meta.getHudi().getKeepCommitVersion()); + hudiModelBuilder.addRow(); + hudiModelBuilder.addValue("Target HDFS path"); + hudiModelBuilder.addValue(meta.getHudi().getTargetHdfsPath()); + tableMetaModelBuilder.addValue(ShellTableUtils.renderInnerTable(hudiModelBuilder)); + Function> generateYarnJob = yarnMeta -> { + TableModelBuilder builder = new TableModelBuilder<>(); + builder.addRow(); + builder.addValue("Job manager memory"); + builder.addValue(yarnMeta.getJobManagerMemory()); + builder.addRow(); + builder.addValue("Task manager memory"); + builder.addValue(yarnMeta.getTaskManagerMemory()); + return builder; + }; + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Sync yarn"); + tableMetaModelBuilder.addValue(ShellTableUtils.renderInnerTable(generateYarnJob.apply(meta.getSyncYarn()))); + tableMetaModelBuilder.addRow(); + tableMetaModelBuilder.addValue("Compaction yarn"); + tableMetaModelBuilder.addValue(ShellTableUtils.renderInnerTable(generateYarnJob.apply(meta.getCompactionYarn()))); + + return ShellTableUtils.renderTable(tableMetaModelBuilder); + } +} diff --git a/service-command/src/main/resources/application-b12.yml b/service-command/src/main/resources/application-b12.yml new file mode 100644 index 0000000..3d8f3a0 --- /dev/null +++ b/service-command/src/main/resources/application-b12.yml @@ -0,0 +1,50 @@ +hudi-service: + gateway-urls: + - http://b12s10.hdp.dc:35690 + - http://b12s11.hdp.dc:35690 + - http://b12s12.hdp.dc:35690 + - http://b12s13.hdp.dc:35690 + - http://b12s14.hdp.dc:35690 + - http://b12s15.hdp.dc:35690 + - http://b12s16.hdp.dc:35690 + - http://b12s17.hdp.dc:35690 + - http://b12s18.hdp.dc:35690 + - http://b12s19.hdp.dc:35690 + - http://b12s20.hdp.dc:35690 + - http://b12s21.hdp.dc:35690 + - http://b12s22.hdp.dc:35690 + - http://b12s23.hdp.dc:35690 + - http://b12s24.hdp.dc:35690 + - http://b12s25.hdp.dc:35690 + - http://b12s26.hdp.dc:35690 + - http://b12s27.hdp.dc:35690 + - http://b12s28.hdp.dc:35690 + - http://b12s29.hdp.dc:35690 + - http://b12s30.hdp.dc:35690 + - http://b12s31.hdp.dc:35690 + - http://b12s32.hdp.dc:35690 + - http://b12s33.hdp.dc:35690 + - http://b12s34.hdp.dc:35690 + - http://b12s35.hdp.dc:35690 + - http://b12s36.hdp.dc:35690 + - http://b12s37.hdp.dc:35690 + - http://b12s38.hdp.dc:35690 + - http://b12s39.hdp.dc:35690 + - http://b12s4.hdp.dc:35690 + - http://b12s40.hdp.dc:35690 + - http://b12s41.hdp.dc:35690 + - http://b12s42.hdp.dc:35690 + - http://b12s43.hdp.dc:35690 + - http://b12s44.hdp.dc:35690 + - http://b12s45.hdp.dc:35690 + - http://b12s46.hdp.dc:35690 + - http://b12s47.hdp.dc:35690 + - http://b12s48.hdp.dc:35690 + - http://b12s49.hdp.dc:35690 + - http://b12s5.hdp.dc:35690 + - http://b12s50.hdp.dc:35690 + - http://b12s51.hdp.dc:35690 + - http://b12s6.hdp.dc:35690 + - http://b12s7.hdp.dc:35690 + - http://b12s8.hdp.dc:35690 + - http://b12s9.hdp.dc:35690 \ No newline at end of file diff --git a/service-command/src/main/resources/application-b5.yml b/service-command/src/main/resources/application-b5.yml new file mode 100644 index 0000000..c37ece5 --- /dev/null +++ b/service-command/src/main/resources/application-b5.yml @@ -0,0 +1,17 @@ +hudi-service: + gateway-urls: + - http://b5s119.hdp.dc:35690 + - http://b5s120.hdp.dc:35690 + - http://b5s121.hdp.dc:35690 + - http://b5s122.hdp.dc:35690 + - http://b5s123.hdp.dc:35690 + - http://b5s124.hdp.dc:35690 + - http://b5s125.hdp.dc:35690 + - http://b5s126.hdp.dc:35690 + - http://b5s127.hdp.dc:35690 + - http://b5s128.hdp.dc:35690 + - http://b5s129.hdp.dc:35690 + - http://b5s130.hdp.dc:35690 + - http://b5s131.hdp.dc:35690 + - http://b5s132.hdp.dc:35690 + - http://b5s133.hdp.dc:35690 \ No newline at end of file diff --git a/service-command/src/main/resources/application.yml b/service-command/src/main/resources/application.yml new file mode 100644 index 0000000..7063355 --- /dev/null +++ b/service-command/src/main/resources/application.yml @@ -0,0 +1,18 @@ +spring: + application: + name: service-command + profiles: + include: common,discovery + shell: + interactive: + enabled: true + command: + script: + enabled: false + history: + enabled: false +forest: + backend: httpclient + timeout: 120000 + log-enabled: false + interceptors: com.lanyuanxiaoyao.service.command.configuration.SpringCloudDiscoveryInterceptor \ No newline at end of file