refactor(all): 初始化项目

迁移项目到独立的 service 进行集中开发
包含以下组件的查询 API 服务
flink
hudi
database(info)
pulsar
yarn
包含前端服务和 UI
web
包含公共代码
configuration
This commit is contained in:
2023-05-01 00:35:57 +08:00
parent 6b4374ffcc
commit 2f2c10e7b7
191 changed files with 789282 additions and 0 deletions

View File

@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>hudi-service</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>service-info-query</artifactId>
<dependencies>
<dependency>
<groupId>com.lanyuanxiaoyao</groupId>
<artifactId>service-configuration</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.eshore.odcp.hudi.connector</groupId>
<artifactId>database</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,36 @@
package com.lanyuanxiaoyao.service.info;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScans;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Application entry point for the info-query service.
 *
 * @author lanyuanxiaoyao
 * @date 2023-04-24
 */
// Registers this instance with the discovery server (Eureka, per application.yml).
@EnableDiscoveryClient
// Gson auto-configuration is excluded; Jackson is the JSON mapper (see application.yml).
@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
// Scans this service's own packages plus the shared connector's database utilities,
// so DatabaseService and friends are picked up as beans.
@ComponentScans({
@ComponentScan("com.lanyuanxiaoyao.service"),
@ComponentScan("com.eshore.odcp.hudi.connector.utils.database"),
})
@EnableConfigurationProperties
@EnableRetry
@EnableScheduling
public class InfoQueryApplication {
// NOTE(review): logger is currently unused in this class.
private static final Logger logger = LoggerFactory.getLogger(InfoQueryApplication.class);
public static void main(String[] args) {
SpringApplication.run(InfoQueryApplication.class, args);
}
}

View File

@@ -0,0 +1,26 @@
package com.lanyuanxiaoyao.service.info.configuration;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
/**
 * Wires up programmatic JDBC transaction support for the service.
 *
 * @author lanyuanxiaoyao
 * @date 2023-04-26
 */
@Configuration
public class JdbcConfiguration {

    /**
     * Transaction manager bound to the application's {@link DataSource}.
     */
    @Bean
    public DataSourceTransactionManager transactionManager(DataSource ds) {
        return new DataSourceTransactionManager(ds);
    }

    /**
     * Template for executing code blocks inside a managed transaction.
     */
    @Bean
    public TransactionTemplate transactionTemplate(DataSourceTransactionManager txManager) {
        return new TransactionTemplate(txManager);
    }
}

View File

@@ -0,0 +1,92 @@
package com.lanyuanxiaoyao.service.info.controller;
import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.info.service.InfoService;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * Read-only HTTP endpoints exposing flink job, table meta and sync state info.
 * All work is delegated to {@link InfoService}.
 *
 * @author lanyuanxiaoyao
 * @date 2023-04-24
 */
@RestController
@RequestMapping("info")
public class InfoController {

    private static final Logger logger = LoggerFactory.getLogger(InfoController.class);

    private final InfoService infoService;

    public InfoController(InfoService service) {
        this.infoService = service;
    }

