diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java index d8b7cc6..595015d 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java @@ -138,34 +138,36 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction context) throws Exception { String currentValue = null; while (running) { - Message message; - try { - message = reader.readNext(); - if (ObjectUtil.isNotNull(message)) { - String value = message.getValue(); - currentValue = value; - if (ObjectUtil.isEmpty(value)) { - logger.warn("{} {}", message.getValue(), message.getMessageId()); - } - synchronized (context.getCheckpointLock()) { - context.collect(value); - } + synchronized (context.getCheckpointLock()) { + Message message; + try { + message = reader.readNext(); + if (ObjectUtil.isNotNull(message)) { + String value = message.getValue(); + currentValue = value; + if (ObjectUtil.isEmpty(value)) { + logger.warn("{} {}", message.getValue(), message.getMessageId()); + } + synchronized (context.getCheckpointLock()) { + context.collect(value); + } - if (RecordHelper.isNotVersionUpdateRecord(value)) { - latestPublishTime.set(message.getPublishTime()); - latestReceiveTime.set(Instant.now().toEpochMilli()); - } - lastMessageId.set(message.getMessageId()); + if (RecordHelper.isNotVersionUpdateRecord(value)) { + latestPublishTime.set(message.getPublishTime()); + latestReceiveTime.set(Instant.now().toEpochMilli()); + } + lastMessageId.set(message.getMessageId()); - messageReceiveMetric.increment(); - try { - messageSizeReceiveMetric.increment(message.getValue().getBytes().length); - } catch (Throwable t) { - logger.warn("Parse message size failure", t); + messageReceiveMetric.increment(); + try { + messageSizeReceiveMetric.increment(message.getValue().getBytes().length); + } catch (Throwable t) { + logger.warn("Parse message size failure", t); + } } + } catch (Throwable t) { + throw new Exception("Read message failure, current value: " + currentValue, t); } - } catch (Throwable t) { - throw new Exception("Read message failure, current value: " + currentValue, t); } } }