fix(info-query): 修复查询已跨天表接口调用未跨天接口
This commit is contained in:
@@ -122,7 +122,7 @@ public class InfoController {
|
|||||||
|
|
||||||
@GetMapping("/updated_version_tables")
|
@GetMapping("/updated_version_tables")
|
||||||
public ImmutableList<String> updatedVersionTables() {
|
public ImmutableList<String> updatedVersionTables() {
|
||||||
return infoService.nonUpdatedVersionTables();
|
return infoService.updatedVersionTables();
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/un_receive_version_normal_table")
|
@GetMapping("/un_receive_version_normal_table")
|
||||||
|
|||||||
@@ -199,16 +199,30 @@ public class InfoService {
|
|||||||
return databaseService.getTableMeta(flinkJobId, alias);
|
return databaseService.getTableMeta(flinkJobId, alias);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String generateVersionTableIdCriteria(Boolean scheduled) {
|
||||||
|
return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
|
||||||
|
.from(TABLE_VERSION)
|
||||||
|
.whereEq(TABLE_VERSION_SCHEDULED, scheduled)
|
||||||
|
.andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')"))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
@Cacheable("un-updated-version-table")
|
@Cacheable("un-updated-version-table")
|
||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
public ImmutableList<String> nonUpdatedVersionTables() {
|
public ImmutableList<String> nonUpdatedVersionTables() {
|
||||||
return databaseService.findAllUnScheduledTable();
|
return mysqlTransactionTemplate.execute(status -> {
|
||||||
|
List<String> ids = mysqlJdbcTemplate.queryForList(generateVersionTableIdCriteria(false), String.class);
|
||||||
|
return Lists.immutable.ofAll(ids);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Cacheable("updated-version-table")
|
@Cacheable("updated-version-table")
|
||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
public ImmutableList<String> updatedVersionTables() {
|
public ImmutableList<String> updatedVersionTables() {
|
||||||
return databaseService.findAllScheduledTable();
|
return mysqlTransactionTemplate.execute(status -> {
|
||||||
|
List<String> ids = mysqlJdbcTemplate.queryForList(generateVersionTableIdCriteria(true), String.class);
|
||||||
|
return Lists.immutable.ofAll(ids);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String column(Alias table, String column) {
|
private static String column(Alias table, String column) {
|
||||||
|
|||||||
@@ -1,11 +1,8 @@
|
|||||||
import club.kingon.sql.builder.SelectSqlBuilder;
|
|
||||||
import club.kingon.sql.builder.SqlBuilder;
|
import club.kingon.sql.builder.SqlBuilder;
|
||||||
import club.kingon.sql.builder.entry.Alias;
|
import club.kingon.sql.builder.entry.Alias;
|
||||||
import club.kingon.sql.builder.entry.Column;
|
import club.kingon.sql.builder.entry.Column;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import cn.hutool.db.sql.SqlFormatter;
|
import cn.hutool.db.sql.SqlFormatter;
|
||||||
import org.eclipse.collections.api.factory.Lists;
|
|
||||||
import org.eclipse.collections.api.list.ImmutableList;
|
|
||||||
|
|
||||||
import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
|
import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
|
||||||
|
|
||||||
@@ -40,70 +37,13 @@ public class SqlBuilderTests {
|
|||||||
private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id");
|
private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id");
|
||||||
private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias");
|
private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias");
|
||||||
|
|
||||||
private static SqlBuilder generateCompactionMetricsCriteria(
|
|
||||||
SelectSqlBuilder builder,
|
|
||||||
Integer page,
|
|
||||||
Integer count,
|
|
||||||
Long flinkJobId,
|
|
||||||
String alias,
|
|
||||||
String order,
|
|
||||||
String direction,
|
|
||||||
ImmutableList<Boolean> filterCompletes,
|
|
||||||
boolean limited
|
|
||||||
) {
|
|
||||||
int limit = Math.max(count, 1);
|
|
||||||
int offset = limit * Math.max(page - 1, 0);
|
|
||||||
Alias m1 = Alias.of(
|
|
||||||
SqlBuilder.selectAll()
|
|
||||||
.from(TABLE_COMPACTION_METRICS)
|
|
||||||
.whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre")
|
|
||||||
.andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId)
|
|
||||||
.andEq(TABLE_COMPACTION_METRICS_ALIAS, alias),
|
|
||||||
"m1"
|
|
||||||
);
|
|
||||||
Alias m2 = Alias.of(
|
|
||||||
SqlBuilder.selectAll()
|
|
||||||
.from(TABLE_COMPACTION_METRICS)
|
|
||||||
.whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete")
|
|
||||||
.andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId)
|
|
||||||
.andEq(TABLE_COMPACTION_METRICS_ALIAS, alias),
|
|
||||||
"m2"
|
|
||||||
);
|
|
||||||
return builder
|
|
||||||
.from(m1)
|
|
||||||
.leftJoin(m2)
|
|
||||||
.onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id")))
|
|
||||||
.andEq(column(m1, "alias"), Column.as(column(m2, "alias")))
|
|
||||||
.andEq(column(m1, "application_id"), Column.as(column(m2, "application_id")))
|
|
||||||
.andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant")))
|
|
||||||
.whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type"))
|
|
||||||
.orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type"))
|
|
||||||
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction))
|
|
||||||
.limit(limited, offset, count);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
System.out.println(SqlFormatter.format(
|
System.out.println(SqlFormatter.format(
|
||||||
generateCompactionMetricsCriteria(
|
SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
|
||||||
SqlBuilder.select(
|
.from(TABLE_VERSION)
|
||||||
"m1.flink_job_id",
|
.whereEq(TABLE_VERSION_SCHEDULED, false)
|
||||||
"m1.alias",
|
.andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')"))
|
||||||
"m1.application_id",
|
.build()
|
||||||
"m1.cluster",
|
|
||||||
"m1.compaction_plan_instant",
|
|
||||||
"m2.type is not null as is_complete",
|
|
||||||
"m1.update_time as started_time",
|
|
||||||
"m2.update_time as finished_time"
|
|
||||||
),
|
|
||||||
1,
|
|
||||||
10,
|
|
||||||
1542097983881048064L,
|
|
||||||
"conf_center_balance_type",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
Lists.immutable.of(false),
|
|
||||||
true
|
|
||||||
).build()
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,4 +45,4 @@ GET http://{{username}}:{{password}}@132.122.116.146:18166/info/compaction_metri
|
|||||||
GET http://{{username}}:{{password}}@132.122.116.150:27510/table/list_compaction_metrics?search_flink_job_id=1542097996099055616&search_alias=acct_acct_item_fs
|
GET http://{{username}}:{{password}}@132.122.116.150:27510/table/list_compaction_metrics?search_flink_job_id=1542097996099055616&search_alias=acct_acct_item_fs
|
||||||
|
|
||||||
### Query Scheduler
|
### Query Scheduler
|
||||||
GET {{scheduler-url}}/schedule/schedule_jobs
|
GET {{scheduler-url}}/schedule/all_un_scheduled
|
||||||
|
|||||||
Reference in New Issue
Block a user