    /**
     * Paged listing of (flink job id, alias) pairs with optional search,
     * filter and sort parameters.
     */
    @GetMapping("/job_id_alias")
    public PageResponse<JobIdAndAlias> jobIdAndAlias(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "count", defaultValue = "10") Integer count,
            @RequestParam(value = "flink_job_id", required = false) Long flinkJobId,
            @RequestParam(value = "alias", required = false) String alias,
            @RequestParam(value = "order", required = false) String order,
            @RequestParam(value = "direction", required = false) String direction,
            @RequestParam(value = "filter_run_mode", required = false) List<String> selectedRunMode,
            @RequestParam(value = "filter_compaction_status", required = false) List<String> selectedCompactionStatus
    ) {
        ImmutableList<String> runModes = Lists.immutable.ofAll(selectedRunMode);
        ImmutableList<String> compactionStatuses = Lists.immutable.ofAll(selectedCompactionStatus);
        return infoService.findAllJobIdAndAlias(
                page, count, flinkJobId, alias, order, direction, runModes, compactionStatuses);
    }

    /** Every flink job together with its table metas. */
    @GetMapping("/job_metas")
    public ImmutableList<JobAndMetas> jobAndMetas() {
        return infoService.jobAndMetas();
    }

    /** All flink jobs. */
    @GetMapping("/flink_job/list")
    public ImmutableList<FlinkJob> flinkJobs() {
        return infoService.flinkJobs();
    }

    /** A single flink job by id. */
    @GetMapping("/flink_job/detail")
    public FlinkJob flinkJob(@RequestParam("flink_job_id") Long flinkJobId) {
        return infoService.flinkJob(flinkJobId);
    }

    /** Table metas, scoped to one job when flink_job_id is supplied. */
    @GetMapping("/table_meta/list")
    public ImmutableList<TableMeta> tableMetas(@RequestParam(value = "flink_job_id", required = false) Long flinkJobId) {
        if (ObjectUtil.isNull(flinkJobId)) {
            return infoService.tableMetas();
        }
        return infoService.tableMetas(flinkJobId);
    }

    /** A single table meta identified by (job id, alias). */
    @GetMapping("/table_meta/detail")
    public TableMeta tableMeta(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) {
        return infoService.tableMeta(flinkJobId, alias);
    }

    /** The sync state for one (job id, alias) pair. */
    @GetMapping("/sync_state/detail")
    public SyncState syncState(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) {
        return infoService.syncState(flinkJobId, alias);
    }
}

View File

@@ -0,0 +1,189 @@
package com.lanyuanxiaoyao.service.info.service;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
/**
 * Query service over the hudi sync-state database, backed by {@link DatabaseService}
 * and a MySQL {@link JdbcTemplate}. Results are cached ("normal-cache") and calls
 * are retried on any throwable.
 *
 * @author lanyuanxiaoyao
 * @date 2023-04-24
 */
@CacheConfig(cacheManager = "normal-cache")
@Service
public class InfoService {

    private static final Logger logger = LoggerFactory.getLogger(InfoService.class);

    private final DatabaseService databaseService;
    private final JdbcTemplate mysqlJdbcTemplate;
    private final TransactionTemplate mysqlTransactionTemplate;

    public InfoService(DatabaseService databaseService, JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate) {
        this.databaseService = databaseService;
        this.mysqlJdbcTemplate = mysqlJdbcTemplate;
        this.mysqlTransactionTemplate = mysqlTransactionTemplate;
    }

    /** Sync state for one (flink job id, alias) pair. */
    @Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias")
    @Retryable(Throwable.class)
    public SyncState syncState(Long flinkJobId, String alias) {
        return databaseService.getSyncState(flinkJobId, alias);
    }

