From 7f10d53c4e3a2ebba6b7fff7c3efcdca794c1213 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Mon, 17 Jun 2024 18:49:43 +0800 Subject: [PATCH] =?UTF-8?q?feat(sync):=20=E5=A2=9E=E5=8A=A0checkpoint?= =?UTF-8?q?=E7=9A=84=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/lanyuanxiaoyao/service/common/Constants.java | 1 + .../lanyuanxiaoyao/service/common/utils/LogHelper.java | 3 +++ .../sync/functions/PulsarMessageSourceReader.java | 10 ++++++---- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index e3723f5..f43380a 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -207,6 +207,7 @@ public interface Constants { String TAGS_USE_TEST_JAR = "USE_TEST_JAR"; String TAGS_ODS = "ODS"; String TAGS_ODS_FOCUS = "ODS_FOCUS"; + String TAGS_CRM_FOCUS = "CRM_FOCUS"; String COMPACTION_QUEUE_PRE = "compaction-queue-pre"; String COMPACTION_QUEUE_B1 = "compaction-queue-b1"; diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/LogHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/LogHelper.java index 80b0d14..9f74c85 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/LogHelper.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/LogHelper.java @@ -92,6 +92,9 @@ public class LogHelper { public enum LogPoint { PULSAR_SOURCE_CHECKPOINT_INITIAL(100), PULSAR_SOURCE_CHECKPOINT_INITIAL_MESSAGE_ID(101), + PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_START(102), + PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_MESSAGE_ID(103), + PULSAR_SOURCE_CHECKPOINT_COMPLETE_MESSAGE_ID(104), MESSAGE_ID_EMPTY(1), CHECKPOINT_INITIAL(2), diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java index 5e82422..860a728 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java @@ -37,9 +37,10 @@ import org.apache.pulsar.client.internal.DefaultImplementation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_COMPLETE; import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL_MESSAGE_ID; -import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_START; +import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_COMPLETE_MESSAGE_ID; +import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_MESSAGE_ID; +import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_START; /** * Pulsar Reader Source @@ -177,15 +178,16 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction