feat(sync): 使用checkpoint lock报障message id上报的时候状态稳定
This commit is contained in:
@@ -138,34 +138,36 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
|
||||
public void run(SourceContext<String> context) throws Exception {
|
||||
String currentValue = null;
|
||||
while (running) {
|
||||
Message<String> message;
|
||||
try {
|
||||
message = reader.readNext();
|
||||
if (ObjectUtil.isNotNull(message)) {
|
||||
String value = message.getValue();
|
||||
currentValue = value;
|
||||
if (ObjectUtil.isEmpty(value)) {
|
||||
logger.warn("{} {}", message.getValue(), message.getMessageId());
|
||||
}
|
||||
synchronized (context.getCheckpointLock()) {
|
||||
context.collect(value);
|
||||
}
|
||||
synchronized (context.getCheckpointLock()) {
|
||||
Message<String> message;
|
||||
try {
|
||||
message = reader.readNext();
|
||||
if (ObjectUtil.isNotNull(message)) {
|
||||
String value = message.getValue();
|
||||
currentValue = value;
|
||||
if (ObjectUtil.isEmpty(value)) {
|
||||
logger.warn("{} {}", message.getValue(), message.getMessageId());
|
||||
}
|
||||
synchronized (context.getCheckpointLock()) {
|
||||
context.collect(value);
|
||||
}
|
||||
|
||||
if (RecordHelper.isNotVersionUpdateRecord(value)) {
|
||||
latestPublishTime.set(message.getPublishTime());
|
||||
latestReceiveTime.set(Instant.now().toEpochMilli());
|
||||
}
|
||||
lastMessageId.set(message.getMessageId());
|
||||
if (RecordHelper.isNotVersionUpdateRecord(value)) {
|
||||
latestPublishTime.set(message.getPublishTime());
|
||||
latestReceiveTime.set(Instant.now().toEpochMilli());
|
||||
}
|
||||
lastMessageId.set(message.getMessageId());
|
||||
|
||||
messageReceiveMetric.increment();
|
||||
try {
|
||||
messageSizeReceiveMetric.increment(message.getValue().getBytes().length);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Parse message size failure", t);
|
||||
messageReceiveMetric.increment();
|
||||
try {
|
||||
messageSizeReceiveMetric.increment(message.getValue().getBytes().length);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Parse message size failure", t);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
throw new Exception("Read message failure, current value: " + currentValue, t);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
throw new Exception("Read message failure, current value: " + currentValue, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user