feature(info-query): 增加历史压缩情况查询

This commit is contained in:
2023-06-14 15:19:56 +08:00
parent 437b2188b7
commit edbf54a519
5 changed files with 301 additions and 7 deletions

View File

@@ -5,10 +5,12 @@ import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
import com.lanyuanxiaoyao.service.info.service.InfoService;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
@@ -18,8 +20,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* 信息接口
*
@@ -194,4 +194,25 @@ public class InfoController {
public Long hiveFocusCount() {
return infoService.hiveFocusCount();
}
@GetMapping("/compaction_metrics")
public PageResponse<CompactionMetrics> compactionMetrics(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "flink_job_id") Long flinkJobId,
@RequestParam(value = "alias") String alias,
@RequestParam(value = "order", required = false) String order,
@RequestParam(value = "direction", required = false) String direction,
@RequestParam(value = "filter_completes", required = false) List<Boolean> filterCompletes
) {
return infoService.findAllCompactionMetrics(
page,
count,
flinkJobId,
alias,
order,
direction,
ObjectUtil.isNull(filterCompletes) ? Lists.immutable.empty() : Lists.immutable.ofAll(filterCompletes)
);
}
}

View File

@@ -13,9 +13,11 @@ import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.configuration.entity.info.VersionUpdated;
import java.sql.Timestamp;
import java.util.List;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
@@ -554,4 +556,129 @@ public class InfoService {
Long.class
);
}
private static final Alias TABLE_COMPACTION_METRICS = Alias.of(StrUtil.format("{}.tb_app_hudi_compaction_metrics", DATABASE_NAME), "tahcm");
private static final String TABLE_COMPACTION_METRICS_TYPE = column(TABLE_COMPACTION_METRICS, "type");
private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id");
private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias");
private SqlBuilder generateCompactionMetricsCriteria(
SelectSqlBuilder builder,
Integer page,
Integer count,
Long flinkJobId,
String alias,
String order,
String direction,
ImmutableList<Boolean> filterCompletes,
boolean limited
) {
int limit = Math.max(count, 1);
int offset = limit * Math.max(page - 1, 0);
Alias m1 = Alias.of(
SqlBuilder.selectAll()
.from(TABLE_COMPACTION_METRICS)
.whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre")
.andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId)
.andEq(TABLE_COMPACTION_METRICS_ALIAS, alias),
"m1"
);
Alias m2 = Alias.of(
SqlBuilder.selectAll()
.from(TABLE_COMPACTION_METRICS)
.whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete")
.andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId)
.andEq(TABLE_COMPACTION_METRICS_ALIAS, alias),
"m2"
);
return builder
.from(m1)
.leftJoin(m2)
.onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id")))
.andEq(column(m1, "alias"), Column.as(column(m2, "alias")))
.andEq(column(m1, "application_id"), Column.as(column(m2, "application_id")))
.andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant")))
.whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type"))
.orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type"))
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction))
.limit(limited, offset, count);
}
@Cacheable(value = "compaction-metrics", sync = true)
@Retryable(Throwable.class)
public PageResponse<CompactionMetrics> findAllCompactionMetrics(
Integer page,
Integer count,
Long flinkJobId,
String alias,
String order,
String direction,
ImmutableList<Boolean> filterCompletes
) {
return mysqlTransactionTemplate.execute(status -> {
Long total = mysqlJdbcTemplate.queryForObject(
generateCompactionMetricsCriteria(
SqlBuilder.select(COUNT),
page,
count,
flinkJobId,
alias,
order,
direction,
filterCompletes,
false
).build(),
Long.class
);
List<CompactionMetrics> list = mysqlJdbcTemplate.query(
generateCompactionMetricsCriteria(
SqlBuilder.select(
"m1.flink_job_id",
"m1.alias",
"m1.application_id",
"m1.cluster",
"m1.compaction_plan_instant",
"m2.type is not null as is_complete",
"m1.update_time as started_time",
"m2.update_time as finished_time"
),
page,
count,
flinkJobId,
alias,
order,
direction,
filterCompletes,
true
).build(),
(rs, row) -> {
boolean isComplete = rs.getBoolean(6);
if (ObjectUtil.isNull(isComplete)) {
isComplete = false;
}
Timestamp startedTimestamp = rs.getTimestamp(7);
long startedTime = 0;
if (ObjectUtil.isNotNull(startedTimestamp)) {
startedTime = startedTimestamp.getTime();
}
Timestamp finishedTimestamp = rs.getTimestamp(8);
long finishedTime = 0;
if (ObjectUtil.isNotNull(finishedTimestamp)) {
finishedTime = finishedTimestamp.getTime();
}
return new CompactionMetrics(
rs.getLong(1),
rs.getString(2),
rs.getString(3),
rs.getString(4),
rs.getString(5),
isComplete,
startedTime,
finishedTime
);
}
);
return new PageResponse<>(list, total);
});
}
}