    /**
     * Paged search over (flink job id, alias) pairs joined with their sync state.
     *
     * <p>User-supplied search values and filter lists are passed as bind
     * parameters (never concatenated into the SQL) to prevent SQL injection;
     * the sort column and direction are mapped through a whitelist in
     * {@link #buildOrderBy(String, String)}.
     *
     * @param page   1-based page number; values below 1 are clamped to 1
     * @param count  page size; values below 1 are clamped to 1
     * @param flinkJobId optional id substring search (LIKE)
     * @param alias      optional alias substring search (LIKE)
     * @param order      optional sort column name (snake_case or camelCase)
     * @param direction  optional sort direction (asc/desc/up/down, any case)
     * @param selectedRunMode           optional run-mode filter values
     * @param selectedCompactionStatus  optional compaction-status filter values
     */
    @Cacheable(value = "job-id-alias", sync = true)
    @Retryable(Throwable.class)
    public PageResponse<JobIdAndAlias> findAllJobIdAndAlias(
            Integer page,
            Integer count,
            Long flinkJobId,
            String alias,
            String order,
            String direction,
            ImmutableList<String> selectedRunMode,
            ImmutableList<String> selectedCompactionStatus
    ) {
        return mysqlTransactionTemplate.execute(status -> {
            int limit = Math.max(count, 1);
            int offset = limit * Math.max(page - 1, 0);

            // Build optional WHERE fragments with '?' placeholders; the matching
            // values are collected into args in the same order.
            StringBuilder conditions = new StringBuilder();
            List<Object> args = new ArrayList<>();
            if (flinkJobId != null) {
                conditions.append("and job.id like ?\n");
                args.add("%" + flinkJobId + "%");
            }
            if (alias != null) {
                conditions.append("and info.alias like ?\n");
                args.add("%" + alias + "%");
            }
            if (selectedRunMode != null && !selectedRunMode.isEmpty()) {
                conditions.append("and job.run_mode in (").append(placeholders(selectedRunMode.size())).append(")\n");
                selectedRunMode.forEach(args::add);
            }
            if (selectedCompactionStatus != null && !selectedCompactionStatus.isEmpty()) {
                conditions.append("and state.compaction_status in (").append(placeholders(selectedCompactionStatus.size())).append(")\n");
                selectedCompactionStatus.forEach(args::add);
            }

            String fromWhere =
                    "from " + DATABASE_NAME + ".tb_app_flink_job_config job,\n" +
                    "     " + DATABASE_NAME + ".tb_app_collect_table_info info,\n" +
                    "     " + DATABASE_NAME + ".tb_app_hudi_sync_state state\n" +
                    "where job.id = info.flink_job_id\n" +
                    "and state.id = concat(job.id, '-', info.alias)\n" +
                    conditions;

            // order by is meaningless for an aggregate count, so it is omitted here.
            Long total = mysqlJdbcTemplate.queryForObject(
                    "select count(*)\n" + fromWhere,
                    Long.class,
                    args.toArray()
            );
            List<JobIdAndAlias> list = mysqlJdbcTemplate.query(
                    "select job.id, info.alias\n" +
                            fromWhere +
                            buildOrderBy(order, direction) +
                            "limit " + limit + " offset " + offset,
                    (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)),
                    args.toArray()
            );
            return new PageResponse<>(list, total);
        });
    }

    /** Builds "?,?,…" with {@code n} placeholders for an SQL IN clause. */
    private static String placeholders(int n) {
        return String.join(",", Collections.nCopies(n, "?"));
    }

    /**
     * Maps a user-supplied sort column and direction onto a whitelisted
     * "order by … \n" clause. Returns "" when either value is missing or not
     * recognized (the previous implementation emitted invalid SQL — e.g.
     * "order by state.source_start_time null" — for unknown directions,
     * because it checked {@code direction} for null instead of the resolved
     * direction keyword).
     */
    private static String buildOrderBy(String order, String direction) {
        if (order == null || direction == null) {
            return "";
        }
        String orderField;
        switch (order) {
            case "source_start_time":
            case "sourceStartTime":
                orderField = "state.source_start_time";
                break;
            case "source_checkpoint_time":
            case "sourceCheckpointTime":
                orderField = "state.source_checkpoint_time";
                break;
            case "source_publish_time":
            case "sourcePublishTime":
                orderField = "state.source_publish_time";
                break;
            case "source_operation_time":
            case "sourceOperationTime":
                orderField = "state.source_op_time";
                break;
            case "compaction_start_time":
            case "compactionStartTime":
                orderField = "state.compaction_start_time";
                break;
            case "compaction_finish_time":
            case "compactionFinishTime":
                orderField = "state.compaction_finish_time";
                break;
            default:
                return "";
        }
        String orderDirection;
        switch (direction) {
            case "desc":
            case "DESC":
            case "down":
            case "DOWN":
                orderDirection = "desc";
                break;
            case "asc":
            case "ASC":
            case "up":
            case "UP":
                orderDirection = "asc";
                break;
            default:
                return "";
        }
        return StrUtil.format("order by {} {}\n", orderField, orderDirection);
    }

    /** Every flink job together with its table metas. */
    @Cacheable(value = "job-metas", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<JobAndMetas> jobAndMetas() {
        return databaseService.findAllFlinkJob().collect(job -> new JobAndMetas(job, databaseService.findTableMeta(job.getId()).toList()));
    }

    /** All flink jobs. */
    @Cacheable(value = "flink-jobs", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<FlinkJob> flinkJobs() {
        return databaseService.findAllFlinkJob();
    }

    /** One flink job by id. */
    @Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId")
    @Retryable(Throwable.class)
    public FlinkJob flinkJob(Long flinkJobId) {
        return databaseService.getFlinkJob(flinkJobId);
    }

    /** Table metas of every job. */
    @Cacheable(value = "table-metas", sync = true)
    @Retryable(Throwable.class)
    public ImmutableList<TableMeta> tableMetas() {
        return flinkJobs().flatCollect(job -> databaseService.findTableMeta(job.getId()));
    }

    /** Table metas of one job. */
    @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId")
    @Retryable(Throwable.class)
    public ImmutableList<TableMeta> tableMetas(Long flinkJobId) {
        return databaseService.findTableMeta(flinkJobId);
    }

    /** One table meta identified by (job id, alias). */
    @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId.toString()+#alias")
    @Retryable(Throwable.class)
    public TableMeta tableMeta(Long flinkJobId, String alias) {
        return databaseService.getTableMeta(flinkJobId, alias);
    }
}

View File

@@ -0,0 +1,38 @@
server:
port: 0
spring:
application:
name: service-info-query
datasource:
url: jdbc:mysql://132.121.204.217:17906/iap-datahub?useSSL=false
username: odcp
password: wFg_fR492#&
driver-class-name: com.mysql.jdbc.Driver
druid:
initial-size: 5
max-active: 10
main:
banner-mode: "off"
jackson:
serialization:
fail-on-empty-beans: false
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
prometheus:
enabled: true
metrics:
export:
jmx:
enabled: false
eureka:
instance:
hostname: localhost
prefer-ip-address: true
instance-id: ${spring.application.name}-${eureka.instance.hostname}-${random.uuid}-${datetime}
client:
service-url:
defaultZone: http://localhost:35670/eureka/

View File

@@ -0,0 +1,50 @@
<configuration>
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<springProperty scope="context" name="LOKI_PUSH_URL" source="loki.url"/>
<springProperty scope="context" name="LOGGING_PARENT" source="logging.parent"/>
<springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
<appender name="Loki" class="com.github.loki4j.logback.Loki4jAppender">
<metricsEnabled>true</metricsEnabled>
<http class="com.github.loki4j.logback.ApacheHttpSender">
<url>${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push}</url>
</http>
<format>
<label>
<pattern>app=${APP_NAME:- },host=${HOSTNAME},level=%level</pattern>
</label>
<message>
<pattern>${FILE_LOG_PATTERN:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} [${HOSTNAME}] ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } -&#45;&#45; [%t] %-40.40logger{39} : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}</pattern>
</message>
<sortByTime>true</sortByTime>
</format>
</appender>
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN:-%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}</pattern>
</encoder>
</appender>
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOGGING_PARENT:-.}/${APP_NAME:-run}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz</fileNamePattern>
<MaxHistory>7</MaxHistory>
</rollingPolicy>
<encoder>
<pattern>${FILE_LOG_PATTERN:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} [${HOSTNAME}] ${LOG_LEVEL_PATTERN:-%5p} ${PID:- } -&#45;&#45; [%t] %-40.40logger{39} : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}</pattern>
</encoder>
</appender>
<logger name="com.zaxxer.hikari" level="ERROR"/>
<root level="INFO">
<appender-ref ref="Loki"/>
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</configuration>