From def035fa9095eb538fbfcd457374e9ed3ec62a8b Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Thu, 9 May 2024 18:12:25 +0800 Subject: [PATCH] =?UTF-8?q?perf(api):=20=E4=BC=98=E5=8C=96api=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E5=AD=98=E5=82=A8=E5=92=8C=E4=BC=A0=E8=BE=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 统一命名格式,减少不必要的时间字段传输,统一字段命名,减少默认值的传输 --- .idea/httpRequests/http-requests-log.http | 50 ++++---- .../service/api/controller/ApiController.java | 109 +++++++++------- .../service/api/service/SyncStateService.java | 114 +++++++++-------- .../api/service/VersionUpdateService.java | 4 +- .../functions/CompactionEventHandler.java | 9 +- .../PulsarMessage2RecordFunction.java | 21 +-- .../functions/PulsarMessageSourceReader.java | 15 ++- .../service/sync/utils/StatusUtils.java | 120 ++++++++---------- 8 files changed, 238 insertions(+), 204 deletions(-) diff --git a/.idea/httpRequests/http-requests-log.http b/.idea/httpRequests/http-requests-log.http index 6946787..d5c5e9d 100644 --- a/.idea/httpRequests/http-requests-log.http +++ b/.idea/httpRequests/http-requests-log.http @@ -1,3 +1,29 @@ +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/all +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=7A4C34E0240A98C1186F3A2551BC5E80 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_web/cloud/list +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=F5F155198FAF72435339CC2E21B873CC +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-09T170723.200.json + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/hudi_api/api/message_id?flink_job_id=1542097984132706304&alias=crm_cfguse_mkt_cam_strategy_rel +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=F5F155198FAF72435339CC2E21B873CC +Accept-Encoding: br,deflate,gzip,x-gzip + +### + POST http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s8.hdp.dc:15391/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt&overwrite=true Content-Type: text/plain Content-Length: 738 @@ -430,27 +456,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip ### -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java index 3f14481..96dd51c 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/controller/ApiController.java @@ -11,7 +11,13 @@ import org.eclipse.collections.api.factory.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; /** * Rest 接口 @@ -47,45 +53,65 @@ public class ApiController { public void syncStart( @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, - @RequestParam(value = "cluster", required = false) String cluster, - @RequestParam(value = "application_id", required = false) String applicationId + @RequestParam("database") String database, + @RequestParam("schema") String schema, + @RequestParam("table") String table, + @RequestParam("cluster") String cluster, + @RequestParam("application_id") String applicationId ) { logger.info(makeMarker(flinkJobId, alias), "Sync start: {} {} {} {}", flinkJobId, alias, cluster, applicationId); - syncStateService.saveSyncStartTime(flinkJobId, alias, cluster, applicationId); + syncStateService.syncStart(flinkJobId, alias, database, schema, table, cluster, applicationId); } - @GetMapping("sync_checkpoint_state") - public void syncCheckpointState( + @GetMapping("sync_checkpoint") + public void syncCheckpoint( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam("message_id") String messageId + ) { + logger.info(makeMarker(flinkJobId, alias), "Sync checkpoint: {} {} {}", flinkJobId, alias, messageId); + syncStateService.syncCheckpoint(flinkJobId, alias, messageId); + } + + @GetMapping("sync_latest_publish_time") + public void syncLatestPublishTime( @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, - @RequestParam("message_id") String messageId, @RequestParam("publish_time") Long publishTime ) { - logger.info(makeMarker(flinkJobId, alias), "Sync checkpoint state: {} {} {} {}", flinkJobId, alias, messageId, publishTime); - syncStateService.saveSyncCheckpointState(flinkJobId, alias, messageId, publishTime); + logger.info(makeMarker(flinkJobId, alias), "Sync latest publish time: {} {} {}", flinkJobId, alias, publishTime); + syncStateService.syncPublishTime(flinkJobId, alias, publishTime); } - @GetMapping("sync_operation_state") - public void syncOperationState( + @GetMapping("sync_latest_receive_time") + public void syncLatestReceiveTime( @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, - @RequestParam(value = "operation_time", - required = false) Long operationTime + @RequestParam("receive_time") Long receiveTime ) { - logger.info(makeMarker(flinkJobId, alias), "Sync operation state: {} {} {}", flinkJobId, alias, operationTime); - syncStateService.saveSyncOperationState(flinkJobId, alias, operationTime); + logger.info(makeMarker(flinkJobId, alias), "Sync latest receive time: {} {} {}", flinkJobId, alias, receiveTime); + syncStateService.syncLatestReceiveTime(flinkJobId, alias, receiveTime); + } + + @GetMapping("sync_latest_operation_time") + public void syncLatestOperationTime( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam("operation_time") Long operationTime + ) { + logger.info(makeMarker(flinkJobId, alias), "Sync latest operation time: {} {} {}", flinkJobId, alias, operationTime); + syncStateService.syncLatestOperationTime(flinkJobId, alias, operationTime); } @GetMapping("compaction_start") public void compactionStart( @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, - @RequestParam(value = "type", required = false) String type, - @RequestParam(value = "cluster", required = false) String cluster, - @RequestParam(value = "application_id", required = false) String applicationId + @RequestParam("cluster") String cluster, + @RequestParam("application_id") String applicationId ) { logger.info(makeMarker(flinkJobId, alias), "Compaction start: {} {} {} {}", flinkJobId, alias, cluster, applicationId); - syncStateService.saveCompactionStartTime(flinkJobId, alias, cluster, applicationId, null); + syncStateService.compactionStart(flinkJobId, alias, cluster, applicationId); } @PostMapping("compaction_pre_commit") @@ -93,12 +119,12 @@ public class ApiController { @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, @RequestParam("instant") String instant, - @RequestParam(value = "cluster", required = false) String cluster, - @RequestParam(value = "application_id", required = false) String applicationId, + @RequestParam("cluster") String cluster, + @RequestParam("application_id") String applicationId, @RequestBody HoodieCommitMetadata metadata ) { - logger.info(makeMarker(flinkJobId, alias), "Compaction state: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId); - syncStateService.saveCompactionMetrics( + logger.info(makeMarker(flinkJobId, alias), "Compaction pre commit: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId); + syncStateService.compactionCommit( flinkJobId, alias, "pre", @@ -114,13 +140,13 @@ public class ApiController { @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, @RequestParam("instant") String instant, - @RequestParam(value = "cluster", required = false) String cluster, - @RequestParam(value = "application_id", required = false) String applicationId, + @RequestParam("cluster") String cluster, + @RequestParam("application_id") String applicationId, @RequestBody HoodieCommitMetadata metadata ) { - logger.info(makeMarker(flinkJobId, alias), "Compaction state: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId); - syncStateService.saveCompactionFinishTime(flinkJobId, alias); - syncStateService.saveCompactionMetrics( + logger.info(makeMarker(flinkJobId, alias), "Compaction commit: {} {} {} {} {}", flinkJobId, alias, instant, cluster, applicationId); + syncStateService.compactionFinish(flinkJobId, alias); + syncStateService.compactionCommit( flinkJobId, alias, "complete", @@ -135,12 +161,11 @@ public class ApiController { public void compactionFinish( @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, - @RequestParam("time") Long timestamp, @RequestParam("state") Boolean success, @RequestBody(required = false) String exception ) { - logger.info(makeMarker(flinkJobId, alias), "Compaction finish: {} {} {} {}", flinkJobId, alias, timestamp, success); - syncStateService.saveCompactionFinishTime(flinkJobId, alias, timestamp, StrUtil.isBlank(exception) ? null : exception); + logger.info(makeMarker(flinkJobId, alias), "Compaction finish: {} {} {}", flinkJobId, alias, success); + syncStateService.compactionFinish(flinkJobId, alias, StrUtil.isBlank(exception) ? null : exception); } @GetMapping("version_update") @@ -148,29 +173,19 @@ public class ApiController { @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, @RequestParam("version") String version, - @RequestParam(value = "opts", defaultValue = "") String opTs + @RequestParam("operation_time") String operationTime ) { - logger.info(makeMarker(flinkJobId, alias), "Version update: {} {} {}", flinkJobId, alias, version); - versionUpdateService.saveUpdateVersion(flinkJobId, alias, version, opTs); + logger.info(makeMarker(flinkJobId, alias), "Version update: {} {} {} {}", flinkJobId, alias, version, operationTime); + versionUpdateService.versionUpdate(flinkJobId, alias, version, operationTime); } @GetMapping("compaction_latest_operation_time") public void compactionLatestOperationTime( @RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias, - @RequestParam("latest_op_ts") Long latestOpTs + @RequestParam("operation_time") Long operationTime ) { - logger.info(makeMarker(flinkJobId, alias), "Compaction latest operation time: {} {} {}", flinkJobId, alias, latestOpTs); - syncStateService.saveCompactionLatestOperationTime(flinkJobId, alias, latestOpTs); - } - - @GetMapping("sync_receive_time") - public void syncReceiveTime( - @RequestParam("flink_job_id") Long flinkJobId, - @RequestParam("alias") String alias, - @RequestParam("receive_time") Long receiveTime - ) { - logger.info(makeMarker(flinkJobId, alias), "Source receive time: {} {} {}", flinkJobId, alias, receiveTime); - syncStateService.saveSyncReceiveTime(flinkJobId, alias, receiveTime); + logger.info(makeMarker(flinkJobId, alias), "Compaction latest operation time: {} {} {}", flinkJobId, alias, operationTime); + syncStateService.compactionLatestOperationTime(flinkJobId, alias, operationTime); } } diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java index 6f79f17..440a7d1 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/SyncStateService.java @@ -3,7 +3,6 @@ package com.lanyuanxiaoyao.service.api.service; import club.kingon.sql.builder.SqlBuilder; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.api.entity.HoodieCommitMetadata; import com.lanyuanxiaoyao.service.common.Constants; import java.time.Instant; @@ -57,54 +56,95 @@ public class SyncStateService { return ""; } - public void saveSyncStartTime(Long flinkJobId, String alias, String cluster, String applicationId) { + public void syncStart(Long flinkJobId, String alias, String database, String schema, String table, String cluster, String applicationId) { jdbcTemplate.update( SqlBuilder .insertInto( TbAppHudiSyncState._origin_, TbAppHudiSyncState.ID_O, + TbAppHudiSyncState.SOURCE_DATABASE_O, + TbAppHudiSyncState.SOURCE_SCHEMA_O, + TbAppHudiSyncState.SOURCE_TABLE_O, TbAppHudiSyncState.SOURCE_START_TIME_O, TbAppHudiSyncState.SOURCE_CLUSTER_O, TbAppHudiSyncState.SOURCE_APPLICATION_ID_O ) .values() - .addValue(null, null, null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_START_TIME_O) + .addValue(null, null, null, null, null, null, null) + .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_DATABASE_O) + .addUpdateColumn(TbAppHudiSyncState.SOURCE_SCHEMA_O) + .addUpdateColumn(TbAppHudiSyncState.SOURCE_TABLE_O) + .addUpdateColumn(TbAppHudiSyncState.SOURCE_START_TIME_O) .addUpdateColumn(TbAppHudiSyncState.SOURCE_CLUSTER_O) .addUpdateColumn(TbAppHudiSyncState.SOURCE_APPLICATION_ID_O) .precompileSql(), syncStateId(flinkJobId, alias), + database, + schema, + table, now(), cluster, applicationId ); } - public void saveSyncCheckpointState(Long flinkJobId, String alias, String messageId, Long publishTime) { - Date publishDate = Date.from(Instant.ofEpochMilli(publishTime)); + public void syncCheckpoint(Long flinkJobId, String alias, String messageId) { jdbcTemplate.update( SqlBuilder .insertInto( TbAppHudiSyncState._origin_, TbAppHudiSyncState.ID_O, TbAppHudiSyncState.MESSAGE_ID_O, - TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O, TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_O ) .values() - .addValue(null, null, null, null) + .addValue(null, null, null) .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.MESSAGE_ID_O) - .addUpdateColumn(TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O) .addUpdateColumn(TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_O) .precompileSql(), syncStateId(flinkJobId, alias), messageId, - publishDate, now() ); } - public void saveSyncOperationState(Long flinkJobId, String alias, Long operationTime) { + public void syncPublishTime(Long flinkJobId, String alias, Long publishTime) { + Date publishDate = ObjectUtil.isNull(publishTime) ? null : Date.from(Instant.ofEpochMilli(publishTime)); + jdbcTemplate.update( + SqlBuilder + .insertInto( + TbAppHudiSyncState._origin_, + TbAppHudiSyncState.ID_O, + TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O + ) + .values() + .addValue(null, null) + .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_PUBLISH_TIME_O) + .precompileSql(), + syncStateId(flinkJobId, alias), + publishDate + ); + } + + public void syncLatestReceiveTime(Long flinkJobId, String alias, Long receiveTime) { + Date receiveDate = ObjectUtil.isNull(receiveTime) ? null : Date.from(Instant.ofEpochMilli(receiveTime)); + jdbcTemplate.update( + SqlBuilder + .insertInto( + TbAppHudiSyncState._origin_, + TbAppHudiSyncState.ID_O, + TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O + ) + .values() + .addValue(null, null) + .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O) + .precompileSql(), + syncStateId(flinkJobId, alias), + receiveDate + ); + } + + public void syncLatestOperationTime(Long flinkJobId, String alias, Long operationTime) { Date operationDate = ObjectUtil.isNull(operationTime) ? null : Date.from(Instant.ofEpochMilli(operationTime)); jdbcTemplate.update( SqlBuilder @@ -122,7 +162,7 @@ public class SyncStateService { ); } - public void saveCompactionStartTime(Long flinkJobId, String alias, String cluster, String applicationId, String message) { + public void compactionStart(Long flinkJobId, String alias, String cluster, String applicationId) { jdbcTemplate.update( SqlBuilder .insertInto( @@ -146,7 +186,7 @@ public class SyncStateService { .precompileSql(), syncStateId(flinkJobId, alias), now(), - message, + null, Constants.COMPACTION_STATUS_START, now(), cluster, @@ -154,12 +194,11 @@ public class SyncStateService { ); } - public void saveCompactionFinishTime(Long flinkJobId, String alias) { - saveCompactionFinishTime(flinkJobId, alias, Instant.now().toEpochMilli(), ""); + public void compactionFinish(Long flinkJobId, String alias) { + compactionFinish(flinkJobId, alias, ""); } - public void saveCompactionFinishTime(Long flinkJobId, String alias, Long timestamp, String message) { - Date finishDate = Date.from(Instant.ofEpochMilli(timestamp)); + public void compactionFinish(Long flinkJobId, String alias, String message) { jdbcTemplate.update( SqlBuilder .insertInto( @@ -178,14 +217,14 @@ public class SyncStateService { .addUpdateColumn(TbAppHudiSyncState.COMPACTION_STATUS_TIME_O) .precompileSql(), syncStateId(flinkJobId, alias), - finishDate, + now(), message, Constants.COMPACTION_STATUS_FINISH, now() ); } - public void saveCompactionMetrics(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata) { + public void compactionCommit(Long flinkJobId, String alias, String type, String instant, String cluster, String applicationId, HoodieCommitMetadata metadata) { if (ObjectUtil.isNull(metadata)) { jdbcTemplate.update( SqlBuilder @@ -249,10 +288,8 @@ public class SyncStateService { } } - public void saveCompactionLatestOperationTime(Long flinkJobId, String alias, Long latestOperationTime) { - Date operationDate = (ObjectUtil.isNull(latestOperationTime) || latestOperationTime == 0) - ? null - : Date.from(Instant.ofEpochMilli(latestOperationTime)); + public void compactionLatestOperationTime(Long flinkJobId, String alias, Long operationTime) { + Date operationDate = ObjectUtil.isNull(operationTime) ? null : Date.from(Instant.ofEpochMilli(operationTime)); jdbcTemplate.update( SqlBuilder .insertInto( @@ -262,39 +299,10 @@ public class SyncStateService { ) .values() .addValue(null, null) - .onDuplicateKeyUpdateSetter( - StrUtil.format( - "{} = if({} is null, ?, greatest({}, ?))", - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O, - TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O - ) - ) + .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_O) .precompileSql(), syncStateId(flinkJobId, alias), - operationDate, - operationDate, operationDate ); } - - public void saveSyncReceiveTime(Long flinkJobId, String alias, Long sourceReceiveTime) { - Date sourceReceiveDate = (ObjectUtil.isNull(sourceReceiveTime) || sourceReceiveTime == 0) - ? null - : Date.from(Instant.ofEpochMilli(sourceReceiveTime)); - jdbcTemplate.update( - SqlBuilder - .insertInto( - TbAppHudiSyncState._origin_, - TbAppHudiSyncState.ID_O, - TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O - ) - .values() - .addValue(null, null) - .onDuplicateKeyUpdateColumn(TbAppHudiSyncState.SOURCE_RECEIVE_TIME_O) - .precompileSql(), - syncStateId(flinkJobId, alias), - sourceReceiveDate - ); - } } diff --git a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/VersionUpdateService.java b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/VersionUpdateService.java index d577616..1794caa 100644 --- a/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/VersionUpdateService.java +++ b/service-api/src/main/java/com/lanyuanxiaoyao/service/api/service/VersionUpdateService.java @@ -30,7 +30,7 @@ public class VersionUpdateService { return Date.from(Instant.now()); } - public void saveUpdateVersion(Long flinkJobId, String alias, String version, String opTs) { + public void versionUpdate(Long flinkJobId, String alias, String version, String operationTime) { jdbcTemplate.update( SqlBuilder .insertInto( @@ -47,7 +47,7 @@ public class VersionUpdateService { flinkJobId, alias, version, - opTs, + operationTime, now() ); } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java index 1cad1cf..711f4e9 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/CompactionEventHandler.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.sync.functions; +import cn.hutool.core.util.ObjectUtil; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; @@ -41,6 +42,7 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable StatusUtils.compactionCommit(configuration, flinkJob, tableMeta, instant, metadata); // Trace latest_op_ts statuses.stream() + .filter(ObjectUtil::isNotNull) .map(status -> { if (status instanceof TraceWriteStatus) { TraceWriteStatus s = (TraceWriteStatus) status; @@ -48,10 +50,11 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable } return 0L; }) + .filter(opts -> opts > 0) .max(Long::compare) - .ifPresent(max -> { - logger.info("Latest op ts: {}", max); - StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max); + .ifPresent(time -> { + logger.info("Latest op ts: {}", time); + StatusUtils.compactionLatestOperationTime(configuration, flinkJob, tableMeta, time); }); } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java index c8e32ab..c58a191 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.sync.functions; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.Record; @@ -64,17 +65,19 @@ public class PulsarMessage2RecordFunction extends RichMapFunction 0) { + StatusUtils.syncLatestPublishTime(globalConfiguration, flinkJob, tableMeta, publishTime); + } + + long receiveTime = latestReceiveTime.get(); + if (receiveTime > 0) { + StatusUtils.syncLatestReceiveTime(globalConfiguration, flinkJob, tableMeta, receiveTime); + } + messageIdMap.remove(checkpointId); } diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java index 891e80c..31fc8b5 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/StatusUtils.java @@ -1,7 +1,5 @@ package com.lanyuanxiaoyao.service.sync.utils; -import cn.hutool.core.util.EnumUtil; -import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; import com.lanyuanxiaoyao.service.common.Constants; @@ -10,7 +8,6 @@ import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; import com.lanyuanxiaoyao.service.sync.configuration.RetryPolicyProvider; import dev.failsafe.Failsafe; -import java.time.Instant; import java.util.Map; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -30,7 +27,6 @@ public class StatusUtils { private static final int HTTP_TIMEOUT = (int) Constants.MINUTE; public static void syncStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) { - // logger.info("Enter method: syncStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> @@ -57,18 +53,16 @@ public class StatusUtils { } } - public static void syncCheckpoint(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String messageId, Long publishTime) { - logger.info("Enter method: syncCheckpoint[configuration, flinkJob, tableMeta, messageId, publishTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "messageId:" + messageId + "," + "publishTime:" + publishTime); + public static void syncCheckpoint(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String messageId) { try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createGet( StrUtil.format( - "{}/api/sync_checkpoint_state?flink_job_id={}&alias={}&message_id={}&publish_time={}", + "{}/api/sync_checkpoint?flink_job_id={}&alias={}&message_id={}", LoadBalance.getCustomPublishUrl(configuration), flinkJob.getId(), tableMeta.getAlias(), - messageId, - publishTime + messageId ) ) .header(Constants.API_HEADER_NAME, Constants.API_VERSION) @@ -81,32 +75,58 @@ public class StatusUtils { } } - public static void syncOperation(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) { - syncOperation(configuration, flinkJob, tableMeta, null); - } - - public static void syncOperation(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) { - // logger.info("Enter method: syncOperation[configuration, flinkJob, tableMeta, operationTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "operationTime:" + operationTime); + public static void syncLatestPublishTime(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long publishTime) { try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) - .run(() -> { - if (ObjectUtil.isNull(operationTime)) { - HttpUtil.createGet( + .run(() -> HttpUtil.createGet( StrUtil.format( - "{}/api/sync_operation_state?flink_job_id={}&alias={}", + "{}/api/sync_latest_publish_time?flink_job_id={}&alias={}&publish_time={}", LoadBalance.getCustomPublishUrl(configuration), flinkJob.getId(), - tableMeta.getAlias() + tableMeta.getAlias(), + publishTime ) ) .header(Constants.API_HEADER_NAME, Constants.API_VERSION) .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN) .timeout(HTTP_TIMEOUT) - .execute(); - } else { + .execute() + ); + } catch (Exception e) { + logger.warn("sync latest publish time submit failure"); + } + } + + public static void syncLatestReceiveTime(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long receiveTime) { + try { + Failsafe.with(RetryPolicyProvider.HTTP_RETRY) + .run(() -> HttpUtil.createGet( StrUtil.format( - "{}/api/sync_operation_state?flink_job_id={}&alias={}&operation_time={}", + "{}/api/sync_latest_receive_time?flink_job_id={}&alias={}&receive_time={}", + LoadBalance.getCustomPublishUrl(configuration), + flinkJob.getId(), + tableMeta.getAlias(), + receiveTime + ) + ) + .header(Constants.API_HEADER_NAME, Constants.API_VERSION) + .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN) + .timeout(HTTP_TIMEOUT) + .execute() + ); + } catch (Exception e) { + logger.warn("sync latest receive time submit failure"); + } + } + + public static void syncLatestOperationTime(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) { + try { + Failsafe.with(RetryPolicyProvider.HTTP_RETRY) + .run(() -> + HttpUtil.createGet( + StrUtil.format( + "{}/api/sync_latest_operation_time?flink_job_id={}&alias={}&operation_time={}", LoadBalance.getCustomPublishUrl(configuration), flinkJob.getId(), tableMeta.getAlias(), @@ -116,26 +136,23 @@ public class StatusUtils { .header(Constants.API_HEADER_NAME, Constants.API_VERSION) .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN) .timeout(HTTP_TIMEOUT) - .execute(); - } - }); + .execute() + ); } catch (Exception e) { - logger.warn("sync operation metrics submit failure"); + logger.warn("sync latest operation metrics submit failure"); } } public static void compactionStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) { - // logger.info("Enter method: compactionStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createGet( StrUtil.format( - "{}/api/compaction_start?flink_job_id={}&alias={}&type={}&cluster={}&application_id={}", + "{}/api/compaction_start?flink_job_id={}&alias={}&cluster={}&application_id={}", LoadBalance.getCustomPublishUrl(configuration), flinkJob.getId(), tableMeta.getAlias(), - EnumUtil.toString(tableMeta.getSourceType()), configuration.getCluster(), configuration.getApplicationId() ) @@ -151,7 +168,6 @@ public class StatusUtils { } public static void compactionPreCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, Map metadata) { - // logger.info("Enter method: compactionPreCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createPost( @@ -177,7 +193,6 @@ public class StatusUtils { } public static void compactionCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, HoodieCommitMetadata metadata) { - // logger.info("Enter method: compactionCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createPost( @@ -203,22 +218,20 @@ public class StatusUtils { } public static void compactionFinish(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String message, Exception exception) { - // logger.info("Enter method: compactionFinish[configuration, flinkJob, tableMeta, message, exception]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "message:" + message + "," + "exception:" + exception); try { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> { boolean success = (exception == null); HttpUtil.createPost(StrUtil.format( - "{}/api/compaction_finish?flink_job_id={}&alias={}&time={}&state={}", + "{}/api/compaction_finish?flink_job_id={}&alias={}&state={}", LoadBalance.getCustomPublishUrl(configuration), flinkJob.getId(), tableMeta.getAlias(), - Instant.now().toEpochMilli(), success )) .header(Constants.API_HEADER_NAME, Constants.API_VERSION) .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN) - .body(success ? message == null ? "" : message : exception.toString(), "text/plain") + .body(success ? message == null ? "success" : message : exception.toString(), "text/plain") .timeout(HTTP_TIMEOUT) .execute() .close(); @@ -228,18 +241,18 @@ public class StatusUtils { } } - public static void versionUpdate(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String version, String opts) { - logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "version:" + version + "," + "opts:" + opts); + public static void versionUpdate(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String version, String operationTime) { + logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "version:" + version + "," + "opts:" + operationTime); Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createGet( StrUtil.format( - "{}/api/version_update?flink_job_id={}&alias={}&version={}&opts={}", + "{}/api/version_update?flink_job_id={}&alias={}&version={}&operation_time={}", LoadBalance.getCustomPublishUrl(configuration), flinkJob.getId(), tableMeta.getAlias(), version, - opts + operationTime ) ) .header(Constants.API_HEADER_NAME, Constants.API_VERSION) @@ -249,37 +262,16 @@ public class StatusUtils { ); } - public static void compactionLatestOpTs(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long latestOpTs) { - logger.info("Enter method: compactionLatestOpTs[configuration, flinkJob, tableMeta, latestOpTs]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "latestOpTs:" + latestOpTs); + public static void compactionLatestOperationTime(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) { Failsafe.with(RetryPolicyProvider.HTTP_RETRY) .run(() -> HttpUtil.createGet( StrUtil.format( - "{}/api/compaction_latest_operation_time?flink_job_id={}&alias={}&latest_op_ts={}", + "{}/api/compaction_latest_operation_time?flink_job_id={}&alias={}&operation_time={}", LoadBalance.getCustomPublishUrl(configuration), flinkJob.getId(), tableMeta.getAlias(), - latestOpTs - ) - ) - .header(Constants.API_HEADER_NAME, Constants.API_VERSION) - .basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN) - .timeout(HTTP_TIMEOUT) - .execute() - ); - } - - public static void syncReceive(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long receiveTime) { - // logger.info("Enter method: syncReceive[configuration, flinkJob, tableMeta, receiveTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "receiveTime:" + receiveTime); - Failsafe.with(RetryPolicyProvider.HTTP_RETRY) - .run(() -> - HttpUtil.createGet( - StrUtil.format( - "{}/api/sync_receive_time?flink_job_id={}&alias={}&receive_time={}", - LoadBalance.getCustomPublishUrl(configuration), - flinkJob.getId(), - tableMeta.getAlias(), - receiveTime + operationTime ) ) .header(Constants.API_HEADER_NAME, Constants.API_VERSION)