feature(info-query): 优化SQL logger的埋点,增加说明

This commit is contained in:
2023-07-14 10:27:06 +08:00
parent ab50b3254b
commit 01500d2b5d
8 changed files with 293 additions and 231 deletions

View File

@@ -1,36 +0,0 @@
package com.lanyuanxiaoyao.service.info.configuration;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.stereotype.Component;
/**
* SQL记录
*
* @author lanyuanxiaoyao
* @date 2023-07-11
*/
@Aspect
@Component
@EnableAspectJAutoProxy
public class SQLLoggerAdvice {
private static final Logger logger = LoggerFactory.getLogger(SQLLoggerAdvice.class);
private final SQLLoggerProvider.SQLLogger sqlLogger;
public SQLLoggerAdvice(SQLLoggerProvider.SQLLogger sqlLogger) {
this.sqlLogger = sqlLogger;
}
@Around("execution(* org.springframework.jdbc.core.JdbcTemplate.query(String, org.springframework.jdbc.core.ResultSetExtractor))")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
if (args != null && args.length > 0 && args[0] instanceof String) {
sqlLogger.log((String) args[0]);
}
return joinPoint.proceed();
}
}

View File

@@ -1,7 +1,6 @@
package com.lanyuanxiaoyao.service.info.configuration;
import com.lanyuanxiaoyao.service.configuration.entity.info.SQLLine;
import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.collections.api.factory.Lists;
@@ -17,6 +16,11 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
public class SQLLoggerProvider {
@Bean
public SQLLogger sqlLogger() {
return new SQLLogger(200);
}
public static final class SQLLogger {
private final int size;
private final Queue<SQLLine> container = new ConcurrentLinkedQueue<>();
@@ -26,19 +30,22 @@ public class SQLLoggerProvider {
}
public void log(String sql) {
log(new SQLLine(sql));
}
public void log(String sql, String comment) {
log(new SQLLine(sql, comment));
}
public void log(SQLLine line) {
if (container.size() >= size) {
container.poll();
}
container.add(new SQLLine(sql));
container.add(line);
}
public ImmutableList<SQLLine> getLogs() {
return Lists.immutable.ofAll(container);
}
}
@Bean
public SQLLogger sqlLogger() {
return new SQLLogger(200);
}
}

View File

