diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index fe13da8..fee5464 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -7,10 +7,7 @@ import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; -import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics; -import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; -import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; -import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated; +import com.lanyuanxiaoyao.service.configuration.entity.info.*; import java.util.Map; import org.eclipse.collections.api.list.ImmutableList; @@ -20,98 +17,101 @@ import org.eclipse.collections.api.list.ImmutableList; * @author lanyuanxiaoyao * @date 2023-04-24 */ -@BaseRequest(baseURL = "http://service-info-query/info") +@BaseRequest(baseURL = "http://service-info-query") public interface InfoService { - @Get("/table_count") + @Get("/logs") + ImmutableList sqlLogs(); + + @Get("/info/table_count") Long tableCount(); - @Get("/table_focus_count") + @Get("/info/table_focus_count") Long tableFocusCount(); - @Get("/hudi_count") + @Get("/info/hudi_count") Long hudiCount(); - @Get("/hudi_focus_count") + @Get("/info/hudi_focus_count") Long hudiFocusCount(); - @Get("/hive_count") + @Get("/info/hive_count") Long hiveCount(); - @Get("/hive_focus_count") + @Get("/info/hive_focus_count") Long hiveFocusCount(); - @Get("/un_receive_version_normal_table") + @Get("/info/un_receive_version_normal_table") ImmutableList unReceiveVersionNormalTable(@Query("version") String version); - @Get("/un_receive_version_normal_table_count") + @Get("/info/un_receive_version_normal_table_count") Long unReceiveVersionNormalTableCount(@Query("version") String version); - @Get("/un_receive_version_focus_table") + @Get("/info/un_receive_version_focus_table") ImmutableList unReceiveVersionFocusTable(@Query("version") String version); - @Get("/un_receive_version_focus_table_count") + @Get("/info/un_receive_version_focus_table_count") Long unReceiveVersionFocusTableCount(@Query("version") String version); - @Get("/un_scheduled_normal_table") + @Get("/info/un_scheduled_normal_table") ImmutableList unScheduledNormalTable(@Query("version") String version); - @Get("/un_scheduled_normal_table_count") + @Get("/info/un_scheduled_normal_table_count") Long unScheduledNormalTableCount(@Query("version") String version); - @Get("/un_scheduled_focus_table") + @Get("/info/un_scheduled_focus_table") ImmutableList unScheduledFocusTable(@Query("version") String version); - @Get("/un_scheduled_focus_table_count") + @Get("/info/un_scheduled_focus_table_count") Long unScheduledFocusTableCount(@Query("version") String version); - @Get("/job_id_alias") + @Get("/info/job_id_alias") PageResponse jobIdAndAlias(@Query Map queryMap); - @Get("/version_tables") + @Get("/info/version_tables") PageResponse versionTables(@Query Map queryMap); - @Get("/compaction_metrics") + @Get("/info/compaction_metrics") PageResponse compactionMetrics(@Query Map queryMap); - @Get("/job_metas") + @Get("/info/job_metas") ImmutableList jobAndMetas(); - @Get("/flink_job/list") + @Get("/info/flink_job/list") ImmutableList flinkJobList(); - @Get("/flink_job/detail") + @Get("/info/flink_job/detail") FlinkJob flinkJobDetail(@Query("flink_job_id") Long flinkJobId); - @Get("/table_meta/list") + @Get("/info/table_meta/list") ImmutableList tableMetaList(); - @Get("/table_meta/list") + @Get("/info/table_meta/list") ImmutableList tableMetaList(@Query("flink_job_id") Long flinkJobId); - @Get("/table_meta/detail") + @Get("/info/table_meta/detail") TableMeta tableMetaDetail(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); - @Get("/sync_state/detail") + @Get("/info/sync_state/detail") SyncState syncStateDetail(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); - @Get("/non_updated_version_tables") + @Get("/info/non_updated_version_tables") ImmutableList nonUpdatedVersionTables(); - @Get("/updated_version_tables") + @Get("/info/updated_version_tables") ImmutableList updatedVersionTables(); - @Get("/exists_table") + @Get("/info/exists_table") Boolean existsTable(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); - @Get("/non_exists_table") + @Get("/info/non_exists_table") Boolean nonExistsTable(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); - @Get("/all_flink_job_id") + @Get("/info/all_flink_job_id") ImmutableList allFlinkJobId(); - @Get("/all_alias") + @Get("/info/all_alias") ImmutableList allAlias(); - @Get("/all_hdfs") + @Get("/info/all_hdfs") ImmutableList allHdfs(); } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index 763611b..588acc5 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -45,6 +45,7 @@ public class InfoController { @RequestParam(value = "alias", required = false) String alias, @RequestParam(value = "order", required = false) String order, @RequestParam(value = "direction", required = false) String direction, + @RequestParam(value = "filter_hudi_table_type", required = false) List selectHudiTableType, @RequestParam(value = "filter_run_mode", required = false) List selectedRunMode, @RequestParam(value = "filter_compaction_status", required = false) List selectedCompactionStatus ) { @@ -55,6 +56,7 @@ public class InfoController { alias, order, direction, + Lists.immutable.ofAll(selectHudiTableType), Lists.immutable.ofAll(selectedRunMode), Lists.immutable.ofAll(selectedCompactionStatus) ); diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index bb4fa9b..59dac69 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -59,6 +59,33 @@ public class InfoService { return databaseService.getSyncState(flinkJobId, alias); } + private SqlBuilder generateJobIdAndAliasCriteria( + SelectSqlBuilder builder, + Integer page, + Integer count, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList selectHudiTableType, + ImmutableList selectedRunMode, + ImmutableList selectedCompactionStatus, + boolean limited + ) { + int limit = Math.max(count, 1); + int offset = limit * Math.max(page - 1, 0); + return builder.from(TABLE_FLINK_JOB, TABLE_INFO, TABLE_SYNC_STATE) + .whereEq(TABLE_FLINK_JOB_ID, Column.as(TABLE_INFO_FLINK_JOB_ID)) + .andEq(TABLE_SYNC_STATE_ID, Column.as(StrUtil.format("concat({}, '-', {})", TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS))) + .andLike(ObjectUtil.isNotNull(flinkJobId), TABLE_FLINK_JOB_ID, flinkJobId) + .andLike(ObjectUtil.isNotNull(alias), TABLE_INFO_ALIAS, alias) + .andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TABLE_INFO_TARGET_TABLE_TYPE, selectHudiTableType) + .andIn(ObjectUtil.isNotEmpty(selectedRunMode), TABLE_FLINK_JOB_RUN_MODE, selectedRunMode) + .andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TABLE_SYNC_STATE_COMPACTION_STATE, selectedCompactionStatus) + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) + .limit(limited, offset, limit); + } + @Cacheable(value = "job-id-alias", sync = true) @Retryable(Throwable.class) public PageResponse findAllJobIdAndAlias( @@ -68,50 +95,40 @@ public class InfoService { String alias, String order, String direction, + ImmutableList selectHudiTableType, ImmutableList selectedRunMode, ImmutableList selectedCompactionStatus ) { return mysqlTransactionTemplate.execute(status -> { - int limit = Math.max(count, 1); - int offset = limit * Math.max(page - 1, 0); - String searchFlinkJobId = flinkJobId == null ? "" : "and job.id like '%" + flinkJobId + "%'\n"; - String searchAlias = alias == null ? "" : "and info.alias like '%" + alias + "%'\n"; - String filterRunMode = selectedRunMode == null || selectedRunMode.isEmpty() - ? "" - : "and job.run_mode in ('" + selectedRunMode.makeString("','") + "')\n"; - String filterCompactionStatus = selectedCompactionStatus == null || selectedCompactionStatus.isEmpty() - ? "" - : "and state.compaction_status in ('" + selectedCompactionStatus.makeString("','") + "')\n"; - String orderBy = ""; + String orderField = null; + String orderDirection = null; if (order != null && direction != null) { - String orderField = null; switch (order) { case "source_start_time": case "sourceStartTime": - orderField = "state.source_start_time"; + orderField = "tahss.source_start_time"; break; case "source_checkpoint_time": case "sourceCheckpointTime": - orderField = "state.source_checkpoint_time"; + orderField = "tahss.source_checkpoint_time"; break; case "source_publish_time": case "sourcePublishTime": - orderField = "state.source_publish_time"; + orderField = "tahss.source_publish_time"; break; case "source_operation_time": case "sourceOperationTime": - orderField = "state.source_op_time"; + orderField = "tahss.source_op_time"; break; case "compaction_start_time": case "compactionStartTime": - orderField = "state.compaction_start_time"; + orderField = "tahss.compaction_start_time"; break; case "compaction_finish_time": case "compactionFinishTime": - orderField = "state.compaction_finish_time"; + orderField = "tahss.compaction_finish_time"; break; } - String orderDirection = null; switch (direction) { case "desc": case "DESC": @@ -126,37 +143,37 @@ public class InfoService { orderDirection = "asc"; break; } - if (ObjectUtil.isNotNull(orderField) && ObjectUtil.isNotNull(direction)) { - orderBy = StrUtil.format("order by {} {}\n", orderField, orderDirection); - } } Long total = mysqlJdbcTemplate.queryForObject( - "select count(*)\n" + - "from " + DATABASE_NAME + ".tb_app_flink_job_config job,\n" + - " " + DATABASE_NAME + ".tb_app_collect_table_info info,\n" + - " " + DATABASE_NAME + ".tb_app_hudi_sync_state state\n" + - "where job.id = info.flink_job_id\n" + - "and state.id = concat(job.id, '-', info.alias)\n" + - searchFlinkJobId + - searchAlias + - filterRunMode + - filterCompactionStatus + - orderBy, + generateJobIdAndAliasCriteria( + SqlBuilder.select(COUNT), + page, + count, + flinkJobId, + alias, + orderField, + orderDirection, + selectHudiTableType, + selectedRunMode, + selectedCompactionStatus, + false + ).build(), Long.class ); List list = mysqlJdbcTemplate.query( - "select job.id, info.alias\n" + - "from " + DATABASE_NAME + ".tb_app_flink_job_config job,\n" + - " " + DATABASE_NAME + ".tb_app_collect_table_info info,\n" + - " " + DATABASE_NAME + ".tb_app_hudi_sync_state state\n" + - "where job.id = info.flink_job_id\n" + - "and state.id = concat(job.id, '-', info.alias)\n" + - searchFlinkJobId + - searchAlias + - filterRunMode + - filterCompactionStatus + - orderBy + - "limit " + limit + " offset " + offset, + generateJobIdAndAliasCriteria( + SqlBuilder.select(TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS), + page, + count, + flinkJobId, + alias, + orderField, + orderDirection, + selectHudiTableType, + selectedRunMode, + selectedCompactionStatus, + true + ).build(), (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)) ); return new PageResponse<>(list, total); @@ -243,9 +260,11 @@ public class InfoService { private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority"); private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status"); private static final String TABLE_INFO_TARGET_HDFS = column(TABLE_INFO, "tgt_hdfs_path"); + private static final String TABLE_INFO_TARGET_TABLE_TYPE = column(TABLE_INFO, "tgt_table_type"); private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss"); private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id"); + private static final String TABLE_SYNC_STATE_COMPACTION_STATE = column(TABLE_SYNC_STATE, "compaction_status"); private SqlBuilder generateVersionTableCriteria( SelectSqlBuilder builder, @@ -730,6 +749,7 @@ public class InfoService { private static final Alias TABLE_FLINK_JOB = Alias.of(StrUtil.format("{}.tb_app_flink_job_config", DATABASE_NAME), "tafjc"); private static final String TABLE_FLINK_JOB_ID = column(TABLE_FLINK_JOB, "id"); private static final String TABLE_FLINK_JOB_STATUS = column(TABLE_FLINK_JOB, "status"); + private static final String TABLE_FLINK_JOB_RUN_MODE = column(TABLE_FLINK_JOB, "run_mode"); @Cacheable(value = "exists-table", sync = true) @Retryable(Throwable.class) diff --git a/service-info-query/src/test/java/SqlBuilderTests.java b/service-info-query/src/test/java/SqlBuilderTests.java index aae65e4..605d1bc 100644 --- a/service-info-query/src/test/java/SqlBuilderTests.java +++ b/service-info-query/src/test/java/SqlBuilderTests.java @@ -1,8 +1,12 @@ +import club.kingon.sql.builder.SelectSqlBuilder; import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.sql.SqlFormatter; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME; @@ -24,9 +28,11 @@ public class SqlBuilderTests { private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias"); private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority"); private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status"); + private static final String TABLE_INFO_TARGET_TABLE_TYPE = column(TABLE_INFO, "tgt_table_type"); private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss"); private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id"); + private static final String TABLE_SYNC_STATE_COMPACTION_STATE = column(TABLE_SYNC_STATE, "compaction_status"); private static String column(Alias table, String column) { return StrUtil.format("{}.{}", table.getAlias(), column); @@ -40,15 +46,62 @@ public class SqlBuilderTests { private static final Alias TABLE_FLINK_JOB = Alias.of(StrUtil.format("{}.tb_app_flink_job_config", DATABASE_NAME), "tafjc"); private static final String TABLE_FLINK_JOB_ID = column(TABLE_FLINK_JOB, "id"); private static final String TABLE_FLINK_JOB_STATUS = column(TABLE_FLINK_JOB, "status"); + private static final String TABLE_FLINK_JOB_RUN_MODE = column(TABLE_FLINK_JOB, "run_mode"); + + private static SqlBuilder generateJobIdAndAliasCriteria( + SelectSqlBuilder builder, + Integer page, + Integer count, + Long flinkJobId, + String alias, + String order, + String direction, + ImmutableList selectHudiTableType, + ImmutableList selectedRunMode, + ImmutableList selectedCompactionStatus, + boolean limited + ) { + int limit = Math.max(count, 1); + int offset = limit * Math.max(page - 1, 0); + return builder.from(TABLE_FLINK_JOB, TABLE_INFO, TABLE_SYNC_STATE) + .whereEq(TABLE_FLINK_JOB_ID, Column.as(TABLE_INFO_FLINK_JOB_ID)) + .andEq(TABLE_SYNC_STATE_ID, Column.as(StrUtil.format("concat({}, '-', {})", TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS))) + .andLike(ObjectUtil.isNotNull(flinkJobId), TABLE_FLINK_JOB_ID, flinkJobId) + .andLike(ObjectUtil.isNotNull(alias), TABLE_INFO_ALIAS, alias) + .andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TABLE_INFO_TARGET_TABLE_TYPE, selectHudiTableType) + .andIn(ObjectUtil.isNotEmpty(selectedRunMode), TABLE_FLINK_JOB_RUN_MODE, selectedRunMode) + .andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TABLE_SYNC_STATE_COMPACTION_STATE, selectedCompactionStatus) + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) + .limit(limited, offset, limit); + } public static void main(String[] args) { System.out.println(SqlFormatter.format( - SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_ALIAS)) - .from(TABLE_INFO, TABLE_FLINK_JOB) - .whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID)) - .andEq(TABLE_FLINK_JOB_STATUS, "y") - .andEq(TABLE_INFO_STATUS, "y") - .build() + /*SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_ALIAS)) + .from(TABLE_FLINK_JOB, TABLE_INFO, TABLE_SYNC_STATE) + .whereEq(TABLE_FLINK_JOB_ID, Column.as(TABLE_INFO_FLINK_JOB_ID)) + .andEq(TABLE_SYNC_STATE_ID, StrUtil.format("concat({}, '-', {})", TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS)) + .andLike(ObjectUtil.isNotNull(flinkJobId), TABLE_FLINK_JOB_ID, flinkJobId) + .andLike(ObjectUtil.isNotNull(alias), TABLE_INFO_ALIAS, alias) + .andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TABLE_INFO_TARGET_TABLE_TYPE, selectHudiTableType) + .andIn(ObjectUtil.isNotEmpty(selectedRunMode), TABLE_FLINK_JOB_RUN_MODE, selectedRunMode) + .andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TABLE_SYNC_STATE_COMPACTION_STATE, selectedCompactionStatus) + .orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction)) + .limit(limited, offset, limit) + .build()*/ + generateJobIdAndAliasCriteria( + SqlBuilder.selectAll(), + 1, + 10, + 1000L, + "hello", + "dog", + "asc", + Lists.immutable.of("MOR"), + Lists.immutable.of("ONE_IN_ONE", "ALL_IN_ONE"), + Lists.immutable.of("COMPLETE"), + true + ).build() )); } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java index c0433af..707f4ef 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java @@ -27,6 +27,7 @@ import com.lanyuanxiaoyao.service.web.entity.TableVO; import java.time.Duration; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.eclipse.collections.api.factory.Lists; @@ -83,10 +84,14 @@ public class TableController extends BaseController { @RequestParam(value = "direction", required = false) String direction, @RequestParam(value = "search_flink_job_id", required = false) String searchFlinkJobId, @RequestParam(value = "search_alias", required = false) String searchAlias, + @RequestParam(value = "filter_hudi_table_type", required = false) List hudiTableType, @RequestParam(value = "filter_run_mode", required = false) List runMode, @RequestParam(value = "filter_compaction_status", required = false) List compactionStatus ) { MutableMap queryMap = buildQueryMap(page, count, order, direction, searchFlinkJobId, searchAlias); + if (ObjectUtil.isNotEmpty(hudiTableType)) { + queryMap.put("filter_hudi_table_type", hudiTableType); + } if (ObjectUtil.isNotEmpty(runMode)) { queryMap.put("filter_run_mode", runMode); } @@ -144,7 +149,7 @@ public class TableController extends BaseController { return new TableVO( flinkJobFuture.get(), tableMetaFuture.get(), - new SyncStateVO(syncStateFuture.get()), + Optional.of(syncStateFuture.get()).map(SyncStateVO::new).orElse(null), ObjectUtil.isNotNull(syncRunMeta), syncRunMeta, ObjectUtil.isNotNull(compactionRunMeta), diff --git a/web/components/table-tab.js b/web/components/table-tab.js index 8d6bc3f..1b3e300 100644 --- a/web/components/table-tab.js +++ b/web/components/table-tab.js @@ -19,6 +19,7 @@ function tableTab() { direction: '${orderDir|default:undefined}', search_flink_job_id: '${flinkJobId|default:undefined}', search_alias: '${alias|default:undefined}', + filter_hudi_table_type: '${tableMeta\\.hudi\\.targetTableType|default:undefined}', filter_run_mode: '${flinkJob\\.runMode|default:undefined}', filter_compaction_status: '${syncState\\.compactionStatus|default:undefined}' } diff --git a/web/components/tool-tab.js b/web/components/tool-tab.js index 0203fea..16d892c 100644 --- a/web/components/tool-tab.js +++ b/web/components/tool-tab.js @@ -153,6 +153,38 @@ function toolTab() { } ] }, + { + type: 'panel', + title: 'SQL日志', + body: [ + { + type: 'action', + label: '查看', + actionType: 'dialog', + dialog: { + title: '日志', + ...readOnlyDialogOptions(), + size: 'lg', + body: { + type: 'crud', + api: '${base}/log/query_sql_log', + ...crudCommonOptions(), + loadDataOnce: true, + perPage: 50, + headerToolbar: [ + "reload", + paginationCommonOptions(undefined, 10), + ], + footerToolbar: [ + paginationCommonOptions(undefined, 10), + ], + columns: [ + ], + } + } + } + ] + } ] } } \ No newline at end of file