feat(sync): 使用checkpoint lock报障message id上报的时候状态稳定

This commit is contained in:
v-zhangjc9
2024-06-05 17:37:32 +08:00
parent 0b4cec3955
commit f0a6dbbbb5

View File

@@ -138,34 +138,36 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
public void run(SourceContext<String> context) throws Exception {
String currentValue = null;
while (running) {
Message<String> message;
try {
message = reader.readNext();
if (ObjectUtil.isNotNull(message)) {
String value = message.getValue();
currentValue = value;
if (ObjectUtil.isEmpty(value)) {
logger.warn("{} {}", message.getValue(), message.getMessageId());
}
synchronized (context.getCheckpointLock()) {
context.collect(value);
}
synchronized (context.getCheckpointLock()) {
Message<String> message;
try {
message = reader.readNext();
if (ObjectUtil.isNotNull(message)) {
String value = message.getValue();
currentValue = value;
if (ObjectUtil.isEmpty(value)) {
logger.warn("{} {}", message.getValue(), message.getMessageId());
}
synchronized (context.getCheckpointLock()) {
context.collect(value);
}
if (RecordHelper.isNotVersionUpdateRecord(value)) {
latestPublishTime.set(message.getPublishTime());
latestReceiveTime.set(Instant.now().toEpochMilli());
}
lastMessageId.set(message.getMessageId());
if (RecordHelper.isNotVersionUpdateRecord(value)) {
latestPublishTime.set(message.getPublishTime());
latestReceiveTime.set(Instant.now().toEpochMilli());
}
lastMessageId.set(message.getMessageId());
messageReceiveMetric.increment();
try {
messageSizeReceiveMetric.increment(message.getValue().getBytes().length);
} catch (Throwable t) {
logger.warn("Parse message size failure", t);
messageReceiveMetric.increment();
try {
messageSizeReceiveMetric.increment(message.getValue().getBytes().length);
} catch (Throwable t) {
logger.warn("Parse message size failure", t);
}
}
} catch (Throwable t) {
throw new Exception("Read message failure, current value: " + currentValue, t);
}
} catch (Throwable t) {
throw new Exception("Read message failure, current value: " + currentValue, t);
}
}
}