@@ -17,6 +17,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import java.sql.Timestamp;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
@@ -42,15 +43,52 @@ import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
@Service
public class InfoService {
private static final Logger logger = LoggerFactory.getLogger(InfoService.class);
private static final String COUNT = "count(*)";
private static final Alias TABLE_VERSION = Alias.of(StrUtil.format("{}.tb_app_collect_table_version", DATABASE_NAME), "tactv");
private static final String TABLE_VERSION_FLINK_JOB_ID = column(TABLE_VERSION, "flink_job_id");
private static final String TABLE_VERSION_ALIAS = column(TABLE_VERSION, "alias");
private static final String TABLE_VERSION_VERSION = column(TABLE_VERSION, "version");
private static final String TABLE_VERSION_SCHEDULED = column(TABLE_VERSION, "scheduled");
private static final Alias TABLE_INFO = Alias.of(StrUtil.format("{}.tb_app_collect_table_info", DATABASE_NAME), "tacti");
private static final String TABLE_INFO_FLINK_JOB_ID = column(TABLE_INFO, "flink_job_id");
private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias");
private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority");
private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status");
private static final String TABLE_INFO_TARGET_HDFS = column(TABLE_INFO, "tgt_hdfs_path");
private static final String TABLE_INFO_TARGET_TABLE_TYPE = column(TABLE_INFO, "tgt_table_type");
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
private static final String TABLE_SYNC_STATE_COMPACTION_STATE = column(TABLE_SYNC_STATE, "compaction_status");
private static final Alias TABLE_COMPACTION_METRICS = Alias.of(StrUtil.format("{}.tb_app_hudi_compaction_metrics", DATABASE_NAME), "tahcm");
private static final String TABLE_COMPACTION_METRICS_TYPE = column(TABLE_COMPACTION_METRICS, "type");
private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id");
private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias");
private static final Alias TABLE_FLINK_JOB = Alias.of(StrUtil.format("{}.tb_app_flink_job_config", DATABASE_NAME), "tafjc");
private static final String TABLE_FLINK_JOB_ID = column(TABLE_FLINK_JOB, "id");
private static final String TABLE_FLINK_JOB_STATUS = column(TABLE_FLINK_JOB, "status");
private static final String TABLE_FLINK_JOB_RUN_MODE = column(TABLE_FLINK_JOB, "run_mode");
private final DatabaseService databaseService;
private final JdbcTemplate mysqlJdbcTemplate;
private final TransactionTemplate mysqlTransactionTemplate;
private final SQLLoggerProvider.SQLLogger sqlLogger;
public InfoService(DatabaseService databaseService, JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate) {
public InfoService(DatabaseService databaseService, JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) {
this.databaseService = databaseService;
this.mysqlJdbcTemplate = mysqlJdbcTemplate;
this.mysqlTransactionTemplate = mysqlTransactionTemplate;
this.sqlLogger = sqlLogger;
}
private static String generateVersionTableIdCriteria(Boolean scheduled) {
return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
.from(TABLE_VERSION)
.whereEq(TABLE_VERSION_SCHEDULED, scheduled)
.andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')"))
.build();
}
private static String column(Alias table, String column) {
return StrUtil.format("{}.{}", table.getAlias(), column);
}
@Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias")
@@ -160,22 +198,21 @@ public class InfoService {
).build(),
Long.class
);
List<JobIdAndAlias> list = mysqlJdbcTemplate.query(
generateJobIdAndAliasCriteria(
SqlBuilder.select(TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS),
page,
count,
flinkJobId,
alias,
orderField,
orderDirection,
selectHudiTableType,
selectedRunMode,
selectedCompactionStatus,
true
).build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
);
String listSQL = generateJobIdAndAliasCriteria(
SqlBuilder.select(TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS),
page,
count,
flinkJobId,
alias,
orderField,
orderDirection,
selectHudiTableType,
selectedRunMode,
selectedCompactionStatus,
true
).build();
sqlLogger.log(listSQL, "findAllJobIdAndAlias");
List<JobIdAndAlias> list = mysqlJdbcTemplate.query(listSQL, (rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2)));
return new PageResponse<>(list, total);
});
}
@@ -216,19 +253,13 @@ public class InfoService {
return databaseService.getTableMeta(flinkJobId, alias);
}
private static String generateVersionTableIdCriteria(Boolean scheduled) {
return SqlBuilder.select(StrUtil.format("concat({}, '-', {})", TABLE_VERSION_FLINK_JOB_ID, TABLE_VERSION_ALIAS))
.from(TABLE_VERSION)
.whereEq(TABLE_VERSION_SCHEDULED, scheduled)
.andEq(TABLE_VERSION_VERSION, Column.as("date_format(subdate(current_date(), 1), '%Y%m%d')"))
.build();
}
@Cacheable("un-updated-version-table")
@Retryable(Throwable.class)
public ImmutableList<String> nonUpdatedVersionTables() {
return mysqlTransactionTemplate.execute(status -> {
List<String> ids = mysqlJdbcTemplate.queryForList(generateVersionTableIdCriteria(false), String.class);
String listSQL = generateVersionTableIdCriteria(false);
sqlLogger.log(listSQL, "nonUpdatedVersionTables");
List<String> ids = mysqlJdbcTemplate.queryForList(listSQL, String.class);
return Lists.immutable.ofAll(ids);
});
}
@@ -237,35 +268,13 @@ public class InfoService {
@Retryable(Throwable.class)
public ImmutableList<String> updatedVersionTables() {
return mysqlTransactionTemplate.execute(status -> {
List<String> ids = mysqlJdbcTemplate.queryForList(generateVersionTableIdCriteria(true), String.class);
String listSQL = generateVersionTableIdCriteria(true);
sqlLogger.log(listSQL, "updatedVersionTables");
List<String> ids = mysqlJdbcTemplate.queryForList(listSQL, String.class);
return Lists.immutable.ofAll(ids);
});
}
private static String column(Alias table, String column) {
return StrUtil.format("{}.{}", table.getAlias(), column);
}
private static final String COUNT = "count(*)";
private static final Alias TABLE_VERSION = Alias.of(StrUtil.format("{}.tb_app_collect_table_version", DATABASE_NAME), "tactv");
private static final String TABLE_VERSION_FLINK_JOB_ID = column(TABLE_VERSION, "flink_job_id");
private static final String TABLE_VERSION_ALIAS = column(TABLE_VERSION, "alias");
private static final String TABLE_VERSION_VERSION = column(TABLE_VERSION, "version");
private static final String TABLE_VERSION_SCHEDULED = column(TABLE_VERSION, "scheduled");
private static final Alias TABLE_INFO = Alias.of(StrUtil.format("{}.tb_app_collect_table_info", DATABASE_NAME), "tacti");
private static final String TABLE_INFO_FLINK_JOB_ID = column(TABLE_INFO, "flink_job_id");
private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias");
private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority");
private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status");
private static final String TABLE_INFO_TARGET_HDFS = column(TABLE_INFO, "tgt_hdfs_path");
private static final String TABLE_INFO_TARGET_TABLE_TYPE = column(TABLE_INFO, "tgt_table_type");
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
private static final String TABLE_SYNC_STATE_COMPACTION_STATE = column(TABLE_SYNC_STATE, "compaction_status");
private SqlBuilder generateVersionTableCriteria(
SelectSqlBuilder builder,
Integer page,
@@ -351,26 +360,33 @@ public class InfoService {
.toMap()
.collectValues((key, list) -> list.getOnly().getValue())
.toImmutable();
String listSQL = generateVersionTableCriteria(
SqlBuilder.select(
TABLE_INFO_FLINK_JOB_ID,
TABLE_INFO_ALIAS,
TABLE_VERSION_VERSION,
TABLE_VERSION_SCHEDULED
),
page,
count,
version,
flinkJobId,
alias,
order,
direction,
filterSchedules,
true,
false
).build();
sqlLogger.log(listSQL, "findAllVersionTables");
List<VersionUpdated> list = mysqlJdbcTemplate.query(
generateVersionTableCriteria(
SqlBuilder.select(
TABLE_INFO_FLINK_JOB_ID,
TABLE_INFO_ALIAS,
TABLE_VERSION_VERSION,
TABLE_VERSION_SCHEDULED
),
page,
count,
version,
flinkJobId,
alias,
order,
direction,
filterSchedules,
true,
false
).build(),
(rs, row) -> new VersionUpdated(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getBoolean(4))
listSQL,
(rs, row) -> new VersionUpdated(
rs.getLong(1),
rs.getString(2),
rs.getString(3),
rs.getBoolean(4)
)
);
return new PageResponse<>(list, total)
.withMetadata("scheduled", scheduleCount.getOrDefault(true, 0))
@@ -591,11 +607,6 @@ public class InfoService {
);
}
private static final Alias TABLE_COMPACTION_METRICS = Alias.of(StrUtil.format("{}.tb_app_hudi_compaction_metrics", DATABASE_NAME), "tahcm");
private static final String TABLE_COMPACTION_METRICS_TYPE = column(TABLE_COMPACTION_METRICS, "type");
private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id");
private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias");
private SqlBuilder generateCompactionMetricsCriteria(
SelectSqlBuilder builder,
Integer page,
@@ -746,11 +757,6 @@ public class InfoService {
});
}
private static final Alias TABLE_FLINK_JOB = Alias.of(StrUtil.format("{}.tb_app_flink_job_config", DATABASE_NAME), "tafjc");
private static final String TABLE_FLINK_JOB_ID = column(TABLE_FLINK_JOB, "id");
private static final String TABLE_FLINK_JOB_STATUS = column(TABLE_FLINK_JOB, "status");
private static final String TABLE_FLINK_JOB_RUN_MODE = column(TABLE_FLINK_JOB, "run_mode");
@Cacheable(value = "exists-table", sync = true)
@Retryable(Throwable.class)
public Boolean existsTable(Long flinkJobId, String alias) {