View File

@@ -1,7 +1,11 @@
import club.kingon.sql.builder.SelectSqlBuilder;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.sql.SqlFormatter;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
@@ -31,13 +35,73 @@ public class SqlBuilderTests {
return StrUtil.format("{}.{}", table.getAlias(), column);
}
private static final Alias TABLE_COMPACTION_METRICS = Alias.of(StrUtil.format("{}.tb_app_hudi_compaction_metrics", DATABASE_NAME), "tahcm");
private static final String TABLE_COMPACTION_METRICS_TYPE = column(TABLE_COMPACTION_METRICS, "type");
private static final String TABLE_COMPACTION_METRICS_FLINK_JOB_ID = column(TABLE_COMPACTION_METRICS, "flink_job_id");
private static final String TABLE_COMPACTION_METRICS_ALIAS = column(TABLE_COMPACTION_METRICS, "alias");
private static SqlBuilder generateCompactionMetricsCriteria(
SelectSqlBuilder builder,
Integer page,
Integer count,
Long flinkJobId,
String alias,
String order,
String direction,
ImmutableList<Boolean> filterCompletes
) {
int limit = Math.max(count, 1);
int offset = limit * Math.max(page - 1, 0);
Alias m1 = Alias.of(
SqlBuilder.selectAll()
.from(TABLE_COMPACTION_METRICS)
.whereEq(TABLE_COMPACTION_METRICS_TYPE, "pre")
.andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId)
.andEq(TABLE_COMPACTION_METRICS_ALIAS, alias),
"m1"
);
Alias m2 = Alias.of(
SqlBuilder.selectAll()
.from(TABLE_COMPACTION_METRICS)
.whereEq(TABLE_COMPACTION_METRICS_TYPE, "complete")
.andEq(TABLE_COMPACTION_METRICS_FLINK_JOB_ID, flinkJobId)
.andEq(TABLE_COMPACTION_METRICS_ALIAS, alias),
"m2"
);
return builder
.from(m1)
.leftJoin(m2)
.onEq(column(m1, "flink_job_id"), Column.as(column(m2, "flink_job_id")))
.andEq(column(m1, "alias"), Column.as(column(m2, "alias")))
.andEq(column(m1, "application_id"), Column.as(column(m2, "application_id")))
.andEq(column(m1, "compaction_plan_instant"), Column.as(column(m2, "compaction_plan_instant")))
.whereNotNull(filterCompletes.anySatisfy(b -> b), column(m2, "type"))
.orNull(filterCompletes.anySatisfy(b -> !b), column(m2, "type"))
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), StrUtil.format("m1.{} {}", order, direction))
.limit(offset, count);
}
public static void main(String[] args) {
System.out.println(SqlFormatter.format(
SqlBuilder.select("count(distinct concat(src_schema, src_table))")
.from(TABLE_INFO)
.whereGe(TABLE_INFO_PRIORITY, 10000)
.andEq(TABLE_INFO_STATUS, "y")
.build()
generateCompactionMetricsCriteria(
SqlBuilder.select(
"m1.flink_job_id",
"m1.alias",
"m1.application_id",
"m1.cluster",
"m1.compaction_plan_instant",
"m2.type is not null as is_complete",
"m1.update_time as started_time",
"m2.update_time as finished_time"
),
1,
100,
1542097996099055616L,
"acct_acct_item_fs",
"update_time",
"asc",
Lists.immutable.of(false, true)
).build()
));
}
}