feature(info-query): 增加hudi表类型过滤

This commit is contained in:
2023-07-11 17:43:43 +08:00
parent 2ac1e0be2c
commit 5abfe2d017
7 changed files with 201 additions and 88 deletions

View File

@@ -45,6 +45,7 @@ public class InfoController {
@RequestParam(value = "alias", required = false) String alias,
@RequestParam(value = "order", required = false) String order,
@RequestParam(value = "direction", required = false) String direction,
@RequestParam(value = "filter_hudi_table_type", required = false) List<String> selectHudiTableType,
@RequestParam(value = "filter_run_mode", required = false) List<String> selectedRunMode,
@RequestParam(value = "filter_compaction_status", required = false) List<String> selectedCompactionStatus
) {
@@ -55,6 +56,7 @@ public class InfoController {
alias,
order,
direction,
Lists.immutable.ofAll(selectHudiTableType),
Lists.immutable.ofAll(selectedRunMode),
Lists.immutable.ofAll(selectedCompactionStatus)
);

View File

@@ -59,6 +59,33 @@ public class InfoService {
return databaseService.getSyncState(flinkJobId, alias);
}
private SqlBuilder generateJobIdAndAliasCriteria(
SelectSqlBuilder builder,
Integer page,
Integer count,
Long flinkJobId,
String alias,
String order,
String direction,
ImmutableList<String> selectHudiTableType,
ImmutableList<String> selectedRunMode,
ImmutableList<String> selectedCompactionStatus,
boolean limited
) {
int limit = Math.max(count, 1);
int offset = limit * Math.max(page - 1, 0);
return builder.from(TABLE_FLINK_JOB, TABLE_INFO, TABLE_SYNC_STATE)
.whereEq(TABLE_FLINK_JOB_ID, Column.as(TABLE_INFO_FLINK_JOB_ID))
.andEq(TABLE_SYNC_STATE_ID, Column.as(StrUtil.format("concat({}, '-', {})", TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS)))
.andLike(ObjectUtil.isNotNull(flinkJobId), TABLE_FLINK_JOB_ID, flinkJobId)
.andLike(ObjectUtil.isNotNull(alias), TABLE_INFO_ALIAS, alias)
.andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TABLE_INFO_TARGET_TABLE_TYPE, selectHudiTableType)
.andIn(ObjectUtil.isNotEmpty(selectedRunMode), TABLE_FLINK_JOB_RUN_MODE, selectedRunMode)
.andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TABLE_SYNC_STATE_COMPACTION_STATE, selectedCompactionStatus)
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
.limit(limited, offset, limit);
}
@Cacheable(value = "job-id-alias", sync = true)
@Retryable(Throwable.class)
public PageResponse<JobIdAndAlias> findAllJobIdAndAlias(
@@ -68,50 +95,40 @@ public class InfoService {
String alias,
String order,
String direction,
ImmutableList<String> selectHudiTableType,
ImmutableList<String> selectedRunMode,
ImmutableList<String> selectedCompactionStatus
) {
return mysqlTransactionTemplate.execute(status -> {
int limit = Math.max(count, 1);
int offset = limit * Math.max(page - 1, 0);
String searchFlinkJobId = flinkJobId == null ? "" : "and job.id like '%" + flinkJobId + "%'\n";
String searchAlias = alias == null ? "" : "and info.alias like '%" + alias + "%'\n";
String filterRunMode = selectedRunMode == null || selectedRunMode.isEmpty()
? ""
: "and job.run_mode in ('" + selectedRunMode.makeString("','") + "')\n";
String filterCompactionStatus = selectedCompactionStatus == null || selectedCompactionStatus.isEmpty()
? ""
: "and state.compaction_status in ('" + selectedCompactionStatus.makeString("','") + "')\n";
String orderBy = "";
String orderField = null;
String orderDirection = null;
if (order != null && direction != null) {
String orderField = null;
switch (order) {
case "source_start_time":
case "sourceStartTime":
orderField = "state.source_start_time";
orderField = "tahss.source_start_time";
break;
case "source_checkpoint_time":
case "sourceCheckpointTime":
orderField = "state.source_checkpoint_time";
orderField = "tahss.source_checkpoint_time";
break;
case "source_publish_time":
case "sourcePublishTime":
orderField = "state.source_publish_time";
orderField = "tahss.source_publish_time";
break;
case "source_operation_time":
case "sourceOperationTime":
orderField = "state.source_op_time";
orderField = "tahss.source_op_time";
break;
case "compaction_start_time":
case "compactionStartTime":
orderField = "state.compaction_start_time";
orderField = "tahss.compaction_start_time";
break;
case "compaction_finish_time":
case "compactionFinishTime":
orderField = "state.compaction_finish_time";
orderField = "tahss.compaction_finish_time";
break;
}
String orderDirection = null;
switch (direction) {
case "desc":
case "DESC":
@@ -126,37 +143,37 @@ public class InfoService {
orderDirection = "asc";
break;
}
if (ObjectUtil.isNotNull(orderField) && ObjectUtil.isNotNull(direction)) {
orderBy = StrUtil.format("order by {} {}\n", orderField, orderDirection);
}
}
Long total = mysqlJdbcTemplate.queryForObject(
"select count(*)\n" +
"from " + DATABASE_NAME + ".tb_app_flink_job_config job,\n" +
" " + DATABASE_NAME + ".tb_app_collect_table_info info,\n" +
" " + DATABASE_NAME + ".tb_app_hudi_sync_state state\n" +
"where job.id = info.flink_job_id\n" +
"and state.id = concat(job.id, '-', info.alias)\n" +
searchFlinkJobId +
searchAlias +
filterRunMode +
filterCompactionStatus +
orderBy,
generateJobIdAndAliasCriteria(
SqlBuilder.select(COUNT),
page,
count,
flinkJobId,
alias,
orderField,
orderDirection,
selectHudiTableType,
selectedRunMode,
selectedCompactionStatus,
false
).build(),
Long.class
);
List<JobIdAndAlias> list = mysqlJdbcTemplate.query(
"select job.id, info.alias\n" +
"from " + DATABASE_NAME + ".tb_app_flink_job_config job,\n" +
" " + DATABASE_NAME + ".tb_app_collect_table_info info,\n" +
" " + DATABASE_NAME + ".tb_app_hudi_sync_state state\n" +
"where job.id = info.flink_job_id\n" +
"and state.id = concat(job.id, '-', info.alias)\n" +
searchFlinkJobId +
searchAlias +
filterRunMode +
filterCompactionStatus +
orderBy +
"limit " + limit + " offset " + offset,
generateJobIdAndAliasCriteria(
SqlBuilder.select(TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS),
page,
count,
flinkJobId,
alias,
orderField,
orderDirection,
selectHudiTableType,
selectedRunMode,
selectedCompactionStatus,
true
).build(),
(rs, row) -> new JobIdAndAlias(rs.getLong(1), rs.getString(2))
);
return new PageResponse<>(list, total);
@@ -243,9 +260,11 @@ public class InfoService {
private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority");
private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status");
private static final String TABLE_INFO_TARGET_HDFS = column(TABLE_INFO, "tgt_hdfs_path");
private static final String TABLE_INFO_TARGET_TABLE_TYPE = column(TABLE_INFO, "tgt_table_type");
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
private static final String TABLE_SYNC_STATE_COMPACTION_STATE = column(TABLE_SYNC_STATE, "compaction_status");
private SqlBuilder generateVersionTableCriteria(
SelectSqlBuilder builder,
@@ -730,6 +749,7 @@ public class InfoService {
private static final Alias TABLE_FLINK_JOB = Alias.of(StrUtil.format("{}.tb_app_flink_job_config", DATABASE_NAME), "tafjc");
private static final String TABLE_FLINK_JOB_ID = column(TABLE_FLINK_JOB, "id");
private static final String TABLE_FLINK_JOB_STATUS = column(TABLE_FLINK_JOB, "status");
private static final String TABLE_FLINK_JOB_RUN_MODE = column(TABLE_FLINK_JOB, "run_mode");
@Cacheable(value = "exists-table", sync = true)
@Retryable(Throwable.class)

View File

@@ -1,8 +1,12 @@
import club.kingon.sql.builder.SelectSqlBuilder;
import club.kingon.sql.builder.SqlBuilder;
import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.sql.SqlFormatter;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import static com.eshore.odcp.hudi.connector.Constants.DATABASE_NAME;
@@ -24,9 +28,11 @@ public class SqlBuilderTests {
private static final String TABLE_INFO_ALIAS = column(TABLE_INFO, "alias");
private static final String TABLE_INFO_PRIORITY = column(TABLE_INFO, "priority");
private static final String TABLE_INFO_STATUS = column(TABLE_INFO, "status");
private static final String TABLE_INFO_TARGET_TABLE_TYPE = column(TABLE_INFO, "tgt_table_type");
private static final Alias TABLE_SYNC_STATE = Alias.of(StrUtil.format("{}.tb_app_hudi_sync_state", DATABASE_NAME), "tahss");
private static final String TABLE_SYNC_STATE_ID = column(TABLE_SYNC_STATE, "id");
private static final String TABLE_SYNC_STATE_COMPACTION_STATE = column(TABLE_SYNC_STATE, "compaction_status");
private static String column(Alias table, String column) {
return StrUtil.format("{}.{}", table.getAlias(), column);
@@ -40,15 +46,62 @@ public class SqlBuilderTests {
private static final Alias TABLE_FLINK_JOB = Alias.of(StrUtil.format("{}.tb_app_flink_job_config", DATABASE_NAME), "tafjc");
private static final String TABLE_FLINK_JOB_ID = column(TABLE_FLINK_JOB, "id");
private static final String TABLE_FLINK_JOB_STATUS = column(TABLE_FLINK_JOB, "status");
private static final String TABLE_FLINK_JOB_RUN_MODE = column(TABLE_FLINK_JOB, "run_mode");
private static SqlBuilder generateJobIdAndAliasCriteria(
SelectSqlBuilder builder,
Integer page,
Integer count,
Long flinkJobId,
String alias,
String order,
String direction,
ImmutableList<String> selectHudiTableType,
ImmutableList<String> selectedRunMode,
ImmutableList<String> selectedCompactionStatus,
boolean limited
) {
int limit = Math.max(count, 1);
int offset = limit * Math.max(page - 1, 0);
return builder.from(TABLE_FLINK_JOB, TABLE_INFO, TABLE_SYNC_STATE)
.whereEq(TABLE_FLINK_JOB_ID, Column.as(TABLE_INFO_FLINK_JOB_ID))
.andEq(TABLE_SYNC_STATE_ID, Column.as(StrUtil.format("concat({}, '-', {})", TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS)))
.andLike(ObjectUtil.isNotNull(flinkJobId), TABLE_FLINK_JOB_ID, flinkJobId)
.andLike(ObjectUtil.isNotNull(alias), TABLE_INFO_ALIAS, alias)
.andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TABLE_INFO_TARGET_TABLE_TYPE, selectHudiTableType)
.andIn(ObjectUtil.isNotEmpty(selectedRunMode), TABLE_FLINK_JOB_RUN_MODE, selectedRunMode)
.andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TABLE_SYNC_STATE_COMPACTION_STATE, selectedCompactionStatus)
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
.limit(limited, offset, limit);
}
public static void main(String[] args) {
System.out.println(SqlFormatter.format(
SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_ALIAS))
.from(TABLE_INFO, TABLE_FLINK_JOB)
.whereEq(TABLE_INFO_FLINK_JOB_ID, Column.as(TABLE_FLINK_JOB_ID))
.andEq(TABLE_FLINK_JOB_STATUS, "y")
.andEq(TABLE_INFO_STATUS, "y")
.build()
/*SqlBuilder.select(StrUtil.format("distinct {}", TABLE_INFO_ALIAS))
.from(TABLE_FLINK_JOB, TABLE_INFO, TABLE_SYNC_STATE)
.whereEq(TABLE_FLINK_JOB_ID, Column.as(TABLE_INFO_FLINK_JOB_ID))
.andEq(TABLE_SYNC_STATE_ID, StrUtil.format("concat({}, '-', {})", TABLE_FLINK_JOB_ID, TABLE_INFO_ALIAS))
.andLike(ObjectUtil.isNotNull(flinkJobId), TABLE_FLINK_JOB_ID, flinkJobId)
.andLike(ObjectUtil.isNotNull(alias), TABLE_INFO_ALIAS, alias)
.andIn(ObjectUtil.isNotEmpty(selectHudiTableType), TABLE_INFO_TARGET_TABLE_TYPE, selectHudiTableType)
.andIn(ObjectUtil.isNotEmpty(selectedRunMode), TABLE_FLINK_JOB_RUN_MODE, selectedRunMode)
.andIn(ObjectUtil.isNotEmpty(selectedCompactionStatus), TABLE_SYNC_STATE_COMPACTION_STATE, selectedCompactionStatus)
.orderBy(StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction), () -> StrUtil.format("{} {}", order, direction))
.limit(limited, offset, limit)
.build()*/
generateJobIdAndAliasCriteria(
SqlBuilder.selectAll(),
1,
10,
1000L,
"hello",
"dog",
"asc",
Lists.immutable.of("MOR"),
Lists.immutable.of("ONE_IN_ONE", "ALL_IN_ONE"),
Lists.immutable.of("COMPLETE"),
true
).build()
));
}
}