diff --git a/bin/build-all.sh b/bin/build-all.sh
index 9a503cd..48202e0 100644
--- a/bin/build-all.sh
+++ b/bin/build-all.sh
@@ -2,9 +2,10 @@
mvn -pl service-dependencies,service-configuration,service-forest,service-executor,service-executor/service-executor-core,service-cli,service-cli/service-cli-core clean source:jar deploy -D skipTests -P local -s ~/.m2/settings-development.xml
mvn -pl service-gateway,service-queue,service-cli/service-cli-runner clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml
-mvn -pl service-cloud-query,service-flink-query,service-info-query,service-loki-query,service-pulsar-query,service-yarn-query,service-zookeeper-query,service-web clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml -P b2b12
+mvn -pl service-api,service-cloud-query,service-flink-query,service-info-query,service-loki-query,service-pulsar-query,service-yarn-query,service-zookeeper-query,service-web clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml -P b2b12
mvn -pl service-hudi-query,service-executor/service-executor-manager,service-executor/service-executor-task clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b12
+ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-api/target/service-api-1.0.0-SNAPSHOT.jar
ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-cli/service-cli-runner/target/service-cli-runner-1.0.0-SNAPSHOT.jar
ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-cloud-query/target/service-cloud-query-1.0.0-SNAPSHOT.jar
ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-executor/service-executor-manager/target/service-executor-manager-1.0.0-SNAPSHOT.jar
diff --git a/bin/build-api.sh b/bin/build-api.sh
new file mode 100755
index 0000000..fa658ce
--- /dev/null
+++ b/bin/build-api.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+mvn -pl service-dependencies,service-configuration clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
+mvn -pl service-api clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml
+file_path=/Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-api/target/service-api-1.0.0-SNAPSHOT.jar
+ytp-transfer2 $file_path
+rm $file_path
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d49c3e5..93dd0aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,6 +28,7 @@
service-cloud-query
service-exporter
service-check
+ service-api
diff --git a/service-api/pom.xml b/service-api/pom.xml
new file mode 100644
index 0000000..7b266d5
--- /dev/null
+++ b/service-api/pom.xml
@@ -0,0 +1,47 @@
+
+
+ 4.0.0
+
+ com.lanyuanxiaoyao
+ hudi-service
+ 1.0.0-SNAPSHOT
+
+
+ service-api
+
+
+
+ com.lanyuanxiaoyao
+ service-configuration
+ 1.0.0-SNAPSHOT
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+
+
+ mysql
+ mysql-connector-java
+
+
+ com.alibaba
+ druid-spring-boot-starter
+ 1.2.17
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
\ No newline at end of file
diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/ApiApplication.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/ApiApplication.java
new file mode 100644
index 0000000..452e2b9
--- /dev/null
+++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/ApiApplication.java
@@ -0,0 +1,34 @@
+package com.lanyuanxiaoyao.service.api;
+
+import com.lanyuanxiaoyao.service.configuration.SecurityConfig;
+import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
+import org.springframework.retry.annotation.EnableRetry;
+
+/**
+ * 启动类
+ *
+ * @author ZhangJiacheng
+ * @date 2022-07-05
+ */
+@EnableDiscoveryClient
+@EnableConfigurationProperties
+@EnableEncryptableProperties
+@SpringBootApplication
+@ComponentScan(
+ basePackages = {
+ "com.lanyuanxiaoyao.service",
+ },
+ excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {SecurityConfig.class})
+)
+@EnableRetry
+public class ApiApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(ApiApplication.class, args);
+ }
+}
diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/configuration/JdbcConfiguration.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/configuration/JdbcConfiguration.java
new file mode 100644
index 0000000..8c07883
--- /dev/null
+++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/configuration/JdbcConfiguration.java
@@ -0,0 +1,26 @@
+package com.lanyuanxiaoyao.service.api.configuration;
+
+import javax.sql.DataSource;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * JDBC 配置
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-04-26
+ */
+@Configuration
+public class JdbcConfiguration {
+ @Bean
+ public DataSourceTransactionManager transactionManager(DataSource dataSource) {
+ return new DataSourceTransactionManager(dataSource);
+ }
+
+ @Bean
+ public TransactionTemplate transactionTemplate(DataSourceTransactionManager dataSourceTransactionManager) {
+ return new TransactionTemplate(dataSourceTransactionManager);
+ }
+}
diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/configuration/SecurityConfig.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/configuration/SecurityConfig.java
new file mode 100644
index 0000000..9ece647
--- /dev/null
+++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/configuration/SecurityConfig.java
@@ -0,0 +1,50 @@
+package com.lanyuanxiaoyao.service.api.configuration;
+
+import com.lanyuanxiaoyao.service.configuration.SecurityProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
+import org.springframework.security.config.annotation.web.builders.HttpSecurity;
+import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
+import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
+
+/**
+ * Spring Security Config
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-01-30
+ */
+@Configuration
+@EnableWebSecurity
+public class SecurityConfig extends WebSecurityConfigurerAdapter {
+ private static final Logger logger = LoggerFactory.getLogger(SecurityConfig.class);
+
+ private final SecurityProperties securityProperties;
+
+ public SecurityConfig(SecurityProperties securityProperties) {
+ this.securityProperties = securityProperties;
+ }
+
+ @Override
+ protected void configure(HttpSecurity http) throws Exception {
+ http.authorizeHttpRequests()
+ .antMatchers("/actuator")
+ .authenticated()
+ .anyRequest()
+ .permitAll()
+ .and()
+ .httpBasic()
+ .and()
+ .csrf()
+ .disable();
+ }
+
+ @Override
+ protected void configure(AuthenticationManagerBuilder builder) throws Exception {
+ builder.inMemoryAuthentication()
+ .withUser(securityProperties.getUsername())
+ .password("{noop}" + securityProperties.getDarkcode())
+ .authorities(securityProperties.getAuthority());
+ }
+}
diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java
new file mode 100644
index 0000000..2f2d323
--- /dev/null
+++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java
@@ -0,0 +1,164 @@
+package com.lanyuanxiaoyao.service.api.controller;
+
+import cn.hutool.core.util.StrUtil;
+import com.eshore.odcp.hudi.connector.Constants;
+import com.github.loki4j.slf4j.marker.LabelMarker;
+import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata;
+import com.lanyuanxiaoyao.service.api.service.SyncStateService;
+import com.lanyuanxiaoyao.service.api.service.VersionUpdateService;
+import javax.annotation.Resource;
+import org.eclipse.collections.api.factory.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.springframework.web.bind.annotation.*;
+
+/**
+ * Rest 接口
+ *
+ * @author ZhangJiacheng
+ * @date 2022-07-05
+ */
+@RequestMapping("api")
+@RestController
+public class ApiController {
+ private static final Logger logger = LoggerFactory.getLogger(ApiController.class);
+
+ @Resource
+ private SyncStateService syncStateService;
+ @Resource
+ private VersionUpdateService versionUpdateService;
+
+ private Marker makeMarker(Long flinkJobId, String alias) {
+ return LabelMarker.of(() -> Maps.mutable.of(Constants.LOG_FLINK_JOB_ID, flinkJobId.toString(), Constants.LOG_ALIAS, alias));
+ }
+
+ @ResponseBody
+ @GetMapping("message_id")
+ public String messageId(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Get message id: {} {}", flinkJobId, alias);
+ return syncStateService.getMessageId(flinkJobId, alias);
+ }
+
+ @GetMapping("sync_start")
+ public void syncStart(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam(value = "cluster", required = false) String cluster,
+ @RequestParam(value = "application_id", required = false) String applicationId
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Sync start: {} {} {} {}", flinkJobId, alias, cluster, applicationId);
+ syncStateService.saveSyncStartTime(flinkJobId, alias, cluster, applicationId);
+ }
+
+ @GetMapping("sync_checkpoint_state")
+ public void syncCheckpointState(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam("message_id") String messageId,
+ @RequestParam("publish_time") Long publishTime
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Sync checkpoint state: {} {} {} {}", flinkJobId, alias, messageId, publishTime);
+ syncStateService.saveSyncCheckpointState(flinkJobId, alias, messageId, publishTime);
+ }
+
+ @GetMapping("sync_operation_state")
+ public void syncOperationState(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam(value = "operation_time",
+ required = false) Long operationTime
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Sync operation state: {} {} {}", flinkJobId, alias, operationTime);
+ syncStateService.saveSyncOperationState(flinkJobId, alias, operationTime);
+ }
+
+ @GetMapping("compaction_start")
+ public void compactionStart(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam(value = "type", required = false) String type,
+ @RequestParam(value = "cluster", required = false) String cluster,
+ @RequestParam(value = "application_id", required = false) String applicationId
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Compaction start: {} {} {} {}", flinkJobId, alias, cluster, applicationId);
+ syncStateService.saveCompactionStartTime(flinkJobId, alias, cluster, applicationId, null);
+ }
+
+ @PostMapping("compaction_pre_commit")
+ public void compactionPreCommit(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam("instant") String instant,
+ @RequestParam(value = "cluster", required = false) String cluster,
+ @RequestParam(value = "application_id", required = false) String applicationId,
+ @RequestBody HoodieCommitMetadata metadata
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Compaction state: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId);
+ syncStateService.saveCompactionMetrics(
+ flinkJobId,
+ alias,
+ "pre",
+ instant,
+ cluster,
+ applicationId
+ );
+ }
+
+ @PostMapping("compaction_commit")
+ public void compactionCommit(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam("instant") String instant,
+ @RequestParam(value = "cluster", required = false) String cluster,
+ @RequestParam(value = "application_id", required = false) String applicationId,
+ @RequestBody HoodieCommitMetadata metadata
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Compaction state: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId);
+ syncStateService.saveCompactionFinishTime(flinkJobId, alias);
+ syncStateService.saveCompactionMetrics(
+ flinkJobId,
+ alias,
+ "complete",
+ instant,
+ cluster,
+ applicationId
+ );
+ }
+
+ @PostMapping("compaction_finish")
+ public void compactionFinish(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam("time") Long timestamp,
+ @RequestParam("state") Boolean success,
+ @RequestBody(required = false) String exception
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Compaction finish: {} {} {} {}", flinkJobId, alias, timestamp, success);
+ syncStateService.saveCompactionFinishTime(flinkJobId, alias, timestamp, StrUtil.isBlank(exception) ? null : exception);
+ }
+
+ @GetMapping("version_update")
+ public void versionUpdate(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam("version") String version,
+ @RequestParam(value = "opts", defaultValue = "") String opTs
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Version update: {} {} {}", flinkJobId, alias, version);
+ versionUpdateService.saveUpdateVersion(flinkJobId, alias, version, opTs);
+ }
+
+ @GetMapping("compaction_latest_operation_time")
+ public void compactionLatestOperationTime(
+ @RequestParam("flink_job_id") Long flinkJobId,
+ @RequestParam("alias") String alias,
+ @RequestParam("latest_op_ts") Long latestOpTs
+ ) {
+ logger.info(makeMarker(flinkJobId, alias), "Compaction latest operation time: {} {} {}", flinkJobId, alias, latestOpTs);
+ syncStateService.saveCompactionLatestOperationTime(flinkJobId, alias, latestOpTs);
+ }
+}
diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/entity/HoodieCommitMetadata.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/entity/HoodieCommitMetadata.java
new file mode 100644
index 0000000..95dbeef
--- /dev/null
+++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/entity/HoodieCommitMetadata.java
@@ -0,0 +1,79 @@
+package com.lanyuanxiaoyao.service.api.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+/**
+ * Hoodie 提交信息
+ *
+ * @author ZhangJiacheng
+ * @date 2022-07-05
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieCommitMetadata {
+ private Long totalScanTime;
+ private Long totalLogFilesCompacted;
+ private Long totalLogFilesSize;
+ private Long totalRecordsDeleted;
+ private Long totalCompactedRecordsUpdated;
+ private Long totalLogRecordsCompacted;
+
+ public Long getTotalScanTime() {
+ return totalScanTime;
+ }
+
+ public void setTotalScanTime(Long totalScanTime) {
+ this.totalScanTime = totalScanTime;
+ }
+
+ public Long getTotalLogFilesCompacted() {
+ return totalLogFilesCompacted;
+ }
+
+ public void setTotalLogFilesCompacted(Long totalLogFilesCompacted) {
+ this.totalLogFilesCompacted = totalLogFilesCompacted;
+ }
+
+ public Long getTotalLogFilesSize() {
+ return totalLogFilesSize;
+ }
+
+ public void setTotalLogFilesSize(Long totalLogFilesSize) {
+ this.totalLogFilesSize = totalLogFilesSize;
+ }
+
+ public Long getTotalRecordsDeleted() {
+ return totalRecordsDeleted;
+ }
+
+ public void setTotalRecordsDeleted(Long totalRecordsDeleted) {
+ this.totalRecordsDeleted = totalRecordsDeleted;
+ }
+
+ public Long getTotalCompactedRecordsUpdated() {
+ return totalCompactedRecordsUpdated;
+ }
+
+ public void setTotalCompactedRecordsUpdated(Long totalCompactedRecordsUpdated) {
+ this.totalCompactedRecordsUpdated = totalCompactedRecordsUpdated;
+ }
+
+ public Long getTotalLogRecordsCompacted() {
+ return totalLogRecordsCompacted;
+ }
+
+ public void setTotalLogRecordsCompacted(Long totalLogRecordsCompacted) {
+ this.totalLogRecordsCompacted = totalLogRecordsCompacted;
+ }
+
+ @Override
+ public String toString() {
+ return "HoodieCommitMetadata{" +
+ "totalScanTime=" + totalScanTime +
+ ", totalLogFilesCompacted=" + totalLogFilesCompacted +
+ ", totalLogFilesSize=" + totalLogFilesSize +
+ ", totalRecordsDeleted=" + totalRecordsDeleted +
+ ", totalCompactedRecordsUpdated=" + totalCompactedRecordsUpdated +
+ ", totalLogRecordsCompacted=" + totalLogRecordsCompacted +
+ '}';
+ }
+}
diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java
new file mode 100644
index 0000000..ddd6a87
--- /dev/null
+++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java
@@ -0,0 +1,229 @@
+package com.lanyuanxiaoyao.service.api.service;
+
+import club.kingon.sql.builder.SqlBuilder;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.ObjectUtil;
+import com.eshore.odcp.hudi.connector.Constants;
+import java.time.Instant;
+import java.util.Date;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Service;
+
+import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics;
+import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppHudiSyncState;
+
+/**
+ * 服务
+ *
+ * @author ZhangJiacheng
+ * @date 2022-07-05
+ */
+@Service
+public class SyncStateService {
+ private static final Logger logger = LoggerFactory.getLogger(SyncStateService.class);
+
+ private final JdbcTemplate jdbcTemplate;
+
+ public SyncStateService(JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+ private String syncStateId(Long flinkJobId, String alias) {
+ return flinkJobId + "-" + alias;
+ }
+
+ private Date now() {
+ return Date.from(Instant.now());
+ }
+
+ public String getMessageId(Long flinkJobId, String alias) {
+ List ids = jdbcTemplate.queryForList(
+ SqlBuilder
+ .select(TbAppHudiSyncState.MESSAGE_ID_A)
+ .from(TbAppHudiSyncState._alias_)
+ .whereEq(TbAppHudiSyncState.ID_A, null)
+ .precompileSql(),
+ String.class,
+ syncStateId(flinkJobId, alias)
+ );
+ if (ObjectUtil.isNotEmpty(ids)) {
+ return ids.get(0);
+ }
+ return "";
+ }
+
+ public void saveSyncStartTime(Long flinkJobId, String alias, String cluster, String applicationId) {
+ jdbcTemplate.update(
+ SqlBuilder
+ .insertInto(
+ TbAppHudiSyncState._origin_,
+ TbAppHudiSyncState.ID_O,
+ TbAppHudiSyncState.SOURCE_START_TIME_O,
+ TbAppHudiSyncState.SOURCE_CLUSTER_O,
+ TbAppHudiSyncState.SOURCE_APPLICATION_ID_O
+ )
+ .values()
+ .addValue(null, null, null, null)
+ .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_START_TIME_O)
+ .addUpdateColumn(TbAppHudiSyncState.SOURCE_CLUSTER_O)
+ .addUpdateColumn(TbAppHudiSyncState.SOURCE_APPLICATION_ID_O)
+ .precompileSql(),
+ syncStateId(flinkJobId, alias),
+ now(),
+ cluster,
+ applicationId
+ );
+ }
+
+ public void saveSyncCheckpointState(Long flinkJobId, String alias, String messageId, Long publishTime) {
+ Date publishDate = Date.from(Instant.ofEpochMilli(publishTime));
+ jdbcTemplate.update(
+ SqlBuilder
+ .insertInto(
+ TbAppHudiSyncState._origin_,
+ TbAppHudiSyncState.ID_O,
+ TbAppHudiSyncState.MESSAGE_ID_O,
+ TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O,
+ TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_O
+ )
+ .values()
+ .addValue(null, null, null, null)
+ .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.MESSAGE_ID_O)
+ .addUpdateColumn(TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O)
+ .addUpdateColumn(TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_O)
+ .precompileSql(),
+ syncStateId(flinkJobId, alias),
+ messageId,
+ publishDate,
+ now()
+ );
+ }
+
+ public void saveSyncOperationState(Long flinkJobId, String alias, Long operationTime) {
+ Date operationDate = ObjectUtil.isNull(operationTime) ? null : Date.from(Instant.ofEpochMilli(operationTime));
+ jdbcTemplate.update(
+ SqlBuilder
+ .insertInto(
+ TbAppHudiSyncState._origin_,
+ TbAppHudiSyncState.ID_O,
+ TbAppHudiSyncState.SOURCE_OP_TIME_O
+ )
+ .values()
+ .addValue(null, null)
+ .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_OP_TIME_O)
+ .precompileSql(),
+ syncStateId(flinkJobId, alias),
+ operationDate
+ );
+ }
+
+ public void saveCompactionStartTime(Long flinkJobId, String alias, String cluster, String applicationId, String message) {
+ jdbcTemplate.update(
+ SqlBuilder
+ .insertInto(
+ TbAppHudiSyncState._origin_,
+ TbAppHudiSyncState.ID_O,
+ TbAppHudiSyncState.COMPACTION_START_TIME_O,
+ TbAppHudiSyncState.COMPACTION_MESSAGE_O,
+ TbAppHudiSyncState.COMPACTION_STATUS_O,
+ TbAppHudiSyncState.COMPACTION_STATUS_TIME_O,
+ TbAppHudiSyncState.COMPACTION_CLUSTER_O,
+ TbAppHudiSyncState.COMPACTION_APPLICATION_ID_O
+ )
+ .values()
+ .addValue(null, null, null, null, null, null, null)
+ .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_START_TIME_O)
+ .addUpdateColumn(TbAppHudiSyncState.COMPACTION_MESSAGE_O)
+ .addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_O)
+ .addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_TIME_O)
+ .addUpdateColumn(TbAppHudiSyncState.COMPACTION_CLUSTER_O)
+ .addUpdateColumn(TbAppHudiSyncState.COMPACTION_APPLICATION_ID_O)
+ .precompileSql(),
+ syncStateId(flinkJobId, alias),
+ now(),
+ message,
+ Constants.COMPACTION_STATUS_START,
+ now(),
+ cluster,
+ applicationId
+ );
+ }
+
+ public void saveCompactionFinishTime(Long flinkJobId, String alias) {
+ saveCompactionFinishTime(flinkJobId, alias, Instant.now().toEpochMilli(), "");
+ }
+
+ public void saveCompactionFinishTime(Long flinkJobId, String alias, Long timestamp, String message) {
+ Date finishDate = Date.from(Instant.ofEpochMilli(timestamp));
+ jdbcTemplate.update(
+ SqlBuilder
+ .insertInto(
+ TbAppHudiSyncState._origin_,
+ TbAppHudiSyncState.ID_O,
+ TbAppHudiSyncState.COMPACTION_FINISH_TIME_O,
+ TbAppHudiSyncState.COMPACTION_MESSAGE_O,
+ TbAppHudiSyncState.COMPACTION_STATUS_O,
+ TbAppHudiSyncState.COMPACTION_STATUS_TIME_O
+ )
+ .values()
+ .addValue(null, null, null, null, null)
+ .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_FINISH_TIME_O)
+ .addUpdateColumn(TbAppHudiSyncState.COMPACTION_MESSAGE_O)
+ .addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_O)
+ .addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_TIME_O)
+ .precompileSql(),
+ syncStateId(flinkJobId, alias),
+ finishDate,
+ message,
+ Constants.COMPACTION_STATUS_FINISH,
+ now()
+ );
+ }
+
+ public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId) {
+ jdbcTemplate.update(
+ SqlBuilder
+ .insertInto(
+ TbAppHudiCompactionMetrics._origin_,
+ TbAppHudiCompactionMetrics.FLINK_JOB_ID_O,
+ TbAppHudiCompactionMetrics.ALIAS_O,
+ TbAppHudiCompactionMetrics.TYPE_O,
+ TbAppHudiCompactionMetrics.COMPACTION_PLAN_INSTANT_O,
+ TbAppHudiCompactionMetrics.CLUSTER_O,
+ TbAppHudiCompactionMetrics.APPLICATION_ID_O,
+ TbAppHudiCompactionMetrics.UPDATE_TIME_O
+ )
+ .values()
+ .addValue(null, null, null, null, null, null, null)
+ .precompileSql(),
+ flinkJobId,
+ alias,
+ type,
+ Date.from(Instant.ofEpochMilli(DateUtil.parse(instant).getTime())),
+ cluster,
+ applicationId,
+ now()
+ );
+ }
+
+ public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) {
+ Date operationDate = ObjectUtil.isNull(latestOperationTime) ? null : Date.from(Instant.ofEpochMilli(latestOperationTime));
+ jdbcTemplate.update(
+ SqlBuilder
+ .insertInto(
+ TbAppHudiSyncState._origin_,
+ TbAppHudiSyncState.ID_O,
+ TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O
+ )
+ .values()
+ .addValue(null, null)
+ .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O)
+ .precompileSql(),
+ syncStateId(flinkJobId, alias),
+ operationDate
+ );
+ }
+}
diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/VersionUpdateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/VersionUpdateService.java
new file mode 100644
index 0000000..46fc4be
--- /dev/null
+++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/VersionUpdateService.java
@@ -0,0 +1,54 @@
+package com.lanyuanxiaoyao.service.api.service;
+
+import club.kingon.sql.builder.SqlBuilder;
+import java.time.Instant;
+import java.util.Date;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Service;
+
+import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppCollectTableVersion;
+
+/**
+ * 跨天版本更新
+ *
+ * @author ZhangJiacheng
+ * @date 2022-11-17
+ */
+@Service
+public class VersionUpdateService {
+ private static final Logger logger = LoggerFactory.getLogger(VersionUpdateService.class);
+
+ private final JdbcTemplate jdbcTemplate;
+
+ public VersionUpdateService(JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+ private Date now() {
+ return Date.from(Instant.now());
+ }
+
+ public void saveUpdateVersion(Long flinkJobId, String alias, String version, String opTs) {
+ jdbcTemplate.update(
+ SqlBuilder
+ .insertInto(
+ TbAppCollectTableVersion._origin_,
+ TbAppCollectTableVersion.FLINK_JOB_ID_O,
+ TbAppCollectTableVersion.ALIAS_O,
+ TbAppCollectTableVersion.VERSION_O,
+ TbAppCollectTableVersion.OP_TS_O,
+ TbAppCollectTableVersion.CREATE_TIME_O
+ )
+ .values()
+ .addValue(null, null, null, null, null)
+ .precompileSql(),
+ flinkJobId,
+ alias,
+ version,
+ opTs,
+ now()
+ );
+ }
+}
diff --git a/service-api/src/main/resources/application.yml b/service-api/src/main/resources/application.yml
new file mode 100644
index 0000000..a5e754c
--- /dev/null
+++ b/service-api/src/main/resources/application.yml
@@ -0,0 +1,13 @@
+spring:
+ application:
+ name: service-api
+ profiles:
+ include: random-port,common,discovery,metrics
+ datasource:
+ url: ENC(tDeB9gYs1IHN90VV+KhNQAVEKSqeInEmFhgma7A0g6surB5pOyJC3lJx0QrvQo4zYz0WOhSgqjglHDSX7nh7k9ak3OQMgLYkHQGVawAZfcsGT/1m0csyjQzCxTCOZr5r)
+ username: ENC(29BKiU1cMKlA61gszFeYfWvoDVtLCCQAtfraxMq+f6Gm2LFu+67lkkBhoWgWIJga)
+ password: ENC(t+7GZM/tfqYeTlOugjcO6lUsHlacoVaomxLOeDpb6LhVB5+wbZHkKkW1xke8jNhQ)
+ driver-class-name: com.mysql.jdbc.Driver
+ druid:
+ initial-size: 5
+ max-active: 10
diff --git a/service-api/src/main/resources/logback-spring.xml b/service-api/src/main/resources/logback-spring.xml
new file mode 100644
index 0000000..f272f36
--- /dev/null
+++ b/service-api/src/main/resources/logback-spring.xml
@@ -0,0 +1,52 @@
+
+
+
+
+
+
+
+
+
+
+ true
+
+ ${LOKI_PUSH_URL:-http://localhost/loki/api/v1/push}
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx
+
+ true
+
+
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([${HOSTNAME}]){yellow} %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx
+
+
+
+
+ ${LOGGING_PARENT:-.}/${APP_NAME:-run}.log
+
+ ${LOGGING_PARENT:-.}/archive/${APP_NAME:-run}-%d{yyyy-MM-dd}.gz
+ 7
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%wEx
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/DeployInformationProperties.java b/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/DeployInformationProperties.java
index 6f4eff6..a71be2f 100644
--- a/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/DeployInformationProperties.java
+++ b/service-cli/service-cli-runner/src/main/java/com/lanyuanxiaoyao/service/cli/runner/DeployInformationProperties.java
@@ -17,7 +17,7 @@ import org.springframework.stereotype.Component;
@Configuration
@ConfigurationProperties("deploy")
public class DeployInformationProperties {
- private Generate generate;
+ private Generate generate = new Generate();
private Boolean shuffler = false;
private RuntimeInfo runtime;
private Map hosts;
diff --git a/service-cli/service-cli-runner/src/main/resources/application-b12.yml b/service-cli/service-cli-runner/src/main/resources/application-b12.yml
index b38d250..c0d4cca 100644
--- a/service-cli/service-cli-runner/src/main/resources/application-b12.yml
+++ b/service-cli/service-cli-runner/src/main/resources/application-b12.yml
@@ -30,7 +30,7 @@ deploy:
sync-clusters: b12
compaction-clusters: b12,b1,b5,a4
services:
- api:
+ service-api:
replicas: 10
service-launcher-runner-b1:
replicas: 6
diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml
index a6c934f..92c7f66 100644
--- a/service-cli/service-cli-runner/src/main/resources/application.yml
+++ b/service-cli/service-cli-runner/src/main/resources/application.yml
@@ -9,9 +9,9 @@ deploy:
arguments:
data_save_enable: true
data_save_location: ${deploy.runtime.data-path}
- api:
+ service-api:
order: 1
- source-jar: api-1.0.0-SNAPSHOT.jar
+ source-jar: service-api-1.0.0-SNAPSHOT.jar
replicas: 10
service-scheduler:
order: 3
diff --git a/service-gateway/src/main/java/com/lanyuanxiaoyao/service/gateway/GatewayApplication.java b/service-gateway/src/main/java/com/lanyuanxiaoyao/service/gateway/GatewayApplication.java
index 67c65d5..15af80b 100644
--- a/service-gateway/src/main/java/com/lanyuanxiaoyao/service/gateway/GatewayApplication.java
+++ b/service-gateway/src/main/java/com/lanyuanxiaoyao/service/gateway/GatewayApplication.java
@@ -45,7 +45,7 @@ public class GatewayApplication {
@Bean
public RouteLocator routeLocator(RouteLocatorBuilder builder) {
return builder.routes()
- .route("hudi", createRoute("/hudi_services/hudi_api", "lb://hudi-api"))
+ .route("hudi", createRoute("/hudi_services/hudi_api", "lb://service-api"))
.route("queue", createRoute("/hudi_services/queue", "lb://service-queue"))
.route("scheduler", createRoute("/hudi_services/service_scheduler", "lb://service-scheduler"))
.route("web", createRoute("/hudi_services/service_web", "lb://service-web"))