feat(sync): 增加pulsar消息最后接收时间

用于判断pulsar有多久没有传入消息了
This commit is contained in:
v-zhangjc9
2024-05-09 09:03:00 +08:00
parent 103bde5cdc
commit 0bf3d17009
6 changed files with 101 additions and 11 deletions

View File

@@ -30,7 +30,10 @@ import org.slf4j.LoggerFactory;
*/
public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record> implements CheckpointedFunction {
private static final Logger logger = LoggerFactory.getLogger(PulsarMessage2RecordFunction.class);
private static final AtomicReference<String> lastOperationTime = new AtomicReference<>("");
/**
* 最后操作时间
*/
private static final AtomicReference<String> latestOperationTime = new AtomicReference<>("");
private final static DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$");
private final GlobalConfiguration globalConfiguration;
@@ -50,7 +53,7 @@ public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record
try {
record = mapper.readValue(message, Record.class);
if (RecordHelper.isNotVersionUpdateRecord(record)) {
lastOperationTime.set(record.getStatement().getOpTs());
latestOperationTime.set(record.getStatement().getOpTs());
}
} catch (Exception exception) {
logger.error("Message json parse failure", exception);
@@ -60,7 +63,7 @@ public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record
@Override
public void snapshotState(FunctionSnapshotContext context) {
String opTs = lastOperationTime.get();
String opTs = latestOperationTime.get();
Long timestamp = null;
try {
if (StrUtil.isNotBlank(opTs) && OPTS_PATTERN.matcher(opTs).matches()) {

View File

@@ -18,6 +18,7 @@ import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -28,13 +29,20 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.*;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_COMPLETE;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL_MESSAGE_ID;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_START;
/**
* Pulsar Reader Source
@@ -54,7 +62,8 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
private final FlinkJob flinkJob;
private final TableMeta tableMeta;
private final AtomicReference<MessageId> lastMessageId = new AtomicReference<>();
private final AtomicLong lastPublishTime = new AtomicLong(0);
private final AtomicLong latestPublishTime = new AtomicLong(0);
private final AtomicLong latestReceiveTime = new AtomicLong(0);
private final RateMetric messageReceiveMetric;
private final MessageSizeSizeMetric messageSizeReceiveMetric;
private final Map<Long, MessageId> messageIdMap = new ConcurrentHashMap<>();
@@ -143,7 +152,8 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
}
if (RecordHelper.isNotVersionUpdateRecord(value)) {
lastPublishTime.set(message.getPublishTime());
latestPublishTime.set(message.getPublishTime());
latestReceiveTime.set(Instant.now().toEpochMilli());
}
lastMessageId.set(message.getMessageId());
@@ -224,7 +234,8 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
public void notifyCheckpointComplete(long checkpointId) {
MessageId messageId = messageIdMap.getOrDefault(checkpointId, MessageId.earliest);
LogHelper.info(logger, CHECKPOINT_COMPLETE, "Checkpoint complete message id: {}, checkpoint id: {}", messageId, checkpointId);
StatusUtils.syncCheckpoint(globalConfiguration, flinkJob, tableMeta, messageId.toString(), lastPublishTime.get());
StatusUtils.syncCheckpoint(globalConfiguration, flinkJob, tableMeta, messageId.toString(), latestPublishTime.get());
StatusUtils.syncReceive(globalConfiguration, flinkJob, tableMeta, latestReceiveTime.get());
messageIdMap.remove(checkpointId);
}

View File

@@ -268,4 +268,24 @@ public class StatusUtils {
.execute()
);
}
public static void syncReceive(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long receiveTime) {
// logger.info("Enter method: syncReceive[configuration, flinkJob, tableMeta, receiveTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "receiveTime:" + receiveTime);
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_receive_time?flink_job_id={}&alias={}&receive_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
receiveTime
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute()
);
}
}