revert(sync): 使用checkpoint lock报障message id上报的时候状态稳定
会导致部分hudi表checkpoint时间超长
This reverts commit f0a6dbbb
This commit is contained in:
@@ -123,42 +123,37 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
|
|||||||
.create()) {
|
.create()) {
|
||||||
String currentValue = null;
|
String currentValue = null;
|
||||||
while (running) {
|
while (running) {
|
||||||
synchronized (context.getCheckpointLock()) {
|
Message<String> message;
|
||||||
Message<String> message;
|
try {
|
||||||
try {
|
message = reader.readNext();
|
||||||
message = reader.readNext();
|
if (ObjectUtil.isNotNull(message)) {
|
||||||
if (ObjectUtil.isNotNull(message)) {
|
String value = message.getValue();
|
||||||
String value = message.getValue();
|
currentValue = value;
|
||||||
currentValue = value;
|
if (ObjectUtil.isEmpty(value)) {
|
||||||
if (ObjectUtil.isEmpty(value)) {
|
logger.warn("{} {}", message.getValue(), message.getMessageId());
|
||||||
logger.warn("{} {}", message.getValue(), message.getMessageId());
|
}
|
||||||
}
|
synchronized (context.getCheckpointLock()) {
|
||||||
synchronized (context.getCheckpointLock()) {
|
context.collect(value);
|
||||||
context.collect(value);
|
}
|
||||||
}
|
|
||||||
|
if (RecordHelper.isNotVersionUpdateRecord(value)) {
|
||||||
if (RecordHelper.isNotVersionUpdateRecord(value)) {
|
latestPublishTime.set(message.getPublishTime());
|
||||||
latestPublishTime.set(message.getPublishTime());
|
latestReceiveTime.set(Instant.now().toEpochMilli());
|
||||||
latestReceiveTime.set(Instant.now().toEpochMilli());
|
}
|
||||||
}
|
lastMessageId.set(message.getMessageId());
|
||||||
lastMessageId.set(message.getMessageId());
|
|
||||||
|
messageReceiveCounter.inc();
|
||||||
messageReceiveCounter.inc();
|
try {
|
||||||
try {
|
messageReceiveBytesCounter.inc(message.getValue().getBytes().length);
|
||||||
messageReceiveBytesCounter.inc(message.getValue().getBytes().length);
|
} catch (Throwable t) {
|
||||||
} catch (Throwable t) {
|
logger.warn("Parse message size failure", t);
|
||||||
logger.warn("Parse message size failure", t);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
|
||||||
throw new Exception("Read message failure, current value: " + currentValue, t);
|
|
||||||
}
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw new Exception("Read message failure, current value: " + currentValue, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Throwable exception) {
|
|
||||||
logger.error(StrUtil.format("Any error ({})", tableMeta.getAlias()), exception);
|
|
||||||
throw exception;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user