diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index 009853f..31b767c 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -107,6 +107,9 @@ public interface InfoService { @Get("/info/sync_state/save") void saveSyncState(@Query("id") String id); + @Get("/info/sync_state/save_pulsar_back_log") + void savePulsarBacklog(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("back_log") Long backlog); + @Get("/info/non_updated_version_tables") ImmutableList nonUpdatedVersionTables(); diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/SyncStateController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/SyncStateController.java index 3da53e6..dc60f32 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/SyncStateController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/SyncStateController.java @@ -44,4 +44,13 @@ public class SyncStateController { public void saveCompactionId(@RequestParam("id") String id) { syncStateService.saveSyncState(id); } + + @GetMapping("/sync_state/save_pulsar_back_log") + public void savePulsarBacklog( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam("back_log") Long backlog + ) { + syncStateService.savePulsarBacklog(flinkJobId, alias, backlog); + } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/SyncStateService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/SyncStateService.java index 3036590..81b1720 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/SyncStateService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/SyncStateService.java @@ -17,7 +17,9 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Service; -import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.*; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiSyncState; /** * Sync State @@ -117,4 +119,21 @@ public class SyncStateService extends BaseService { "-1:-1:-1" ); } + + public void savePulsarBacklog(Long flinkJobId, String alias, Long backlog) { + mysqlJdbcTemplate.update( + SqlBuilder + .insertInto( + TbAppHudiSyncState._origin_, + TbAppHudiSyncState.ID_O, + TbAppHudiSyncState.PULSAR_BACK_LOG_O + ) + .values() + .addValue(Q, Q) + .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.PULSAR_BACK_LOG_O) + .precompileSql(), + StrUtil.format("{}-{}", flinkJobId, alias), + backlog + ); + } } diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index 97033d3..b991ac8 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -9,7 +9,12 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.db.sql.SqlUtil; import com.lanyuanxiaoyao.service.common.SQLConstants; -import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.*; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppGlobalConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiJobConfig; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiSyncState; +import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig; /** * @author lanyuanxiaoyao @@ -64,17 +69,10 @@ public class SqlBuilderTests { public static void main(String[] args) { System.out.println(SqlUtil.formatSql( - generateTableMetaList( - SqlBuilder.select( - TbAppFlinkJobConfig.ID_A, - TbAppCollectTableInfo.ALIAS_A, - TbAppCollectTableInfo.TGT_HDFS_PATH_A, - TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, - TbAppCollectTableInfo.SRC_TOPIC_A - ), - null, - null - ).build() + SqlBuilder.update(TbAppHudiSyncState._origin_) + .set(TbAppHudiSyncState.PULSAR_BACK_LOG_O, "?") + .whereEq(TbAppHudiSyncState.ID_O, Column.as("?")) + .build() )); } } diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java index fdc277c..482a6eb 100644 --- a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import static com.lanyuanxiaoyao.service.common.Constants.HALF_HOUR; import static com.lanyuanxiaoyao.service.common.Constants.MINUTE; /** @@ -46,7 +47,7 @@ public class PulsarMetrics extends Metrics { backlogMap = Maps.mutable.empty(); } - @Scheduled(fixedDelay = MINUTE, initialDelay = MINUTE) + @Scheduled(fixedDelay = HALF_HOUR, initialDelay = MINUTE) @Override void update() { infoService.tableMetaList() @@ -71,6 +72,7 @@ public class PulsarMetrics extends Metrics { if (StrUtil.isNotBlank(name)) { Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias(), hudiServiceProperties.getSignature())); backlogCache.set(backlog); + infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog); } } catch (Exception ignored) { }