feat(sync): 使用checkpoint lock报障message id上报的时候状态稳定

This commit is contained in:
v-zhangjc9
2024-06-05 17:37:32 +08:00
parent 0b4cec3955
commit f0a6dbbbb5

View File

@@ -138,34 +138,36 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
public void run(SourceContext<String> context) throws Exception { public void run(SourceContext<String> context) throws Exception {
String currentValue = null; String currentValue = null;
while (running) { while (running) {
Message<String> message; synchronized (context.getCheckpointLock()) {
try { Message<String> message;
message = reader.readNext(); try {
if (ObjectUtil.isNotNull(message)) { message = reader.readNext();
String value = message.getValue(); if (ObjectUtil.isNotNull(message)) {
currentValue = value; String value = message.getValue();
if (ObjectUtil.isEmpty(value)) { currentValue = value;
logger.warn("{} {}", message.getValue(), message.getMessageId()); if (ObjectUtil.isEmpty(value)) {
} logger.warn("{} {}", message.getValue(), message.getMessageId());
synchronized (context.getCheckpointLock()) { }
context.collect(value); synchronized (context.getCheckpointLock()) {
} context.collect(value);
}
if (RecordHelper.isNotVersionUpdateRecord(value)) { if (RecordHelper.isNotVersionUpdateRecord(value)) {
latestPublishTime.set(message.getPublishTime()); latestPublishTime.set(message.getPublishTime());
latestReceiveTime.set(Instant.now().toEpochMilli()); latestReceiveTime.set(Instant.now().toEpochMilli());
} }
lastMessageId.set(message.getMessageId()); lastMessageId.set(message.getMessageId());
messageReceiveMetric.increment(); messageReceiveMetric.increment();
try { try {
messageSizeReceiveMetric.increment(message.getValue().getBytes().length); messageSizeReceiveMetric.increment(message.getValue().getBytes().length);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Parse message size failure", t); logger.warn("Parse message size failure", t);
}
} }
} catch (Throwable t) {
throw new Exception("Read message failure, current value: " + currentValue, t);
} }
} catch (Throwable t) {
throw new Exception("Read message failure, current value: " + currentValue, t);
} }
} }
} }