diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index 31b767c..1f9b7a5 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -184,4 +184,10 @@ public interface InfoService { @Get("/info/simple_table_metas") ImmutableList simpleTableMetas(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/info/clean_compaction_metrics") + Integer cleanCompactionMetrics(); + + @Get("/info/clean_table_version") + Integer cleanTableVersion(); } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/CleanController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/CleanController.java new file mode 100644 index 0000000..c3cdca3 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/CleanController.java @@ -0,0 +1,37 @@ +package com.lanyuanxiaoyao.service.info.controller; + +import com.lanyuanxiaoyao.service.info.service.CleanService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * Version + * + * @author lanyuanxiaoyao + * @date 2024-01-03 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@RestController +@RequestMapping("/info") +public class CleanController { + private static final Logger logger = LoggerFactory.getLogger(CleanController.class); + + private final CleanService cleanService; + + public CleanController(CleanService cleanService) { + this.cleanService = cleanService; + } + + @GetMapping("/clean_compaction_metrics") + public Integer cleanCompactionMetrics() { + return cleanService.cleanCompactionMetrics(); + } + + @GetMapping("/clean_table_version") + public Integer cleanTableVersion() { + return cleanService.cleanTableVersion(); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/CleanService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/CleanService.java new file mode 100644 index 0000000..aa0c362 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/CleanService.java @@ -0,0 +1,43 @@ +package com.lanyuanxiaoyao.service.info.service; + +import club.kingon.sql.builder.SqlBuilder; +import club.kingon.sql.builder.entry.Column; +import com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableVersion; +import com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiCompactionMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; + +/** + * 清理数据库 + * + * @author lanyuanxiaoyao + * @date 2024-05-23 + */ +public class CleanService extends BaseService { + private static final Logger logger = LoggerFactory.getLogger(CleanService.class); + private static final Column before7Days = Column.as("DATE_SUB(CURDATE(), INTERVAL 7 DAY)"); + private static final Column before14Days = Column.as("DATE_SUB(CURDATE(), INTERVAL 14 DAY)"); + + private final JdbcTemplate mysqlJdbcTemplate; + + public CleanService(JdbcTemplate mysqlJdbcTemplate) { + this.mysqlJdbcTemplate = mysqlJdbcTemplate; + } + + public Integer cleanCompactionMetrics() { + return mysqlJdbcTemplate.update( + SqlBuilder.delete(TbAppHudiCompactionMetrics._origin_) + .whereLe(TbAppHudiCompactionMetrics.UPDATE_TIME_O, before7Days) + .build() + ); + } + + public Integer cleanTableVersion() { + return mysqlJdbcTemplate.update( + SqlBuilder.delete(TbAppCollectTableVersion._origin_) + .whereLe(TbAppCollectTableVersion.UPDATE_TIME_O, before14Days) + .build() + ); + } +} diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index 26a14ea..5b0d81f 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -69,23 +69,9 @@ public class SqlBuilderTests { public static void main(String[] args) { System.out.println(SqlUtil.formatSql( - SqlBuilder - .insertInto( - TbAppHudiSyncState._origin_, - TbAppHudiSyncState.ID_O, - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O - ) - .values() - .addValue(null, null) - .onDuplicateKeyUpdateSetter( - StrUtil.format( - "{} = if({} is null, ?, greatest({}, ?))", - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O - ) - ) - .precompileSql() + SqlBuilder.delete(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion._origin_) + .whereLe(SQLConstants.HudiCollectBuild.TbAppCollectTableVersion.UPDATE_TIME_O, Column.as("DATE_SUB(CURDATE(), INTERVAL 14 DAY)")) + .build() )); } } diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/Cleaner.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/Cleaner.java new file mode 100644 index 0000000..e891c26 --- /dev/null +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/Cleaner.java @@ -0,0 +1,9 @@ +package com.lanyuanxiaoyao.service.monitor.clean; + +/** + * @author lanyuanxiaoyao + * @date 2024-05-23 + */ +public abstract class Cleaner { + abstract void clean(); +} diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/CompactionMetricsCleaner.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/CompactionMetricsCleaner.java new file mode 100644 index 0000000..95e1bd5 --- /dev/null +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/CompactionMetricsCleaner.java @@ -0,0 +1,33 @@ +package com.lanyuanxiaoyao.service.monitor.clean; + +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import static com.lanyuanxiaoyao.service.common.Constants.MINUTE; + +/** + * 清理compaction metrics表 + * + * @author lanyuanxiaoyao + * @date 2024-05-23 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Service +public class CompactionMetricsCleaner extends Cleaner { + private static final Logger logger = LoggerFactory.getLogger(CompactionMetricsCleaner.class); + + private final InfoService infoService; + + public CompactionMetricsCleaner(InfoService infoService) { + this.infoService = infoService; + } + + @Scheduled(cron = "0 0 6 * * ?") + @Override + void clean() { + logger.info("Clean compaction metrics: {}", infoService.cleanCompactionMetrics()); + } +} diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/TableVersionCleaner.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/TableVersionCleaner.java new file mode 100644 index 0000000..212d8f5 --- /dev/null +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/clean/TableVersionCleaner.java @@ -0,0 +1,31 @@ +package com.lanyuanxiaoyao.service.monitor.clean; + +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +/** + * 清理table version表 + * + * @author lanyuanxiaoyao + * @date 2024-05-23 + */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") +@Service +public class TableVersionCleaner extends Cleaner { + private static final Logger logger = LoggerFactory.getLogger(TableVersionCleaner.class); + + private final InfoService infoService; + + public TableVersionCleaner(InfoService infoService) { + this.infoService = infoService; + } + + @Scheduled(cron = "0 0 7 * * ?") + @Override + void clean() { + logger.info("Clean table version: {}", infoService.cleanTableVersion()); + } +}