feat(info-query): 增加更新pulsar backlog数据的接口

This commit is contained in:
v-zhangjc9
2024-04-30 18:28:52 +08:00
parent d88fb7f1e9
commit e894c58d7b
5 changed files with 45 additions and 14 deletions

View File

@@ -107,6 +107,9 @@ public interface InfoService {
@Get("/info/sync_state/save") @Get("/info/sync_state/save")
void saveSyncState(@Query("id") String id); void saveSyncState(@Query("id") String id);
@Get("/info/sync_state/save_pulsar_back_log")
void savePulsarBacklog(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("back_log") Long backlog);
@Get("/info/non_updated_version_tables") @Get("/info/non_updated_version_tables")
ImmutableList<String> nonUpdatedVersionTables(); ImmutableList<String> nonUpdatedVersionTables();

View File

@@ -44,4 +44,13 @@ public class SyncStateController {
public void saveCompactionId(@RequestParam("id") String id) { public void saveCompactionId(@RequestParam("id") String id) {
syncStateService.saveSyncState(id); syncStateService.saveSyncState(id);
} }
@GetMapping("/sync_state/save_pulsar_back_log")
public void savePulsarBacklog(
@RequestParam("flink_job_id") Long flinkJobId,
@RequestParam("alias") String alias,
@RequestParam("back_log") Long backlog
) {
syncStateService.savePulsarBacklog(flinkJobId, alias, backlog);
}
} }

View File

@@ -17,7 +17,9 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable; import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.*; import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiSyncState;
/** /**
* Sync State * Sync State
@@ -117,4 +119,21 @@ public class SyncStateService extends BaseService {
"-1:-1:-1" "-1:-1:-1"
); );
} }
public void savePulsarBacklog(Long flinkJobId, String alias, Long backlog) {
mysqlJdbcTemplate.update(
SqlBuilder
.insertInto(
TbAppHudiSyncState._origin_,
TbAppHudiSyncState.ID_O,
TbAppHudiSyncState.PULSAR_BACK_LOG_O
)
.values()
.addValue(Q, Q)
.onDuplicateKeyUpdateColumn(TbAppHudiSyncState.PULSAR_BACK_LOG_O)
.precompileSql(),
StrUtil.format("{}-{}", flinkJobId, alias),
backlog
);
}
} }

View File

@@ -9,7 +9,12 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.db.sql.SqlUtil; import cn.hutool.db.sql.SqlUtil;
import com.lanyuanxiaoyao.service.common.SQLConstants; import com.lanyuanxiaoyao.service.common.SQLConstants;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.*; import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppCollectTableInfo;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppGlobalConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiJobConfig;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppHudiSyncState;
import static com.lanyuanxiaoyao.service.common.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig;
/** /**
* @author lanyuanxiaoyao * @author lanyuanxiaoyao
@@ -64,17 +69,10 @@ public class SqlBuilderTests {
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(SqlUtil.formatSql( System.out.println(SqlUtil.formatSql(
generateTableMetaList( SqlBuilder.update(TbAppHudiSyncState._origin_)
SqlBuilder.select( .set(TbAppHudiSyncState.PULSAR_BACK_LOG_O, "?")
TbAppFlinkJobConfig.ID_A, .whereEq(TbAppHudiSyncState.ID_O, Column.as("?"))
TbAppCollectTableInfo.ALIAS_A, .build()
TbAppCollectTableInfo.TGT_HDFS_PATH_A,
TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
TbAppCollectTableInfo.SRC_TOPIC_A
),
null,
null
).build()
)); ));
} }
} }

View File

@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import static com.lanyuanxiaoyao.service.common.Constants.HALF_HOUR;
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE; import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
/** /**
@@ -46,7 +47,7 @@ public class PulsarMetrics extends Metrics {
backlogMap = Maps.mutable.empty(); backlogMap = Maps.mutable.empty();
} }
@Scheduled(fixedDelay = MINUTE, initialDelay = MINUTE) @Scheduled(fixedDelay = HALF_HOUR, initialDelay = MINUTE)
@Override @Override
void update() { void update() {
infoService.tableMetaList() infoService.tableMetaList()
@@ -71,6 +72,7 @@ public class PulsarMetrics extends Metrics {
if (StrUtil.isNotBlank(name)) { if (StrUtil.isNotBlank(name)) {
Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias(), hudiServiceProperties.getSignature())); Long backlog = pulsarService.backlog(name, meta.getTopic(), NameHelper.pulsarSubscriptionName(meta.getJob().getId(), meta.getAlias(), hudiServiceProperties.getSignature()));
backlogCache.set(backlog); backlogCache.set(backlog);
infoService.savePulsarBacklog(meta.getJob().getId(), meta.getAlias(), backlog);
} }
} catch (Exception ignored) { } catch (Exception ignored) {
} }