Spring AI默认大模型配置不支持同时配置两个文本大模型,比如一个文本大模型和一个图像大模型,改用自定义的配置
概述
围绕Hudi同步和压缩流程建立了一整套的工具和流程。
部署
由于部份配置需要在源码增删,无法通过配置文件修改,如果部署新集群,建议对master建立分支进行操作,避免配置混淆
配置准备
主机规划
通常来说,应用部署需要逻辑上规划有
- 一台接口机;用于操作脚本、存放jar包等。
- 一台组件机;用于部署应用配套的外部组件。
- 剩下的作为应用机;用于部署应用服务节点。
上述为逻辑规划,原则上对于机器配置没有特别要求,可以混用;如接口机上也用于部署外部组件,同时也作为服务节点部署服务。
组件机上规划有指标和日志汇聚,hudi任务多的情况下,对IO和磁盘有一定的负载要求,建议单独部署。
外部资源确认
- Hadoop3
用于存放Hudi数据、Flink任务业务jar包等- Yarn集群节点到HDFS的增删改查权限
- 应用部署主机到HDFS的增删改查权限
- Zookeeper
用于hudi同步压缩服务注册,通常可以使用Hadoop3集群附带的zookeeper- Yarn集群节点到zookeeper的网络连通
- 应用部署主机到zookeeper的网络连通
- Yarn
用于运行hudi同步压缩任务- 应用部署主机提交任务的权限
- MySQL
配置数据库;目前和汇聚平台配置库耦合,不能改,只能直接使用汇聚配置数据库- 应用部署节点到MySQL的访问连通
- Pulsar
数据源- Yarn集群节点到pulsar的读取连通
- 应用部署主机到pulsar的client连通和admin连通
- 应用部署主机
部署配套应用服务- JDK 1.8(实现OpenJDK标准即可)
- Kerberos
- 接口机到应用机的ssh免密(用于服务启停部署)
Hadoop配置
在config目录下以「hudi数据所有集群名称+hudi同步运行集群名称」的格式建立配置文件夹,如b2b12,并将下列配置文件放在这里
- core-site.xml
- hdfs-site.xml
- yarn-site.xml
- viewfs-mount-table.xml(如果需要使用联邦,core-site.xml也需要将引用改为相对路径引用)
外部组件部署
组件安装包放在研发云制品库:汇聚平台项目-制品库 odcp-snapshot-generic-local/cluster-tools下面
Victoria Metrics
Victoria Metrics是一款替代Prometheus的开源指标采集应用,支持分布式部署和指标主动推送,完全兼容Prometheus API,提供了接近数倍于Prometheus的性能。
目前项目采用单节点模式部署,性能还有富裕,后期可根据性能需求,改为分布式部署。
victoria_metrics-1.98
├── prometheus.yml 配置文件
├── start_victoria_metrics.sh 启动脚本
├── stop_victoria_metrics.sh 停止脚本
├── version
├── victoria-metrics-prod-amd64 x86架构
└── victoria-metrics-prod-arm64 arm架构
启动脚本
启动脚本需要根据部署的主机区分CPU架构,修改脚本内容,引用合适的可执行文件。
用户名为EsCFVuNkiDWv7PKmcF,密码为Abf%x9ocS^iKr3tgrd,用于保护Victoria Metrics的Web页面。
nohup $current_path/victoria-metrics-prod-amd64 \
-search.maxQueryLen=1MB \
-promscrape.maxScrapeSize=100MB \
-loggerLevel=ERROR \
-storageDataPath=$current_path/data \
-retentionPeriod=7d \
-httpAuth.username=EsCFVuNkiDWv7PKmcF \
-httpAuth.password=Abf%x9ocS^iKr3tgrd \
-httpListenAddr=":35710" \
-promscrape.config=$current_path/prometheus.yml \
-promscrape.httpSDCheckInterval=15s \
> $current_path/victoria-metrics.log 2>&1 &
启动完成后可以通过脚本指定的端口35710访问Web页面,即为启动成功。
采集配置
Victoria Metrics的配置文件完全兼容Prometheus,可以直接替换使用。
global:
scrape_interval: 15s
scrape_timeout: 15s
scrape_configs:
# Victoria Metrics 本身的指标
- job_name: 'Victoria Metrics Single Cluster'
metrics_path: '/metrics'
basic_auth:
username: EsCFVuNkiDWv7PKmcF
password: Abf%x9ocS^iKr3tgrd
static_configs:
- targets:
- '132.126.207.34:35710'
# Loki日志采集
- job_name: 'Loki'
metrics_path: '/metrics'
static_configs:
- targets:
- '132.126.207.34:33100'
# 集群主机管理,根据需要启用
- job_name: 'Node Exporter'
metrics_path: '/metrics'
static_configs:
- targets:
- '132.126.207.124:20110'
# 应用服务的指标采集,部署Victoria Metrics的时候可以注释,等应用部署完成后再打开
- job_name: 'Service'
metrics_path: '/actuator/prometheus'
basic_auth:
username: AxhEbscwsJDbYMH2
password: cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4
# 可以通过service-cloud-query模块动态获取各个服务节点的IP
http_sd_configs:
- url: http://132.126.207.34:35690/hudi_services/service_cloud_query/cloud/targets
basic_auth:
username: AxhEbscwsJDbYMH2
password: cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4
Loki
Loki是一款日志汇聚工具,Grafana公司出品,可以通过Grafana直接查询日志,相比ELK更为轻量级,不需要额外的日志采集组件,支持应用直接推送日志,对简单的分布式应用更友好。
根据规划,Loki分为服务日志和Hudi日志,选择两台机器分别部署;Hudi日志较多,如果所有日志划分到一个Loki,可能会影响服务日志查询
loki-2.9
├── stop_loki.sh 停止脚本
├── version
├── loki-linux-amd64 x86架构
├── loki-linux-arm64 arm架构
├── loki-config.yml 配置文件
├── start_loki.sh 启动脚本
└── logs
配置文件
# Loki不支持直接配置访问控制,官方建议使用Nginx来实现
auth_enabled: false
query_range:
parallelise_shardable_queries: false
server:
# 日志推送端口
http_listen_port: 33100
grpc_listen_port: 39096
log_level: error
grpc_server_max_recv_msg_size: 1572864000
grpc_server_max_send_msg_size: 1572864000
common:
# 指向规划好的数据盘
path_prefix: /data/datalake/data/loki
storage:
filesystem:
chunks_directory: /data/datalake/data/loki/chunks
rules_directory: /data/datalake/data/loki/rules
replication_factor: 1
ring:
instance_addr: 127.0.0.1
kvstore:
store: inmemory
table_manager:
retention_deletes_enabled: true
retention_period: 72h
schema_config:
configs:
- from: 2023-12-06
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
Grafana
根据CPU架构选择grafana-amd64-10.3或者grafana-arm64-10.3部署包。
grafana-amd64-10.3
├── data
├── start_grafana.sh
├── stop_grafana.sh
├── npm-artifacts
├── packaging
├── plugins-bundled
├── public
├── bin
├── conf
├── storybook
├── tools
├── VERSION
├── LICENSE
├── NOTICE.md
├── README.md
└── Dockerfile
数据源配置
根据部署的组件新增数据源,其中Loki区分服务日志和Hudi日志(如果没有部署两个,可以不区分)。
配置数据库
在database路径下,有数据库初始化脚本,其中tb_app_hudi_job_config和tb_app_yarn_job_config两张表提供了预设配置。
建议为每一个环境创建一个单独的database,避免不同集群的配置混在一起。
database
├── tb_app_collect_table_info.sql
├── tb_app_collect_table_version.sql
├── tb_app_flink_job_config.sql
├── tb_app_global_config.sql
├── tb_app_hudi_compaction_job.sql
├── tb_app_hudi_compaction_metrics.sql
├── tb_app_hudi_event.sql
├── tb_app_hudi_job_config.sql
├── tb_app_hudi_sync_state.sql
└── tb_app_yarn_job_config.sql
部署配置
以service-cli/service-cli-runner/src/main/resources/application-b12.yml为模版,补全应用部署信息
以service-web/src/main/resources/static/common/info.js#commonInfo为模板,补全web页面相关信息
接口机目录规划
选择一个空白目录作为根目录(下文简称「根目录」),创建下列子目录。
.
├── cloud 服务管理脚本
├── command 应用命令行
├── data 应用数据
├── extra 外部组件
├── logs 应用日志
└── uploader 上传服务
脚本生成
将bin/generate
脚本复制到根目录下,根据实际情况改动脚本中指向的一些目录、jdk和jar包路径,执行脚本;正常情况下,脚本将会为cloud、command、uploader
目录生成脚本。
启动上传服务
如果使用其他文件服务,如YTP,则不需要操作这一步,直接使用其他文件服务即可。
uploader
├── start.sh 启动脚本
├── stop.sh 停止脚本
└── update.sh 更新脚本
执行start.sh启动上传服务,默认端口为36800。
编译打包项目
将文件服务的url补充到bin/library.sh中,执行bin/build-all.sh,打包编译全部模块并上传到文件服务。
启动应用
在cloud路径下有已经生成好的服务启动脚本,deploy-service开头的脚本为启动脚本,脚本默认包含更新并启动,保证每次都可以使用最新的jar包启动。
应用包含多个服务,为了方便及时监控应用启动状态,建议按照如下顺序启动:
- service-gateway、service-queue
- service-web
- service-api
- 各种query
- service-cloud-query
- service-flink-query
- service-hudi-query
- service-info-query
- service-pulsar-query
- service-yarn-query
- service-zookeeper-query
- service-launcher-*
- service-executor-manager
- service-monitor
- service-scheduler
由于scheduler包含定时任务,通常放在最后确认整个项目都没问题,或者同步已经启动并正常运行之后,再启动压缩调度模块。
可以配合web页面和grafana日志监控保证服务启动正常,通常服务启动完成一个再启动下一个,避免服务启动失败,导致服务间调用雪崩。
运维
应用提供三部分运维方案:
- 命令行:提供命令行操作,包括同步的启动、停止等,在Web无法使用的情况下做一些简单运维操作;
- Web:提供表信息、运行总览等,完成常见的运维操作;
- HDFS:在上述手段出现问题,或无法操作的情况下,直接访问HDFS查看hudi表相关的状态。
命令行
为了避免页面网络不同导致无法运维操作,同时也为了方便运维人员在脚本中直接操作hudi服务,提供了命令行工具用于操作hudi同步任务。
配置操作
用于增加同步表配置信息,涉及到tb_app_flink_job_config和tb_app_collect_table_info两张表,由于涉及配置项较多,特别提供命令行工具做基本的配置。
但由于开发和运维的脱钩,往往命令行工具的配置类型不能完全满足运维同事组织同步表的模式,具体的表配置,还是由运维同事自行发挥,或者统一开发再进行配置操作。
查看配置信息
其中搜索模式包括REGEX(正则表达式)、CONTAINS(包含)、UN_CONTAINS(不包含)、EQUALS(全等)、UN_EQUALS(不全等)
NAME
job-all - 显示 Flink job 配置信息
SYNOPSYS
job-all [[--type] type] [[--pattern] string]
OPTIONS
--type type
搜索模式
[Optional, default = CONTAINS]
--pattern string
搜索值 (除了 REGEX 模式, 其余可使用英文逗号分隔多个值)
[Optional, default = ]
新增配置信息
NAME
job-add - 新增 Flink job 配置
SYNOPSYS
job-add [--type] table-type [--database] string [--database-type] source-type [--schema] string [--table] string [--hdfs] string [--pulsar-address] string
OPTIONS
--type table-type
表类型 (BIG 为大表, SMALL 为小表, ACCT 为 ACCT 表)
[Mandatory]
--database string
数据源
[Mandatory]
--database-type source-type
数据源类型
[Mandatory]
--schema string
Scheme
[Mandatory]
--table string
表名
[Mandatory]
--hdfs string
HDFS 路径前缀
[Mandatory]
--pulsar-address string
Pulsar Address
[Mandatory]
批量新增配置信息
source指向的文件,按行分隔,每行用空格分隔字段,字段内容按「新增配置信息」的字段填写。
NAME
job-add-batch - 批量新增 Flink job 配置
SYNOPSYS
job-add-batch [--source] string
OPTIONS
--source string
配置文件路径
[Mandatory]
任务操作
针对hudi同步任务的操作。
查看同步任务状态
默认查询全部任务,也可以通过关键词筛选。
NAME
yarn-all - 查询表同步任务状态
SYNOPSYS
yarn-all [[--pattern] string] [--show-flink-job-id] [--running] [--un-running]
OPTIONS
--pattern string
搜索值 (包含查询)
[Optional, default = ]
--show-flink-job-id 展示结果列表的 Flink job id
[Optional, default = false]
--running 展示运行的任务
[Optional, default = false]
--un-running 展示非运行的任务
[Optional, default = false]
启停同步任务
--ignore-check参数在其他脚本,如定时任务,的时候比较方便,可以跳过二次确认。
NAME
yarn-run - 启动表同步任务
SYNOPSYS
yarn-run [--flink-job-id] long [--ignore-check]
OPTIONS
--flink-job-id long
Flink job id
[Mandatory]
--ignore-check Ignore double check
[Optional, default = false]
NAME
yarn-run-batch - 批量启动表同步任务
SYNOPSYS
yarn-run-batch [--flink-job-ids] long[] [--ignore-check]
OPTIONS
--flink-job-ids long[]
Flink job id
[Mandatory]
--ignore-check Ignore double check
[Optional, default = false]
NAME
yarn-run-un-running - 启动所有未运行的表同步任务
SYNOPSYS
yarn-run-un-running [--ignore-check]
OPTIONS
--ignore-check Ignore double check
[Optional, default = false]
批量启停同步任务
NAME
yarn-kill - 停止表同步任务
SYNOPSYS
yarn-kill [--flink-job-id] long [--ignore-check]
OPTIONS
--flink-job-id long
Flink job id
[Mandatory]
--ignore-check Ignore double check
[Optional, default = false]
NAME
yarn-kill-batch - 批量停止表同步任务
SYNOPSYS
yarn-kill-batch [--flink-job-ids] long[] [--ignore-check]
OPTIONS
--flink-job-ids long[]
Flink job id
[Mandatory]
--ignore-check Ignore double check
[Optional, default = false]
NAME
yarn-kill-running - 停止所有正在运行的表同步任务
SYNOPSYS
yarn-kill-running [--ignore-check]
OPTIONS
--ignore-check Ignore double check
[Optional, default = false]
Web
概览
概览页面可以看到应用的主要运行情况,从上到下分别有:
- 表数量
- 逻辑表(根据上游源表名去重得到)
- 湖底表(根据目标hudi表路径去重得到)
- 嗨福表(根据hive表名去重得到)
- Flink运行同步任务数量
- flink任务数
- flink任务下对应表总数
- hudi同步集群情况
- hudi压缩集群情况
- 跨天情况
- 重点表跨天情况
- 普通表跨天情况
- 压缩调度定时策略
表任务
可以按表级别查询到相关配置、运行、周边信息,完成常见的表运维跟踪。
Flink任务详情
点击Flink job id项目可以打开Flink任务详情页面。
表任务详情
点击别名项目可以打开表任务详情页面,在这个页面主要可以查看表本身的配置信息,尤其是字段信息。
同步情况
查看Flink同步任务的详情和历史情况,方便直接跳转日志查看。
压缩情况
查看Flink同步任务的详情和历史情况,方便直接跳转日志查看;由于压缩任务运行在多集群,历史任务保留多久,会受到各个集群任务量、配置的限制,如果超出集群保留任务列表的限制(比如b1集群任务列表整个集群最后10000条),就会在这里查询不到,在使用的时候需要注意这一点。
历史压缩
记录在数据库中的历史压缩情况,由各个服务端维护,方便查询历史压缩情况,不受集群保留历史任务限制。
时间线
查询实时hudi表时间线,可以更清晰地看到压缩情况和同步情况,对于查询压缩任务启停时间点,以及压缩任务包含文件数,非常有帮助。
Pulsar队列
用于查询Pulsar Topic的相关信息,查看Reader积压情况,生产者是否在线等。
Hudi表结构
常用于查看实际的表结构是否和配置表相同,判断hudi表有没有更新表结构。
压缩队列
查看各个压缩队列的任务详情,方便查找问题任务。
跨天
查看一些简单的跨天信息。
同步集群
常用页面,用于监控同步集群中的同步任务的启停状态,Yarn页面不能自动刷新,状态显示不清晰,这个界面基本可以替代Yarn页面,页面默认筛选hudi同步任务,也可以方便跳转同步任务对应的Yarn页面和日志。
压缩集群
和「同步集群」页面功能一致,这个页面还能聚合各个集群在同一个表格中,方便同时查看多个集群内的压缩任务。
Cloud
查看服务部署实时状态和方便跳转服务日志。
小工具
提供一些实际运维过程中需要的小工具。
查询时间线
根据指定的HDFS路径查询;常有一些情况需要查询备份目录下的hudi表情况,这些备份表不在配置体系里面,可以通过这个工具直接查看指定路径下的hudi表时间线、表结构。
提交压缩任务
手动提交压缩任务,可以指定压缩任务执行的集群。
批量提交压缩任务
批量提交压缩任务,一行一个hudi表,使用空格分开flink_job_id和alias。
停止所有压缩任务
停止压缩任务往往遍布各个集群,压缩任务又可能会存在各个压缩队列里,这个工具可以直接清除各个压缩队列里的任务、停止各个集群里正在运行的压缩任务。
离线检索
使用Flink运行一些需要耗时较长的查询任务,和hudi同步任务一样,提交到Yarn集群上,可以做一些牛逼操作;查询结果在页面下方的任务列表中查看。
提交Flink任务有一定的延时,尤其在调度压缩任务的时候,可能会由于资源不足导致提交失败,要观察一些,不要多次提交,避免消耗集群资源。
查询记录
可以查询包含指定字符串(通常是主键)的记录,横跨Pulsar、hudi日志、hudi数据,通常用于丢数据时,找到数据丢在哪一个环节,解析出记录的时间线;
有时候Pulsar查询可能会有一点问题,导致整个任务失败,建议Pulsar查询单独启动一个任务。
检索最后操作时间
查询hudi表最后操作时间,逐条检索data文件的记录,这个时间应该会比较准确;通常用于想知道某个表的准确最新更新时间。
HDFS
由于运维工具的与实时配置库深度绑定,有时候针对临时环境、备份hudi表、测试环境等,没有运维工具辅助,可以直接查看hudi表实际结构来做简单的运维判断,这一节内容介绍一些和hudi表有关的结构,辅助做一些简单的运维判断。
一个完整的hudi表有以下部分构成:
dws_acct_item_gz
├── .hoodie
│ ├── .aux
│ ├── .schema
│ ├── .temp
│ ├── 19700101000000001.deltacommit
│ ├── 19700101000000001.deltacommit.inflight
│ ├── 19700101000000001.deltacommit.requested
│ ├── 20240312151719746.deltacommit
│ ├── 20240312151719746.deltacommit.inflight
│ ├── 20240312151719746.deltacommit.requested
│ ├── 20240312153252615.commit
│ ├── 20240312153252615.compaction.inflight
│ ├── 20240312153252615.compaction.requested
│ ├── archived
│ └── hoodie.properties
└── 200
├── .00000911-5f8f-4a72-9687-d78a0ec7ec5f_19700101000000001.log.1_71-200-0
├── .00000911-5f8f-4a72-9687-d78a0ec7ec5f_20240312153252615.log.1_71-200-0
└── 00000911-5f8f-4a72-9687-d78a0ec7ec5f_4-10-0_19700101000000001.parquet
其中,.hoodie文件夹通常被称为「时间线」,记录了hudi表的各种操作时间点、顺序和操作内容,requested结尾的时间线文件为操作请求,表示一个操作的开始,文件内容往往为将要操作的内容;inflight结尾的时间线文件为操作进行中标志,意思是这个操作正在进行中;没有结尾的文件为操作完成标志,表示这个操作已经完成,文件内容往往为一些操作结果,特别的,压缩操作的完成标志,结尾为commit。
时间线可以帮助运维人员了解hudi表的操作的详细时间点,当压缩或者同步出现问题的时候,往往会体现在时间线上,时间线的操作类型还有很多,可以在源码中了解。
分区文件夹中,存放的是hudi表的实际数据,其中文件命中带有log的文件为增量文件,也称为日志文件,存放的是checkpoint之间写入磁盘的增量数据;经过压缩操作之后,增量数据就会被整合成为parquet结尾的数据文件;类似5f8f-4a72-9687-d78a0ec7ec5f这样的字符串被称为file_id,通常一个file_id会对应多个日志文件和一个数据文件;类似19700101000000001和20240312153252615这样的字符串被称为「时间点」,和时间线上的操作对应,多个时间点对应的文件会同时存在,这是hudi配置里定义的保留文件版本,用于hudi错误回退。
案例
丢主键为xxx的某条数据
xxx表的数据没有到最新
开发
主要分为三部分,包含 Hudi 运行代码、运维服务和部署工具。
@startuml
skinparam Dpi 500
title 系统架构图
entity 外部应用 as external_apps
cloud Hadoop集群 as hadoop {
rectangle Hudi同步任务 as hudi_task
}
rectangle Hudi服务群 as hudi_services
rectangle 汇聚平台 as odcp
entity 业务平台 as datasource
datasource -> odcp:数据采集
odcp -right-> hudi_task:数据加工/推送
hudi_services -u.> hudi_task:Hudi同步任务提交
hudi_task -> external_apps:提供数据
@startuml
'hide circle
title 功能架构图
'rectangle 业务应用 as source {
' database 业务数据源
'}
'rectangle 汇聚平台 as odcp {
' rectangle 数据采集 as o1
' rectangle 数据转换 as o2
' rectangle 数据加工 as o3
' rectangle 数据管理 as o4
'
' o1 .[hidden] o2
' o2 .[hidden] o3
' o3 .[hidden] o4
'}
@enduml
@startuml
'skinparam Linetype ortho
skinparam Dpi 500
title 技术架构图
cloud Yarn集群 as yarn {
rectangle Hudi同步压缩任务 as sc
rectangle "..." as other
sc .[hidden] other
}
cloud Yarn集群 as yarn2
database HDFS as hdfs
database HDFS as hdfs2
database Zookeeper as zk
rectangle 汇聚平台 {
database "MySQL" as db
queue "Pulsar集群" as pulsar
db -u[hidden]- pulsar
}
rectangle "Hudi服务群" as service {
rectangle 调度服务 as schedule
rectangle "查询服务" as query
rectangle web控制台 as web
web --> query:查询
schedule <--> query:调度详情
}
rectangle 外部应用 as app {
rectangle Flink as flink
rectangle Spark as spark
rectangle Hive as hive
flink .[hidden] spark
spark .[hidden] hive
}
schedule -u-> yarn:"提交Hudi同步/压缩任务"
pulsar -u-> sc:数据提供
sc -l-> hdfs:数据输出
hdfs <-u- flink:查询
hdfs <-u- spark:查询
hdfs <-u- hive:查询
pulsar -> query:元数据查询
query <- db: 配置信息
query <-- hdfs2: Hudi信息
query <-- zk: 锁信息
query <-- yarn2: 集群信息
@enduml
Hudi 运行代码
sync
sync模块包含hudi运行的全部业务逻辑。
项目采用flink作为hudi的运行平台。flink的运行模式为flink on yarn application,这个模式下每次调度服务向yarn提交任务都会在yarn上启动一个单独的flink集群(一个flink集群由一个jobmanager和多个taskmanager组成),再在这个独立的flink集群中运行sync模块。
@startuml
'skinparam Linetype ortho
'skinparam Dpi 500
title sync模块和相关模块的关系
database HDFS as hdfs {
component sync模块 as s1
}
rectangle Hudi服务群 as hudi_services {
package 调度服务 as scheduler {
component flink运行依赖 as f1
component hudi运行依赖 as h1
f1 .[hidden] h1
}
}
cloud yarn集群 as yarn {
package Hudi任务 as hudi_task {
component flink运行依赖 as f2
component hudi运行依赖 as h2
component sync模块 as s2
f2 .[hidden] h2
h2 .[hidden] s2
}
}
f1 .. f2:运行依赖提交到集群
h1 .. h2:运行依赖提交到集群
s1 .. s2:从hdfs将sync模块下载到任务中
@enduml
Synchronizer
数据同步模块,用于同步数据的业务代码,com.lanyuanxiaoyao.service.sync.Synchronizer#main包含所有的业务流程。
@startuml
title 主要逻辑业务流程
start
:接收传入参数;
if (注册zk锁节点) is (成功) then
:构造flink基本参数;
:判断同步任务类型,构造不同的hudi参数;
:启动flink任务;
else (失败)
stop
endif
partition flink任务流程 {
repeat :接收pulsar消息;
:转换pulsar json消息为Record对象;
if (是否为跨天消息) is (是) then
else (不是)
:记录操作时间;
endif
repeat while (消息是否合法) is (否) not (是)
:构造Hudi Schema;
:Record对象转换为flink row;
}
stop
@enduml
Compaction
数据压缩模块,用于压缩数据的业务代码,com.lanyuanxiaoyao.service.sync.Compaction#main包含所有的业务流程。
运维服务
运维服务包含运行时维护一些查询信息相关的服务



























