From f0a6dbbbb5ad1d05b0ea0c158921a2dcdd8d2b81 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Wed, 5 Jun 2024 17:37:32 +0800 Subject: [PATCH] =?UTF-8?q?feat(sync):=20=E4=BD=BF=E7=94=A8checkpoint=20lo?= =?UTF-8?q?ck=E6=8A=A5=E9=9A=9Cmessage=20id=E4=B8=8A=E6=8A=A5=E7=9A=84?= =?UTF-8?q?=E6=97=B6=E5=80=99=E7=8A=B6=E6=80=81=E7=A8=B3=E5=AE=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../functions/PulsarMessageSourceReader.java | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java index d8b7cc6..595015d 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java @@ -138,34 +138,36 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction context) throws Exception { String currentValue = null; while (running) { - Message message; - try { - message = reader.readNext(); - if (ObjectUtil.isNotNull(message)) { - String value = message.getValue(); - currentValue = value; - if (ObjectUtil.isEmpty(value)) { - logger.warn("{} {}", message.getValue(), message.getMessageId()); - } - synchronized (context.getCheckpointLock()) { - context.collect(value); - } + synchronized (context.getCheckpointLock()) { + Message message; + try { + message = reader.readNext(); + if (ObjectUtil.isNotNull(message)) { + String value = message.getValue(); + currentValue = value; + if (ObjectUtil.isEmpty(value)) { + logger.warn("{} {}", message.getValue(), message.getMessageId()); + } + synchronized (context.getCheckpointLock()) { + context.collect(value); + } - if (RecordHelper.isNotVersionUpdateRecord(value)) { - latestPublishTime.set(message.getPublishTime()); - latestReceiveTime.set(Instant.now().toEpochMilli()); - } - lastMessageId.set(message.getMessageId()); + if (RecordHelper.isNotVersionUpdateRecord(value)) { + latestPublishTime.set(message.getPublishTime()); + latestReceiveTime.set(Instant.now().toEpochMilli()); + } + lastMessageId.set(message.getMessageId()); - messageReceiveMetric.increment(); - try { - messageSizeReceiveMetric.increment(message.getValue().getBytes().length); - } catch (Throwable t) { - logger.warn("Parse message size failure", t); + messageReceiveMetric.increment(); + try { + messageSizeReceiveMetric.increment(message.getValue().getBytes().length); + } catch (Throwable t) { + logger.warn("Parse message size failure", t); + } } + } catch (Throwable t) { + throw new Exception("Read message failure, current value: " + currentValue, t); } - } catch (Throwable t) { - throw new Exception("Read message failure, current value: " + currentValue, t); } } }