diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index e3723f5..f43380a 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -207,6 +207,7 @@ public interface Constants { String TAGS_USE_TEST_JAR = "USE_TEST_JAR"; String TAGS_ODS = "ODS"; String TAGS_ODS_FOCUS = "ODS_FOCUS"; + String TAGS_CRM_FOCUS = "CRM_FOCUS"; String COMPACTION_QUEUE_PRE = "compaction-queue-pre"; String COMPACTION_QUEUE_B1 = "compaction-queue-b1"; diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/LogHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/LogHelper.java index 80b0d14..9f74c85 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/LogHelper.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/LogHelper.java @@ -92,6 +92,9 @@ public class LogHelper { public enum LogPoint { PULSAR_SOURCE_CHECKPOINT_INITIAL(100), PULSAR_SOURCE_CHECKPOINT_INITIAL_MESSAGE_ID(101), + PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_START(102), + PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_MESSAGE_ID(103), + PULSAR_SOURCE_CHECKPOINT_COMPLETE_MESSAGE_ID(104), MESSAGE_ID_EMPTY(1), CHECKPOINT_INITIAL(2), diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java index 5e82422..860a728 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessageSourceReader.java @@ -37,9 +37,10 @@ import org.apache.pulsar.client.internal.DefaultImplementation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_COMPLETE; import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_INITIAL_MESSAGE_ID; -import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.CHECKPOINT_START; +import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_COMPLETE_MESSAGE_ID; +import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_MESSAGE_ID; +import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.PULSAR_SOURCE_CHECKPOINT_SNAPSHOT_START; /** * Pulsar Reader Source @@ -177,15 +178,16 @@ public class PulsarMessageSourceReader extends RichParallelSourceFunction