diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java index ce50302..3f14481 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java @@ -163,4 +163,14 @@ public class ApiController { logger.info(makeMarker(flinkJobId, alias), "Compaction latest operation time: {} {} {}", flinkJobId, alias, latestOpTs); syncStateService.saveCompactionLatestOperationTime(flinkJobId, alias, latestOpTs); } + + @GetMapping("sync_receive_time") + public void syncReceiveTime( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam("receive_time") Long receiveTime + ) { + logger.info(makeMarker(flinkJobId, alias), "Source receive time: {} {} {}", flinkJobId, alias, receiveTime); + syncStateService.saveSyncReceiveTime(flinkJobId, alias, receiveTime); + } } diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java index 8728c5f..6f79f17 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java @@ -250,7 +250,9 @@ public class SyncStateService { } public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) { - Date operationDate = latestOperationTime == 0 ? null : Date.from(Instant.ofEpochMilli(latestOperationTime)); + Date operationDate = (ObjectUtil.isNull(latestOperationTime) || latestOperationTime == 0) + ? null + : Date.from(Instant.ofEpochMilli(latestOperationTime)); jdbcTemplate.update( SqlBuilder .insertInto( @@ -275,4 +277,24 @@ public class SyncStateService { operationDate ); } + + public void saveSyncReceiveTime(Long flinkJobId, String alias, Long sourceReceiveTime) { + Date sourceReceiveDate = (ObjectUtil.isNull(sourceReceiveTime) || sourceReceiveTime == 0) + ? null + : Date.from(Instant.ofEpochMilli(sourceReceiveTime)); + jdbcTemplate.update( + SqlBuilder + .insertInto( + TbAppHudiSyncState._origin_, + TbAppHudiSyncState.ID_O, + TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O + ) + .values() + .addValue(null, null) + .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O) + .precompileSql(), + syncStateId(flinkJobId, alias), + sourceReceiveDate + ); + } } diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java index 9afdaf5..5519486 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/SQLConstants.java @@ -45,11 +45,11 @@ public interface SQLConstants { */ String ALIAS_A = _alias_.getAlias() + "." + ALIAS_O; /** - * 字段 flink_job_id 原始值 flink_job_id Flink Job Id + * 字段 flink_job_id 原始值 flink_job_id Flink Job Id */ String FLINK_JOB_ID_O = "flink_job_id"; /** - * 字段 flink_job_id 别名值 tacti.flink_job_id Flink Job Id + * 字段 flink_job_id 别名值 tacti.flink_job_id Flink Job Id */ String FLINK_JOB_ID_A = _alias_.getAlias() + "." + FLINK_JOB_ID_O; /** @@ -618,6 +618,22 @@ public interface SQLConstants { * 字段 type 别名值 tahcm.type 类型 */ String TYPE_A = _alias_.getAlias() + "." + TYPE_O; + /** + * 字段 source_schema 原始值 source_schema 库名 + */ + String SOURCE_SCHEMA_O = "source_schema"; + /** + * 字段 source_schema 别名值 tahcm.source_schema 库名 + */ + String SOURCE_SCHEMA_A = _alias_.getAlias() + "." + SOURCE_SCHEMA_O; + /** + * 字段 source_table 原始值 source_table 表名 + */ + String SOURCE_TABLE_O = "source_table"; + /** + * 字段 source_table 别名值 tahcm.source_table 表名 + */ + String SOURCE_TABLE_A = _alias_.getAlias() + "." + SOURCE_TABLE_O; /** * 字段 compaction_plan_instant 原始值 compaction_plan_instant 压缩计划时间点 */ @@ -910,6 +926,14 @@ public interface SQLConstants { * 字段 source_start_time 别名值 tahss.source_start_time 同步启动时间 */ String SOURCE_START_TIME_A = _alias_.getAlias() + "." + SOURCE_START_TIME_O; + /** + * 字段 source_receive_time 原始值 source_receive_time pulsar最后接收消息的时间 + */ + String SOURCE_RECEIVE_TIME_O = "source_receive_time"; + /** + * 字段 source_receive_time 别名值 tahss.source_receive_time pulsar最后接收消息的时间 + */ + String SOURCE_RECEIVE_TIME_A = _alias_.getAlias() + "." + SOURCE_RECEIVE_TIME_O; /** * 字段 source_checkpoint_time 原始值 source_checkpoint_time 同步检查时间 */ diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java index 9c10dd1..c8e32ab 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java @@ -30,7 +30,10 @@ import org.slf4j.LoggerFactory; */ public class PulsarMessage2RecordFunction extends RichMapFunction implements CheckpointedFunction { private static final Logger logger = LoggerFactory.getLogger(PulsarMessage2RecordFunction.class); - private static final AtomicReference lastOperationTime = new AtomicReference<>(""); + /** + * 最后操作时间 + */ + private static final AtomicReference latestOperationTime = new AtomicReference<>(""); private final static DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$"); private final GlobalConfiguration globalConfiguration; @@ -50,7 +53,7 @@ public class PulsarMessage2RecordFunction extends RichMapFunction lastMessageId = new AtomicReference<>(); - private final AtomicLong lastPublishTime = new AtomicLong(0); + private final AtomicLong latestPublishTime = new AtomicLong(0); + private final AtomicLong latestReceiveTime = new AtomicLong(0); private final RateMetric messageReceiveMetric; private final MessageSizeSizeMetric messageSizeReceiveMetric; private final Map messageIdMap = new ConcurrentHashMap<>(); @@ -143,7 +152,8 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction + HttpUtil.createGet( + StrUtil.format( + "{}/api/sync_receive_time?flink_job_id={}&alias={}&receive_time={}", + LoadBalance.getCustomPublishUrl(configuration), + flinkJob.getId(), + tableMeta.getAlias(), + receiveTime + ) + ) + .header(Constants.API_HEADER_NAME, Constants.API_VERSION) + .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN) + .timeout(HTTP_TIMEOUT) + .execute() + ); + } }