feat(sync): 增加checkpoint的日志记录

This commit is contained in:
v-zhangjc9
2024-06-17 18:49:43 +08:00
parent b787a4f393
commit 7f10d53c4e
3 changed files with 10 additions and 4 deletions

View File

@@ -37,9 +37,10 @@ import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_COMPLETE;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL_MESSAGE_ID;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_START;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_COMPLETE_MESSAGE_ID;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_MESSAGE_ID;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_START;
/**
* Pulsar Reader Source
@@ -177,15 +178,16 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction<String
@Override
public void snapshotState(FunctionSnapshotContext context) {
LogHelper.info(logger, PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_START);
MessageId messageId = lastMessageId.get();
messageIdMap.put(context.getCheckpointId(), messageId);
LogHelper.info(logger, CHECKPOINT_START, "Checkpoint start message id: {}, checkpoint id: {}", messageId, context.getCheckpointId());
LogHelper.info(logger, PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_MESSAGE_ID, "Checkpoint start message id: {}, checkpoint id: {}", messageId, context.getCheckpointId());
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
MessageId messageId = messageIdMap.getOrDefault(checkpointId, MessageId.earliest);
LogHelper.info(logger, CHECKPOINT_COMPLETE, "Checkpoint complete message id: {}, checkpoint id: {}", messageId, checkpointId);
LogHelper.info(logger, PULSAR_SOURCE_CHECKPOINT_COMPLETE_MESSAGE_ID, "Checkpoint complete message id: {}, checkpoint id: {}", messageId, checkpointId);
StatusUtils.syncCheckpoint(globalConfiguration, flinkJob, tableMeta, messageId.toString());