feat(sync): 使用checkpoint lock报障message id上报的时候状态稳定

This commit is contained in:
v-zhangjc9
2024-06-05 17:37:32 +08:00
parent 0b4cec3955
commit f0a6dbbbb5

View File

@@ -138,6 +138,7 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
public void run(SourceContext<String> context) throws Exception { public void run(SourceContext<String> context) throws Exception {
String currentValue = null; String currentValue = null;
while (running) { while (running) {
synchronized (context.getCheckpointLock()) {
Message<String> message; Message<String> message;
try { try {
message = reader.readNext(); message = reader.readNext();
@@ -169,6 +170,7 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
} }
} }
} }
}
@Override @Override
public void open(Configuration configuration) throws Exception { public void open(Configuration configuration) throws Exception {