feat(sync): 使用checkpoint lock报障message id上报的时候状态稳定

This commit is contained in:
v-zhangjc9
2024-06-05 17:37:32 +08:00
parent 0b4cec3955
commit f0a6dbbbb5

View File

@@ -138,6 +138,7 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
public void run(SourceContext<String> context) throws Exception {
String currentValue = null;
while (running) {
synchronized (context.getCheckpointLock()) {
Message<String> message;
try {
message = reader.readNext();
@@ -169,6 +170,7 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
}
}
}
}
@Override
public void open(Configuration configuration) throws Exception {