feat(api): 迁移hudi-api项目到这里
This commit is contained in:
@@ -2,9 +2,10 @@
|
||||
|
||||
mvn -pl service-dependencies,service-configuration,service-forest,service-executor,service-executor/service-executor-core,service-cli,service-cli/service-cli-core clean source:jar deploy -D skipTests -P local -s ~/.m2/settings-development.xml
|
||||
mvn -pl service-gateway,service-queue,service-cli/service-cli-runner clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml
|
||||
mvn -pl service-cloud-query,service-flink-query,service-info-query,service-loki-query,service-pulsar-query,service-yarn-query,service-zookeeper-query,service-web clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml -P b2b12
|
||||
mvn -pl service-api,service-cloud-query,service-flink-query,service-info-query,service-loki-query,service-pulsar-query,service-yarn-query,service-zookeeper-query,service-web clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml -P b2b12
|
||||
mvn -pl service-hudi-query,service-executor/service-executor-manager,service-executor/service-executor-task clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b12
|
||||
|
||||
ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-api/target/service-api-1.0.0-SNAPSHOT.jar
|
||||
ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-cli/service-cli-runner/target/service-cli-runner-1.0.0-SNAPSHOT.jar
|
||||
ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-cloud-query/target/service-cloud-query-1.0.0-SNAPSHOT.jar
|
||||
ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-executor/service-executor-manager/target/service-executor-manager-1.0.0-SNAPSHOT.jar
|
||||
|
||||
6
bin/build-api.sh
Executable file
6
bin/build-api.sh
Executable file
@@ -0,0 +1,6 @@
|
||||
#!/bin/bash
|
||||
mvn -pl service-dependencies,service-configuration clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
|
||||
mvn -pl service-api clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml
|
||||
file_path=/Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-api/target/service-api-1.0.0-SNAPSHOT.jar
|
||||
ytp-transfer2 $file_path
|
||||
rm $file_path
|
||||
1
pom.xml
1
pom.xml
@@ -28,6 +28,7 @@
|
||||
<module>service-cloud-query</module>
|
||||
<module>service-exporter</module>
|
||||
<module>service-check</module>
|
||||
<module>service-api</module>
|
||||
</modules>
|
||||
|
||||
<properties>
|
||||
|
||||
47
service-api/pom.xml
Normal file
47
service-api/pom.xml
Normal file
@@ -0,0 +1,47 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.lanyuanxiaoyao</groupId>
|
||||
<artifactId>hudi-service</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>service-api</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.lanyuanxiaoyao</groupId>
|
||||
<artifactId>service-configuration</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-jdbc</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>druid-spring-boot-starter</artifactId>
|
||||
<version>1.2.17</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.lanyuanxiaoyao.service.api;
|
||||
|
||||
import com.lanyuanxiaoyao.service.configuration.SecurityConfig;
|
||||
import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.FilterType;
|
||||
import org.springframework.retry.annotation.EnableRetry;
|
||||
|
||||
/**
|
||||
* 启动类
|
||||
*
|
||||
* @author ZhangJiacheng
|
||||
* @date 2022-07-05
|
||||
*/
|
||||
@EnableDiscoveryClient
|
||||
@EnableConfigurationProperties
|
||||
@EnableEncryptableProperties
|
||||
@SpringBootApplication
|
||||
@ComponentScan(
|
||||
basePackages = {
|
||||
"com.lanyuanxiaoyao.service",
|
||||
},
|
||||
excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {SecurityConfig.class})
|
||||
)
|
||||
@EnableRetry
|
||||
public class ApiApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ApiApplication.class, args);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.lanyuanxiaoyao.service.api.configuration;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
/**
|
||||
* JDBC 配置
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-04-26
|
||||
*/
|
||||
@Configuration
|
||||
public class JdbcConfiguration {
|
||||
@Bean
|
||||
public DataSourceTransactionManager transactionManager(DataSource dataSource) {
|
||||
return new DataSourceTransactionManager(dataSource);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TransactionTemplate transactionTemplate(DataSourceTransactionManager dataSourceTransactionManager) {
|
||||
return new TransactionTemplate(dataSourceTransactionManager);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
package com.lanyuanxiaoyao.service.api.configuration;
|
||||
|
||||
import com.lanyuanxiaoyao.service.configuration.SecurityProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
|
||||
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
|
||||
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
|
||||
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
|
||||
|
||||
/**
|
||||
* Spring Security Config
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-01-30
|
||||
*/
|
||||
@Configuration
|
||||
@EnableWebSecurity
|
||||
public class SecurityConfig extends WebSecurityConfigurerAdapter {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SecurityConfig.class);
|
||||
|
||||
private final SecurityProperties securityProperties;
|
||||
|
||||
public SecurityConfig(SecurityProperties securityProperties) {
|
||||
this.securityProperties = securityProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure(HttpSecurity http) throws Exception {
|
||||
http.authorizeHttpRequests()
|
||||
.antMatchers("/actuator")
|
||||
.authenticated()
|
||||
.anyRequest()
|
||||
.permitAll()
|
||||
.and()
|
||||
.httpBasic()
|
||||
.and()
|
||||
.csrf()
|
||||
.disable();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure(AuthenticationManagerBuilder builder) throws Exception {
|
||||
builder.inMemoryAuthentication()
|
||||
.withUser(securityProperties.getUsername())
|
||||
.password("{noop}" + securityProperties.getDarkcode())
|
||||
.authorities(securityProperties.getAuthority());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,164 @@
|
||||
package com.lanyuanxiaoyao.service.api.controller;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.eshore.odcp.hudi.connector.Constants;
|
||||
import com.github.loki4j.slf4j.marker.LabelMarker;
|
||||
import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata;
|
||||
import com.lanyuanxiaoyao.service.api.service.SyncStateService;
|
||||
import com.lanyuanxiaoyao.service.api.service.VersionUpdateService;
|
||||
import javax.annotation.Resource;
|
||||
import org.eclipse.collections.api.factory.Maps;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.Marker;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
/**
|
||||
* Rest 接口
|
||||
*
|
||||
* @author ZhangJiacheng
|
||||
* @date 2022-07-05
|
||||
*/
|
||||
@RequestMapping("api")
|
||||
@RestController
|
||||
public class ApiController {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ApiController.class);
|
||||
|
||||
@Resource
|
||||
private SyncStateService syncStateService;
|
||||
@Resource
|
||||
private VersionUpdateService versionUpdateService;
|
||||
|
||||
private Marker makeMarker(Long flinkJobId, String alias) {
|
||||
return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias));
|
||||
}
|
||||
|
||||
@ResponseBody
|
||||
@GetMapping("message_id")
|
||||
public String messageId(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Get message id: {} {}", flinkJobId, alias);
|
||||
return syncStateService.getMessageId(flinkJobId, alias);
|
||||
}
|
||||
|
||||
@GetMapping("sync_start")
|
||||
public void syncStart(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam(value = "cluster", required = false) String cluster,
|
||||
@RequestParam(value = "application_id", required = false) String applicationId
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Sync start: {} {} {} {}", flinkJobId, alias, cluster, applicationId);
|
||||
syncStateService.saveSyncStartTime(flinkJobId, alias, cluster, applicationId);
|
||||
}
|
||||
|
||||
@GetMapping("sync_checkpoint_state")
|
||||
public void syncCheckpointState(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam("message_id") String messageId,
|
||||
@RequestParam("publish_time") Long publishTime
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Sync checkpoint state: {} {} {} {}", flinkJobId, alias, messageId, publishTime);
|
||||
syncStateService.saveSyncCheckpointState(flinkJobId, alias, messageId, publishTime);
|
||||
}
|
||||
|
||||
@GetMapping("sync_operation_state")
|
||||
public void syncOperationState(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam(value = "operation_time",
|
||||
required = false) Long operationTime
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Sync operation state: {} {} {}", flinkJobId, alias, operationTime);
|
||||
syncStateService.saveSyncOperationState(flinkJobId, alias, operationTime);
|
||||
}
|
||||
|
||||
@GetMapping("compaction_start")
|
||||
public void compactionStart(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam(value = "type", required = false) String type,
|
||||
@RequestParam(value = "cluster", required = false) String cluster,
|
||||
@RequestParam(value = "application_id", required = false) String applicationId
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Compaction start: {} {} {} {}", flinkJobId, alias, cluster, applicationId);
|
||||
syncStateService.saveCompactionStartTime(flinkJobId, alias, cluster, applicationId, null);
|
||||
}
|
||||
|
||||
@PostMapping("compaction_pre_commit")
|
||||
public void compactionPreCommit(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam("instant") String instant,
|
||||
@RequestParam(value = "cluster", required = false) String cluster,
|
||||
@RequestParam(value = "application_id", required = false) String applicationId,
|
||||
@RequestBody HoodieCommitMetadata metadata
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Compaction state: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId);
|
||||
syncStateService.saveCompactionMetrics(
|
||||
flinkJobId,
|
||||
alias,
|
||||
"pre",
|
||||
instant,
|
||||
cluster,
|
||||
applicationId
|
||||
);
|
||||
}
|
||||
|
||||
@PostMapping("compaction_commit")
|
||||
public void compactionCommit(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam("instant") String instant,
|
||||
@RequestParam(value = "cluster", required = false) String cluster,
|
||||
@RequestParam(value = "application_id", required = false) String applicationId,
|
||||
@RequestBody HoodieCommitMetadata metadata
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Compaction state: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId);
|
||||
syncStateService.saveCompactionFinishTime(flinkJobId, alias);
|
||||
syncStateService.saveCompactionMetrics(
|
||||
flinkJobId,
|
||||
alias,
|
||||
"complete",
|
||||
instant,
|
||||
cluster,
|
||||
applicationId
|
||||
);
|
||||
}
|
||||
|
||||
@PostMapping("compaction_finish")
|
||||
public void compactionFinish(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam("time") Long timestamp,
|
||||
@RequestParam("state") Boolean success,
|
||||
@RequestBody(required = false) String exception
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Compaction finish: {} {} {} {}", flinkJobId, alias, timestamp, success);
|
||||
syncStateService.saveCompactionFinishTime(flinkJobId, alias, timestamp, StrUtil.isBlank(exception) ? null : exception);
|
||||
}
|
||||
|
||||
@GetMapping("version_update")
|
||||
public void versionUpdate(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam("version") String version,
|
||||
@RequestParam(value = "opts", defaultValue = "") String opTs
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Version update: {} {} {}", flinkJobId, alias, version);
|
||||
versionUpdateService.saveUpdateVersion(flinkJobId, alias, version, opTs);
|
||||
}
|
||||
|
||||
@GetMapping("compaction_latest_operation_time")
|
||||
public void compactionLatestOperationTime(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam("latest_op_ts") Long latestOpTs
|
||||
) {
|
||||
logger.info(makeMarker(flinkJobId, alias), "Compaction latest operation time: {} {} {}", flinkJobId, alias, latestOpTs);
|
||||
syncStateService.saveCompactionLatestOperationTime(flinkJobId, alias, latestOpTs);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
package com.lanyuanxiaoyao.service.api.entity;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
|
||||
/**
|
||||
* Hoodie 提交信息
|
||||
*
|
||||
* @author ZhangJiacheng
|
||||
* @date 2022-07-05
|
||||
*/
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class HoodieCommitMetadata {
|
||||
private Long totalScanTime;
|
||||
private Long totalLogFilesCompacted;
|
||||
private Long totalLogFilesSize;
|
||||
private Long totalRecordsDeleted;
|
||||
private Long totalCompactedRecordsUpdated;
|
||||
private Long totalLogRecordsCompacted;
|
||||
|
||||
public Long getTotalScanTime() {
|
||||
return totalScanTime;
|
||||
}
|
||||
|
||||
public void setTotalScanTime(Long totalScanTime) {
|
||||
this.totalScanTime = totalScanTime;
|
||||
}
|
||||
|
||||
public Long getTotalLogFilesCompacted() {
|
||||
return totalLogFilesCompacted;
|
||||
}
|
||||
|
||||
public void setTotalLogFilesCompacted(Long totalLogFilesCompacted) {
|
||||
this.totalLogFilesCompacted = totalLogFilesCompacted;
|
||||
}
|
||||
|
||||
public Long getTotalLogFilesSize() {
|
||||
return totalLogFilesSize;
|
||||
}
|
||||
|
||||
public void setTotalLogFilesSize(Long totalLogFilesSize) {
|
||||
this.totalLogFilesSize = totalLogFilesSize;
|
||||
}
|
||||
|
||||
public Long getTotalRecordsDeleted() {
|
||||
return totalRecordsDeleted;
|
||||
}
|
||||
|
||||
public void setTotalRecordsDeleted(Long totalRecordsDeleted) {
|
||||
this.totalRecordsDeleted = totalRecordsDeleted;
|
||||
}
|
||||
|
||||
public Long getTotalCompactedRecordsUpdated() {
|
||||
return totalCompactedRecordsUpdated;
|
||||
}
|
||||
|
||||
public void setTotalCompactedRecordsUpdated(Long totalCompactedRecordsUpdated) {
|
||||
this.totalCompactedRecordsUpdated = totalCompactedRecordsUpdated;
|
||||
}
|
||||
|
||||
public Long getTotalLogRecordsCompacted() {
|
||||
return totalLogRecordsCompacted;
|
||||
}
|
||||
|
||||
public void setTotalLogRecordsCompacted(Long totalLogRecordsCompacted) {
|
||||
this.totalLogRecordsCompacted = totalLogRecordsCompacted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HoodieCommitMetadata{" +
|
||||
"totalScanTime=" + totalScanTime +
|
||||
", totalLogFilesCompacted=" + totalLogFilesCompacted +
|
||||
", totalLogFilesSize=" + totalLogFilesSize +
|
||||
", totalRecordsDeleted=" + totalRecordsDeleted +
|
||||
", totalCompactedRecordsUpdated=" + totalCompactedRecordsUpdated +
|
||||
", totalLogRecordsCompacted=" + totalLogRecordsCompacted +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,229 @@
|
||||
package com.lanyuanxiaoyao.service.api.service;
|
||||
|
||||
import club.kingon.sql.builder.SqlBuilder;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.eshore.odcp.hudi.connector.Constants;
|
||||
import java.time.Instant;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics;
|
||||
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppHudiSyncState;
|
||||
|
||||
/**
|
||||
* 服务
|
||||
*
|
||||
* @author ZhangJiacheng
|
||||
* @date 2022-07-05
|
||||
*/
|
||||
@Service
|
||||
public class SyncStateService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SyncStateService.class);
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
public SyncStateService(JdbcTemplate jdbcTemplate) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
}
|
||||
|
||||
private String syncStateId(Long flinkJobId, String alias) {
|
||||
return flinkJobId + "-" + alias;
|
||||
}
|
||||
|
||||
private Date now() {
|
||||
return Date.from(Instant.now());
|
||||
}
|
||||
|
||||
public String getMessageId(Long flinkJobId, String alias) {
|
||||
List<String> ids = jdbcTemplate.queryForList(
|
||||
SqlBuilder
|
||||
.select(TbAppHudiSyncState.MESSAGE_ID_A)
|
||||
.from(TbAppHudiSyncState._alias_)
|
||||
.whereEq(TbAppHudiSyncState.ID_A, null)
|
||||
.precompileSql(),
|
||||
String.class,
|
||||
syncStateId(flinkJobId, alias)
|
||||
);
|
||||
if (ObjectUtil.isNotEmpty(ids)) {
|
||||
return ids.get(0);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public void saveSyncStartTime(Long flinkJobId, String alias, String cluster, String applicationId) {
|
||||
jdbcTemplate.update(
|
||||
SqlBuilder
|
||||
.insertInto(
|
||||
TbAppHudiSyncState._origin_,
|
||||
TbAppHudiSyncState.ID_O,
|
||||
TbAppHudiSyncState.SOURCE_START_TIME_O,
|
||||
TbAppHudiSyncState.SOURCE_CLUSTER_O,
|
||||
TbAppHudiSyncState.SOURCE_APPLICATION_ID_O
|
||||
)
|
||||
.values()
|
||||
.addValue(null, null, null, null)
|
||||
.onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_START_TIME_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.SOURCE_CLUSTER_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.SOURCE_APPLICATION_ID_O)
|
||||
.precompileSql(),
|
||||
syncStateId(flinkJobId, alias),
|
||||
now(),
|
||||
cluster,
|
||||
applicationId
|
||||
);
|
||||
}
|
||||
|
||||
public void saveSyncCheckpointState(Long flinkJobId, String alias, String messageId, Long publishTime) {
|
||||
Date publishDate = Date.from(Instant.ofEpochMilli(publishTime));
|
||||
jdbcTemplate.update(
|
||||
SqlBuilder
|
||||
.insertInto(
|
||||
TbAppHudiSyncState._origin_,
|
||||
TbAppHudiSyncState.ID_O,
|
||||
TbAppHudiSyncState.MESSAGE_ID_O,
|
||||
TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O,
|
||||
TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_O
|
||||
)
|
||||
.values()
|
||||
.addValue(null, null, null, null)
|
||||
.onDuplicateKeyUpdateColumn(TbAppHudiSyncState.MESSAGE_ID_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_O)
|
||||
.precompileSql(),
|
||||
syncStateId(flinkJobId, alias),
|
||||
messageId,
|
||||
publishDate,
|
||||
now()
|
||||
);
|
||||
}
|
||||
|
||||
public void saveSyncOperationState(Long flinkJobId, String alias, Long operationTime) {
|
||||
Date operationDate = ObjectUtil.isNull(operationTime) ? null : Date.from(Instant.ofEpochMilli(operationTime));
|
||||
jdbcTemplate.update(
|
||||
SqlBuilder
|
||||
.insertInto(
|
||||
TbAppHudiSyncState._origin_,
|
||||
TbAppHudiSyncState.ID_O,
|
||||
TbAppHudiSyncState.SOURCE_OP_TIME_O
|
||||
)
|
||||
.values()
|
||||
.addValue(null, null)
|
||||
.onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_OP_TIME_O)
|
||||
.precompileSql(),
|
||||
syncStateId(flinkJobId, alias),
|
||||
operationDate
|
||||
);
|
||||
}
|
||||
|
||||
public void saveCompactionStartTime(Long flinkJobId, String alias, String cluster, String applicationId, String message) {
|
||||
jdbcTemplate.update(
|
||||
SqlBuilder
|
||||
.insertInto(
|
||||
TbAppHudiSyncState._origin_,
|
||||
TbAppHudiSyncState.ID_O,
|
||||
TbAppHudiSyncState.COMPACTION_START_TIME_O,
|
||||
TbAppHudiSyncState.COMPACTION_MESSAGE_O,
|
||||
TbAppHudiSyncState.COMPACTION_STATUS_O,
|
||||
TbAppHudiSyncState.COMPACTION_STATUS_TIME_O,
|
||||
TbAppHudiSyncState.COMPACTION_CLUSTER_O,
|
||||
TbAppHudiSyncState.COMPACTION_APPLICATION_ID_O
|
||||
)
|
||||
.values()
|
||||
.addValue(null, null, null, null, null, null, null)
|
||||
.onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_START_TIME_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.COMPACTION_MESSAGE_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_TIME_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.COMPACTION_CLUSTER_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.COMPACTION_APPLICATION_ID_O)
|
||||
.precompileSql(),
|
||||
syncStateId(flinkJobId, alias),
|
||||
now(),
|
||||
message,
|
||||
Constants.COMPACTION_STATUS_START,
|
||||
now(),
|
||||
cluster,
|
||||
applicationId
|
||||
);
|
||||
}
|
||||
|
||||
public void saveCompactionFinishTime(Long flinkJobId, String alias) {
|
||||
saveCompactionFinishTime(flinkJobId, alias, Instant.now().toEpochMilli(), "");
|
||||
}
|
||||
|
||||
public void saveCompactionFinishTime(Long flinkJobId, String alias, Long timestamp, String message) {
|
||||
Date finishDate = Date.from(Instant.ofEpochMilli(timestamp));
|
||||
jdbcTemplate.update(
|
||||
SqlBuilder
|
||||
.insertInto(
|
||||
TbAppHudiSyncState._origin_,
|
||||
TbAppHudiSyncState.ID_O,
|
||||
TbAppHudiSyncState.COMPACTION_FINISH_TIME_O,
|
||||
TbAppHudiSyncState.COMPACTION_MESSAGE_O,
|
||||
TbAppHudiSyncState.COMPACTION_STATUS_O,
|
||||
TbAppHudiSyncState.COMPACTION_STATUS_TIME_O
|
||||
)
|
||||
.values()
|
||||
.addValue(null, null, null, null, null)
|
||||
.onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_FINISH_TIME_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.COMPACTION_MESSAGE_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_O)
|
||||
.addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_TIME_O)
|
||||
.precompileSql(),
|
||||
syncStateId(flinkJobId, alias),
|
||||
finishDate,
|
||||
message,
|
||||
Constants.COMPACTION_STATUS_FINISH,
|
||||
now()
|
||||
);
|
||||
}
|
||||
|
||||
public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId) {
|
||||
jdbcTemplate.update(
|
||||
SqlBuilder
|
||||
.insertInto(
|
||||
TbAppHudiCompactionMetrics._origin_,
|
||||
TbAppHudiCompactionMetrics.FLINK_JOB_ID_O,
|
||||
TbAppHudiCompactionMetrics.ALIAS_O,
|
||||
TbAppHudiCompactionMetrics.TYPE_O,
|
||||
TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O,
|
||||
TbAppHudiCompactionMetrics.CLUSTER_O,
|
||||
TbAppHudiCompactionMetrics.APPLICATION_ID_O,
|
||||
TbAppHudiCompactionMetrics.UPDATE_TIME_O
|
||||
)
|
||||
.values()
|
||||
.addValue(null, null, null, null, null, null, null)
|
||||
.precompileSql(),
|
||||
flinkJobId,
|
||||
alias,
|
||||
type,
|
||||
Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())),
|
||||
cluster,
|
||||
applicationId,
|
||||
now()
|
||||
);
|
||||
}
|
||||
|
||||
public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) {
|
||||
Date operationDate = ObjectUtil.isNull(latestOperationTime) ? null : Date.from(Instant.ofEpochMilli(latestOperationTime));
|
||||
jdbcTemplate.update(
|
||||
SqlBuilder
|
||||
.insertInto(
|
||||
TbAppHudiSyncState._origin_,
|
||||
TbAppHudiSyncState.ID_O,
|
||||
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O
|
||||
)
|
||||
.values()
|
||||
.addValue(null, null)
|
||||
.onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O)
|
||||
.precompileSql(),
|
||||
syncStateId(flinkJobId, alias),
|
||||
operationDate
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package com.lanyuanxiaoyao.service.api.service;
|
||||
|
||||
import club.kingon.sql.builder.SqlBuilder;
|
||||
import java.time.Instant;
|
||||
import java.util.Date;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppCollectTableVersion;
|
||||
|
||||
/**
|
||||
* 跨天版本更新
|
||||
*
|
||||
* @author ZhangJiacheng
|
||||
* @date 2022-11-17
|
||||
*/
|
||||
@Service
|
||||
public class VersionUpdateService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(VersionUpdateService.class);
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
public VersionUpdateService(JdbcTemplate jdbcTemplate) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
}
|
||||
|
||||
private Date now() {
|
||||
return Date.from(Instant.now());
|
||||
}
|
||||
|
||||
public void saveUpdateVersion(Long flinkJobId, String alias, String version, String opTs) {
|
||||
jdbcTemplate.update(
|
||||
SqlBuilder
|
||||
.insertInto(
|
||||
TbAppCollectTableVersion._origin_,
|
||||
TbAppCollectTableVersion.FLINK_JOB_ID_O,
|
||||
TbAppCollectTableVersion.ALIAS_O,
|
||||
TbAppCollectTableVersion.VERSION_O,
|
||||
TbAppCollectTableVersion.OP_TS_O,
|
||||
TbAppCollectTableVersion.CREATE_TIME_O
|
||||
)
|
||||
.values()
|
||||
.addValue(null, null, null, null, null)
|
||||
.precompileSql(),
|
||||
flinkJobId,
|
||||
alias,
|
||||
version,
|
||||
opTs,
|
||||
now()
|
||||
);
|
||||
}
|
||||
}
|
||||
13
service-api/src/main/resources/application.yml
Normal file
13
service-api/src/main/resources/application.yml
Normal file
@@ -0,0 +1,13 @@
|
||||
spring:
|
||||
application:
|
||||
name: service-api
|
||||
profiles:
|
||||
include: random-port,common,discovery,metrics
|
||||
datasource:
|
||||
url: ENC(tDeB9gYs1IHN90VV+KhNQAVEKSqeInEmFhgma7A0g6surB5pOyJC3lJx0QrvQo4zYz0WOhSgqjglHDSX7nh7k9ak3OQMgLYkHQGVawAZfcsGT/1m0csyjQzCxTCOZr5r)
|
||||
username: ENC(29BKiU1cMKlA61gszFeYfWvoDVtLCCQAtfraxMq+f6Gm2LFu+67lkkBhoWgWIJga)
|
||||
password: ENC(t+7GZM/tfqYeTlOugjcO6lUsHlacoVaomxLOeDpb6LhVB5+wbZHkKkW1xke8jNhQ)
|
||||
driver-class-name: com.mysql.jdbc.Driver
|
||||
druid:
|
||||
initial-size: 5
|
||||
max-active: 10
|
||||
52
service-api/src/main/resources/logback-spring.xml
Normal file
52
service-api/src/main/resources/logback-spring.xml
Normal file
@@ -0,0 +1,52 @@
|
||||
<configuration>
|
||||
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
|
||||
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
|
||||
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
|
||||
|
||||
<springProperty scope="context" name="LOKI_PUSH_URL" source="loki.url"/>
|
||||
<springProperty scope="context" name="LOGGING_PARENT" source="logging.parent"/>
|
||||
<springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
|
||||
|
||||
<appender name="Loki" class="com.github.loki4j.logback.Loki4jAppender">
|
||||
<metricsEnabled>true</metricsEnabled>
|
||||
<http class="com.github.loki4j.logback.ApacheHttpSender">
|
||||
<url>${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push}</url>
|
||||
</http>
|
||||
<format>
|
||||
<label>
|
||||
<pattern>app=${APP_NAME:-none},host=${HOSTNAME:-none},level=%level</pattern>
|
||||
<readMarkers>true</readMarkers>
|
||||
</label>
|
||||
<message>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx</pattern>
|
||||
</message>
|
||||
<sortByTime>true</sortByTime>
|
||||
</format>
|
||||
</appender>
|
||||
|
||||
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([${HOSTNAME}]){yellow} %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${LOGGING_PARENT:-.}/${APP_NAME:-run}.log</file>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz</fileNamePattern>
|
||||
<MaxHistory>7</MaxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="com.zaxxer.hikari" level="ERROR"/>
|
||||
<logger name="com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver" level="WARN"/>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="Loki"/>
|
||||
<appender-ref ref="Console"/>
|
||||
<appender-ref ref="RollingFile"/>
|
||||
</root>
|
||||
</configuration>
|
||||
@@ -17,7 +17,7 @@ import org.springframework.stereotype.Component;
|
||||
@Configuration
|
||||
@ConfigurationProperties("deploy")
|
||||
public class DeployInformationProperties {
|
||||
private Generate generate;
|
||||
private Generate generate = new Generate();
|
||||
private Boolean shuffler = false;
|
||||
private RuntimeInfo runtime;
|
||||
private Map<String, HostInfo> hosts;
|
||||
|
||||
@@ -30,7 +30,7 @@ deploy:
|
||||
sync-clusters: b12
|
||||
compaction-clusters: b12,b1,b5,a4
|
||||
services:
|
||||
api:
|
||||
service-api:
|
||||
replicas: 10
|
||||
service-launcher-runner-b1:
|
||||
replicas: 6
|
||||
|
||||
@@ -9,9 +9,9 @@ deploy:
|
||||
arguments:
|
||||
data_save_enable: true
|
||||
data_save_location: ${deploy.runtime.data-path}
|
||||
api:
|
||||
service-api:
|
||||
order: 1
|
||||
source-jar: api-1.0.0-SNAPSHOT.jar
|
||||
source-jar: service-api-1.0.0-SNAPSHOT.jar
|
||||
replicas: 10
|
||||
service-scheduler:
|
||||
order: 3
|
||||
|
||||
@@ -45,7 +45,7 @@ public class GatewayApplication {
|
||||
@Bean
|
||||
public RouteLocator routeLocator(RouteLocatorBuilder builder) {
|
||||
return builder.routes()
|
||||
.route("hudi", createRoute("/hudi_services/hudi_api", "lb://hudi-api"))
|
||||
.route("hudi", createRoute("/hudi_services/hudi_api", "lb://service-api"))
|
||||
.route("queue", createRoute("/hudi_services/queue", "lb://service-queue"))
|
||||
.route("scheduler", createRoute("/hudi_services/service_scheduler", "lb://service-scheduler"))
|
||||
.route("web", createRoute("/hudi_services/service_web", "lb://service-web"))
|
||||
|
||||
Reference in New Issue
Block a user