diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java index 860a728..30cbc71 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java @@ -123,42 +123,37 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction message; - try { - message = reader.readNext(); - if (ObjectUtil.isNotNull(message)) { - String value = message.getValue(); - currentValue = value; - if (ObjectUtil.isEmpty(value)) { - logger.warn("{} {}", message.getValue(), message.getMessageId()); - } - synchronized (context.getCheckpointLock()) { - context.collect(value); - } - - if (RecordHelper.isNotVersionUpdateRecord(value)) { - latestPublishTime.set(message.getPublishTime()); - latestReceiveTime.set(Instant.now().toEpochMilli()); - } - lastMessageId.set(message.getMessageId()); - - messageReceiveCounter.inc(); - try { - messageReceiveBytesCounter.inc(message.getValue().getBytes().length); - } catch (Throwable t) { - logger.warn("Parse message size failure", t); - } + Message message; + try { + message = reader.readNext(); + if (ObjectUtil.isNotNull(message)) { + String value = message.getValue(); + currentValue = value; + if (ObjectUtil.isEmpty(value)) { + logger.warn("{} {}", message.getValue(), message.getMessageId()); + } + synchronized (context.getCheckpointLock()) { + context.collect(value); + } + + if (RecordHelper.isNotVersionUpdateRecord(value)) { + latestPublishTime.set(message.getPublishTime()); + latestReceiveTime.set(Instant.now().toEpochMilli()); + } + lastMessageId.set(message.getMessageId()); + + messageReceiveCounter.inc(); + try { + messageReceiveBytesCounter.inc(message.getValue().getBytes().length); + } catch (Throwable t) { + logger.warn("Parse message size failure", t); } - } catch (Throwable t) { - throw new Exception("Read message failure, current value: " + currentValue, t); } + } catch (Throwable t) { + throw new Exception("Read message failure, current value: " + currentValue, t); } } } - } catch (Throwable exception) { - logger.error(StrUtil.format("Any error ({})", tableMeta.getAlias()), exception); - throw exception; } }