perf(api): 优化 API 指标存储和传输

统一字段命名格式,减少不必要的时间字段传输,减少默认值的传输
This commit is contained in:
v-zhangjc9
2024-05-09 18:12:25 +08:00
parent 7fb5710cb3
commit def035fa90
8 changed files with 238 additions and 204 deletions

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.sync.functions;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
@@ -41,6 +42,7 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable
StatusUtils.compactionCommit(configuration, flinkJob, tableMeta, instant, metadata);
// Trace latest_op_ts
statuses.stream()
.filter(ObjectUtil::isNotNull)
.map(status -> {
if (status instanceof TraceWriteStatus) {
TraceWriteStatus s = (TraceWriteStatus) status;
@@ -48,10 +50,11 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable
}
return 0L;
})
.filter(opts -> opts > 0)
.max(Long::compare)
.ifPresent(max -> {
logger.info("Latest op ts: {}", max);
StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max);
.ifPresent(time -> {
logger.info("Latest op ts: {}", time);
StatusUtils.compactionLatestOperationTime(configuration, flinkJob, tableMeta, time);
});
}

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.sync.functions;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
@@ -64,17 +65,19 @@ public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record
@Override
public void snapshotState(FunctionSnapshotContext context) {
String opTs = latestOperationTime.get();
Long timestamp = null;
try {
if (StrUtil.isNotBlank(opTs) && OPTS_PATTERN.matcher(opTs).matches()) {
timestamp = LocalDateTime.parse(opTs, FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
} else {
throw new Exception(StrUtil.format("Opts is not match regex {} value: {}", OPTS_PATTERN.pattern(), opTs));
if (StrUtil.isNotBlank(opTs)) {
Long timestamp = null;
try {
if (OPTS_PATTERN.matcher(opTs).matches()) {
timestamp = LocalDateTime.parse(opTs, FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
}
} catch (Exception e) {
logger.error("Parse operation time error", e);
}
if (ObjectUtil.isNotNull(timestamp)) {
StatusUtils.syncLatestOperationTime(globalConfiguration, flinkJob, tableMeta, timestamp);
}
} catch (Exception e) {
logger.error("Parse operation time error", e);
}
StatusUtils.syncOperation(globalConfiguration, flinkJob, tableMeta, timestamp);
}
@Override

View File

@@ -234,8 +234,19 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
public void notifyCheckpointComplete(long checkpointId) {
MessageId messageId = messageIdMap.getOrDefault(checkpointId, MessageId.earliest);
LogHelper.info(logger, CHECKPOINT_COMPLETE, "Checkpoint complete message id: {}, checkpoint id: {}", messageId, checkpointId);
StatusUtils.syncCheckpoint(globalConfiguration, flinkJob, tableMeta, messageId.toString(), latestPublishTime.get());
StatusUtils.syncReceive(globalConfiguration, flinkJob, tableMeta, latestReceiveTime.get());
StatusUtils.syncCheckpoint(globalConfiguration, flinkJob, tableMeta, messageId.toString());
long publishTime = latestPublishTime.get();
if (publishTime > 0) {
StatusUtils.syncLatestPublishTime(globalConfiguration, flinkJob, tableMeta, publishTime);
}
long receiveTime = latestReceiveTime.get();
if (receiveTime > 0) {
StatusUtils.syncLatestReceiveTime(globalConfiguration, flinkJob, tableMeta, receiveTime);
}
messageIdMap.remove(checkpointId);
}

View File

@@ -1,7 +1,5 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.util.EnumUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import com.lanyuanxiaoyao.service.common.Constants;
@@ -10,7 +8,6 @@ import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.configuration.RetryPolicyProvider;
import dev.failsafe.Failsafe;
import java.time.Instant;
import java.util.Map;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -30,7 +27,6 @@ public class StatusUtils {
private static final int HTTP_TIMEOUT = (int) Constants.MINUTE;
public static void syncStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
// logger.info("Enter method: syncStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
@@ -57,18 +53,16 @@ public class StatusUtils {
}
}
public static void syncCheckpoint(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String messageId, Long publishTime) {
logger.info("Enter method: syncCheckpoint[configuration, flinkJob, tableMeta, messageId, publishTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "messageId:" + messageId + "," + "publishTime:" + publishTime);
public static void syncCheckpoint(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String messageId) {
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_checkpoint_state?flink_job_id={}&alias={}&message_id={}&publish_time={}",
"{}/api/sync_checkpoint?flink_job_id={}&alias={}&message_id={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
messageId,
publishTime
messageId
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
@@ -81,32 +75,58 @@ public class StatusUtils {
}
}
public static void syncOperation(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
syncOperation(configuration, flinkJob, tableMeta, null);
}
public static void syncOperation(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) {
// logger.info("Enter method: syncOperation[configuration, flinkJob, tableMeta, operationTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "operationTime:" + operationTime);
public static void syncLatestPublishTime(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long publishTime) {
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> {
if (ObjectUtil.isNull(operationTime)) {
HttpUtil.createGet(
.run(() -> HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_operation_state?flink_job_id={}&alias={}",
"{}/api/sync_latest_publish_time?flink_job_id={}&alias={}&publish_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias()
tableMeta.getAlias(),
publishTime
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute();
} else {
.execute()
);
} catch (Exception e) {
logger.warn("sync latest publish time submit failure");
}
}
public static void syncLatestReceiveTime(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long receiveTime) {
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_operation_state?flink_job_id={}&alias={}&operation_time={}",
"{}/api/sync_latest_receive_time?flink_job_id={}&alias={}&receive_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
receiveTime
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute()
);
} catch (Exception e) {
logger.warn("sync latest receive time submit failure");
}
}
public static void syncLatestOperationTime(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) {
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_latest_operation_time?flink_job_id={}&alias={}&operation_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
@@ -116,26 +136,23 @@ public class StatusUtils {
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute();
}
});
.execute()
);
} catch (Exception e) {
logger.warn("sync operation metrics submit failure");
logger.warn("sync latest operation metrics submit failure");
}
}
public static void compactionStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
// logger.info("Enter method: compactionStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/compaction_start?flink_job_id={}&alias={}&type={}&cluster={}&application_id={}",
"{}/api/compaction_start?flink_job_id={}&alias={}&cluster={}&application_id={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
EnumUtil.toString(tableMeta.getSourceType()),
configuration.getCluster(),
configuration.getApplicationId()
)
@@ -151,7 +168,6 @@ public class StatusUtils {
}
public static void compactionPreCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, Map<String, Long> metadata) {
// logger.info("Enter method: compactionPreCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> HttpUtil.createPost(
@@ -177,7 +193,6 @@ public class StatusUtils {
}
public static void compactionCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, HoodieCommitMetadata metadata) {
// logger.info("Enter method: compactionCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> HttpUtil.createPost(
@@ -203,22 +218,20 @@ public class StatusUtils {
}
public static void compactionFinish(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String message, Exception exception) {
// logger.info("Enter method: compactionFinish[configuration, flinkJob, tableMeta, message, exception]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "message:" + message + "," + "exception:" + exception);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> {
boolean success = (exception == null);
HttpUtil.createPost(StrUtil.format(
"{}/api/compaction_finish?flink_job_id={}&alias={}&time={}&state={}",
"{}/api/compaction_finish?flink_job_id={}&alias={}&state={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
Instant.now().toEpochMilli(),
success
))
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.body(success ? message == null ? "" : message : exception.toString(), "text/plain")
.body(success ? message == null ? "success" : message : exception.toString(), "text/plain")
.timeout(HTTP_TIMEOUT)
.execute()
.close();
@@ -228,18 +241,18 @@ public class StatusUtils {
}
}
public static void versionUpdate(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String version, String opts) {
logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "version:" + version + "," + "opts:" + opts);
public static void versionUpdate(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String version, String operationTime) {
logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "version:" + version + "," + "opts:" + operationTime);
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/version_update?flink_job_id={}&alias={}&version={}&opts={}",
"{}/api/version_update?flink_job_id={}&alias={}&version={}&operation_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
version,
opts
operationTime
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
@@ -249,37 +262,16 @@ public class StatusUtils {
);
}
public static void compactionLatestOpTs(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long latestOpTs) {
logger.info("Enter method: compactionLatestOpTs[configuration, flinkJob, tableMeta, latestOpTs]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob.getId() + "," + "tableMeta:" + tableMeta.getAlias() + "," + "latestOpTs:" + latestOpTs);
public static void compactionLatestOperationTime(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/compaction_latest_operation_time?flink_job_id={}&alias={}&latest_op_ts={}",
"{}/api/compaction_latest_operation_time?flink_job_id={}&alias={}&operation_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
latestOpTs
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute()
);
}
public static void syncReceive(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long receiveTime) {
// logger.info("Enter method: syncReceive[configuration, flinkJob, tableMeta, receiveTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "receiveTime:" + receiveTime);
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_receive_time?flink_job_id={}&alias={}&receive_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
receiveTime
operationTime
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)