feat(sync): 使用checkpoint lock报障message id上报的时候状态稳定
This commit is contained in:
@@ -138,6 +138,7 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
|
|||||||
public void run(SourceContext<String> context) throws Exception {
|
public void run(SourceContext<String> context) throws Exception {
|
||||||
String currentValue = null;
|
String currentValue = null;
|
||||||
while (running) {
|
while (running) {
|
||||||
|
synchronized (context.getCheckpointLock()) {
|
||||||
Message<String> message;
|
Message<String> message;
|
||||||
try {
|
try {
|
||||||
message = reader.readNext();
|
message = reader.readNext();
|
||||||
@@ -169,6 +170,7 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(Configuration configuration) throws Exception {
|
public void open(Configuration configuration) throws Exception {
|
||||||
|
|||||||
Reference in New Issue
Block a user