diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index 5d61806..129ac97 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -122,7 +122,7 @@ public class InfoController { @GetMapping("/updated_version_tables") public ImmutableList updatedVersionTables() { - return infoService.nonUpdatedVersionTables(); + return infoService.updatedVersionTables(); } @GetMapping("/un_receive_version_normal_table") diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index b5a5a38..1d45216 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -199,16 +199,30 @@ public class InfoService { return databaseService.getTableMeta(flinkJobId, alias); } + private static String generateVersionTableIdCriteria(Boolean scheduled) { + return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) + .from(TABLE_VERSION) + .whereEq(TABLE_VERSION_SCHEDULED, scheduled) + .andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) + .build(); + } + @Cacheable("un-updated-version-table") @Retryable(Throwable.class) public ImmutableList nonUpdatedVersionTables() { - return databaseService.findAllUnScheduledTable(); + return mysqlTransactionTemplate.execute(status -> { + List ids = mysqlJdbcTemplate.queryForList(generateVersionTableIdCriteria(false), String.class); + return Lists.immutable.ofAll(ids); + }); } @Cacheable("updated-version-table") @Retryable(Throwable.class) public ImmutableList updatedVersionTables() { - return databaseService.findAllScheduledTable(); + return mysqlTransactionTemplate.execute(status -> { + List ids = mysqlJdbcTemplate.queryForList(generateVersionTableIdCriteria(true), String.class); + return Lists.immutable.ofAll(ids); + }); } private static String column(Alias table, String column) { diff --git a/service-info-query/src/test/java/SqlBuilderTests.java b/service-info-query/src/test/java/SqlBuilderTests.java index 579f880..ed0df77 100644 --- a/service-info-query/src/test/java/SqlBuilderTests.java +++ b/service-info-query/src/test/java/SqlBuilderTests.java @@ -1,11 +1,8 @@ -import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; import cn.hutool.core.util.StrUtil; import cn.hutool.db.sql.SqlFormatter; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.list.ImmutableList; import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME; @@ -40,70 +37,13 @@ public class SqlBuilderTests { private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id"); private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias"); - private static SqlBuilder generateCompactionMetricsCriteria( - SelectSqlBuilder builder, - Integer page, - Integer count, - Long flinkJobId, - String alias, - String order, - String direction, - ImmutableList filterCompletes, - boolean limited - ) { - int limit = Math.max(count, 1); - int offset = limit * Math.max(page - 1, 0); - Alias m1 = Alias.of( - SqlBuilder.selectAll() - .from(TABLE_COMPACTION_METRICS) - .whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre") - .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) - .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), - "m1" - ); - Alias m2 = Alias.of( - SqlBuilder.selectAll() - .from(TABLE_COMPACTION_METRICS) - .whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete") - .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) - .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), - "m2" - ); - return builder - .from(m1) - .leftJoin(m2) - .onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id"))) - .andEq(column(m1, "alias"), Column.as(column(m2, "alias"))) - .andEq(column(m1, "application_id"), Column.as(column(m2, "application_id"))) - .andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant"))) - .whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type")) - .orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type")) - .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction)) - .limit(limited, offset, count); - } - public static void main(String[] args) { System.out.println(SqlFormatter.format( - generateCompactionMetricsCriteria( - SqlBuilder.select( - "m1.flink_job_id", - "m1.alias", - "m1.application_id", - "m1.cluster", - "m1.compaction_plan_instant", - "m2.type is not null as is_complete", - "m1.update_time as started_time", - "m2.update_time as finished_time" - ), - 1, - 10, - 1542097983881048064L, - "conf_center_balance_type", - null, - null, - Lists.immutable.of(false), - true - ).build() + SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) + .from(TABLE_VERSION) + .whereEq(TABLE_VERSION_SCHEDULED, false) + .andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) + .build() )); } } diff --git a/test/test.http b/test/test.http index 5b99920..9de2e2f 100644 --- a/test/test.http +++ b/test/test.http @@ -45,4 +45,4 @@ GET http://{{username}}:{{password}}@132.122.116.146:18166/info/compaction_metri GET http://{{username}}:{{password}}@132.122.116.150:27510/table/list_compaction_metrics?search_flink_job_id=1542097996099055616&search_alias=acct_acct_item_fs ### Query Scheduler -GET {{scheduler-url}}/schedule/schedule_jobs +GET {{scheduler-url}}/schedule/all_un_scheduled