From 1ed1b8105a9735b248c1aed5906880a2f53b2364 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 28 Jun 2023 14:56:01 +0800 Subject: [PATCH] =?UTF-8?q?fix(info-query):=20=E4=BF=AE=E5=A4=8D=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E5=B7=B2=E8=B7=A8=E5=A4=A9=E8=A1=A8=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E6=9C=AA=E8=B7=A8=E5=A4=A9=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../info/controller/InfoController.java | 2 +- .../service/info/service/InfoService.java | 18 ++++- .../src/test/java/SqlBuilderTests.java | 70 ++----------------- test/test.http | 2 +- 4 files changed, 23 insertions(+), 69 deletions(-) diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index 5d61806..129ac97 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -122,7 +122,7 @@ public class InfoController { @GetMapping("/updated_version_tables") public ImmutableList updatedVersionTables() { - return infoService.nonUpdatedVersionTables(); + return infoService.updatedVersionTables(); } @GetMapping("/un_receive_version_normal_table") diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index b5a5a38..1d45216 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -199,16 +199,30 @@ public class InfoService { return databaseService.getTableMeta(flinkJobId, alias); } + private static String generateVersionTableIdCriteria(Boolean scheduled) { + return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) + .from(TABLE_VERSION) + .whereEq(TABLE_VERSION_SCHEDULED, scheduled) + .andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) + .build(); + } + @Cacheable("un-updated-version-table") @Retryable(Throwable.class) public ImmutableList nonUpdatedVersionTables() { - return databaseService.findAllUnScheduledTable(); + return mysqlTransactionTemplate.execute(status -> { + List ids = mysqlJdbcTemplate.queryForList(generateVersionTableIdCriteria(false), String.class); + return Lists.immutable.ofAll(ids); + }); } @Cacheable("updated-version-table") @Retryable(Throwable.class) public ImmutableList updatedVersionTables() { - return databaseService.findAllScheduledTable(); + return mysqlTransactionTemplate.execute(status -> { + List ids = mysqlJdbcTemplate.queryForList(generateVersionTableIdCriteria(true), String.class); + return Lists.immutable.ofAll(ids); + }); } private static String column(Alias table, String column) { diff --git a/service-info-query/src/test/java/SqlBuilderTests.java b/service-info-query/src/test/java/SqlBuilderTests.java index 579f880..ed0df77 100644 --- a/service-info-query/src/test/java/SqlBuilderTests.java +++ b/service-info-query/src/test/java/SqlBuilderTests.java @@ -1,11 +1,8 @@ -import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; import cn.hutool.core.util.StrUtil; import cn.hutool.db.sql.SqlFormatter; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.list.ImmutableList; import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME; @@ -40,70 +37,13 @@ public class SqlBuilderTests { private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id"); private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias"); - private static SqlBuilder generateCompactionMetricsCriteria( - SelectSqlBuilder builder, - Integer page, - Integer count, - Long flinkJobId, - String alias, - String order, - String direction, - ImmutableList filterCompletes, - boolean limited - ) { - int limit = Math.max(count, 1); - int offset = limit * Math.max(page - 1, 0); - Alias m1 = Alias.of( - SqlBuilder.selectAll() - .from(TABLE_COMPACTION_METRICS) - .whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre") - .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) - .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), - "m1" - ); - Alias m2 = Alias.of( - SqlBuilder.selectAll() - .from(TABLE_COMPACTION_METRICS) - .whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete") - .andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId) - .andEq(TABLE_COMPACTION_METRICS_ALIAS, alias), - "m2" - ); - return builder - .from(m1) - .leftJoin(m2) - .onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id"))) - .andEq(column(m1, "alias"), Column.as(column(m2, "alias"))) - .andEq(column(m1, "application_id"), Column.as(column(m2, "application_id"))) - .andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant"))) - .whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type")) - .orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type")) - .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction)) - .limit(limited, offset, count); - } - public static void main(String[] args) { System.out.println(SqlFormatter.format( - generateCompactionMetricsCriteria( - SqlBuilder.select( - "m1.flink_job_id", - "m1.alias", - "m1.application_id", - "m1.cluster", - "m1.compaction_plan_instant", - "m2.type is not null as is_complete", - "m1.update_time as started_time", - "m2.update_time as finished_time" - ), - 1, - 10, - 1542097983881048064L, - "conf_center_balance_type", - null, - null, - Lists.immutable.of(false), - true - ).build() + SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS)) + .from(TABLE_VERSION) + .whereEq(TABLE_VERSION_SCHEDULED, false) + .andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')")) + .build() )); } } diff --git a/test/test.http b/test/test.http index 5b99920..9de2e2f 100644 --- a/test/test.http +++ b/test/test.http @@ -45,4 +45,4 @@ GET http://{{username}}:{{password}}@132.122.116.146:18166/info/compaction_metri GET http://{{username}}:{{password}}@132.122.116.150:27510/table/list_compaction_metrics?search_flink_job_id=1542097996099055616&search_alias=acct_acct_item_fs ### Query Scheduler -GET {{scheduler-url}}/schedule/schedule_jobs +GET {{scheduler-url}}/schedule/all_un_scheduled