Files
hudi-service/README.md
2024-04-18 16:23:14 +08:00

1022 lines
30 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<!-- TOC -->
* [概述](#概述)
* [部署](#部署)
* [配置准备](#配置准备)
* [主机规划](#主机规划)
* [外部资源确认](#外部资源确认)
* [Hadoop配置](#hadoop配置)
* [外部组件部署](#外部组件部署)
* [Victoria Metrics](#victoria-metrics)
* [启动脚本](#启动脚本)
* [采集配置](#采集配置)
* [Loki](#loki)
* [配置文件](#配置文件)
* [Grafana](#grafana)
* [数据源配置](#数据源配置)
* [配置数据库](#配置数据库)
* [部署配置](#部署配置)
* [接口机目录规划](#接口机目录规划)
* [脚本生成](#脚本生成)
* [启动上传服务](#启动上传服务)
* [编译打包项目](#编译打包项目)
* [启动应用](#启动应用)
* [运维](#运维)
* [命令行](#命令行)
* [配置操作](#配置操作)
* [查看配置信息](#查看配置信息)
* [新增配置信息](#新增配置信息)
* [批量新增配置信息](#批量新增配置信息)
* [任务操作](#任务操作)
* [查看同步任务状态](#查看同步任务状态)
* [启停同步任务](#启停同步任务)
* [批量启停同步任务](#批量启停同步任务)
* [Web](#web)
* [概览](#概览)
* [表任务](#表任务)
* [Flink任务详情](#flink任务详情)
* [表任务详情](#表任务详情)
* [同步情况](#同步情况)
* [压缩情况](#压缩情况)
* [历史压缩](#历史压缩)
* [时间线](#时间线)
* [Pulsar队列](#pulsar队列)
* [Hudi表结构](#hudi表结构)
* [压缩队列](#压缩队列)
* [跨天](#跨天)
* [同步集群](#同步集群)
* [压缩集群](#压缩集群)
* [Cloud](#cloud)
* [小工具](#小工具)
* [查询时间线](#查询时间线)
* [提交压缩任务](#提交压缩任务)
* [批量提交压缩任务](#批量提交压缩任务)
* [停止所有压缩任务](#停止所有压缩任务)
* [离线检索](#离线检索)
* [查询记录](#查询记录)
* [检索最后操作时间](#检索最后操作时间)
* [HDFS](#hdfs)
* [案例](#案例)
* [丢主键为xxx的某条数据](#丢主键为xxx的某条数据)
* [xxx表的数据没有到最新](#xxx表的数据没有到最新)
* [开发](#开发)
* [Hudi 运行代码](#hudi-运行代码)
* [sync](#sync)
* [Synchronizer](#synchronizer)
* [Compaction](#compaction)
* [运维服务](#运维服务)
* [部署工具](#部署工具)
<!-- TOC -->
# 概述
围绕Hudi同步和压缩流程建立了一整套的工具和流程。
# 部署
> 由于部份配置需要在源码增删,无法通过配置文件修改,**如果部署新集群建议对master建立分支进行操作避免配置混淆**
## 配置准备
### 主机规划
通常来说,应用部署需要逻辑上规划有
- 一台**接口机**用于操作脚本、存放jar包等。
- 一台**组件机**;用于部署应用配套的外部组件。
- 剩下的作为**应用机**;用于部署应用服务节点。
上述为逻辑规划,原则上对于机器配置没有特别要求,可以混用;如接口机上也用于部署外部组件,同时也作为服务节点部署服务。
组件机上规划有指标和日志汇聚hudi任务多的情况下对IO和磁盘有一定的负载要求建议单独部署。
### 外部资源确认
- Hadoop3
用于存放Hudi数据、Flink任务业务jar包等
- Yarn集群节点到HDFS的增删改查权限
- 应用部署主机到HDFS的增删改查权限
- Zookeeper
用于hudi同步压缩服务注册通常可以使用Hadoop3集群附带的zookeeper
- Yarn集群节点到zookeeper的网络连通
- 应用部署主机到zookeeper的网络连通
- Yarn
用于运行hudi同步压缩任务
- 应用部署主机提交任务的权限
- MySQL
配置数据库;目前和汇聚平台配置库耦合,不能改,只能直接使用汇聚配置数据库
- 应用部署节点到MySQL的访问连通
- Pulsar
数据源
- Yarn集群节点到pulsar的读取连通
- 应用部署主机到pulsar的client连通和admin连通
- 应用部署主机
部署配套应用服务
- JDK 1.8实现OpenJDK标准即可
- Kerberos
- 接口机到应用机的ssh免密用于服务启停部署
#### Hadoop配置
`config`目录下以「hudi数据所有集群名称+hudi同步运行集群名称」的格式建立配置文件夹`b2b12`,并将下列配置文件放在这里
- core-site.xml
- hdfs-site.xml
- yarn-site.xml
- viewfs-mount-table.xml如果需要使用联邦core-site.xml也需要将引用改为相对路径引用
### 外部组件部署
_组件安装包放在研发云制品库:[汇聚平台项目-制品库](https://www.srdcloud.cn/artifact/26151/project#generic)_
odcp-snapshot-generic-local/cluster-tools下面
#### Victoria Metrics
Victoria Metrics是一款替代Prometheus的开源指标采集应用支持分布式部署和指标主动推送完全兼容Prometheus API提供了接近数倍于Prometheus的性能。
_目前项目采用单节点模式部署,性能还有富裕,后期可根据性能需求,改为分布式部署。_
```
victoria_metrics-1.98
├── prometheus.yml 配置文件
├── start_victoria_metrics.sh 启动脚本
├── stop_victoria_metrics.sh 停止脚本
├── version
├── victoria-metrics-prod-amd64 x86架构
└── victoria-metrics-prod-arm64 arm架构
```
##### 启动脚本
启动脚本需要根据部署的主机区分CPU架构修改脚本内容引用合适的可执行文件。
用户名为`EsCFVuNkiDWv7PKmcF`,密码为`Abf%x9ocS^iKr3tgrd`用于保护Victoria Metrics的Web页面。
```Bash
nohup $current_path/victoria-metrics-prod-amd64 \
-search.maxQueryLen=1MB \
-promscrape.maxScrapeSize=100MB \
-loggerLevel=ERROR \
-storageDataPath=$current_path/data \
-retentionPeriod=7d \
-httpAuth.username=EsCFVuNkiDWv7PKmcF \
-httpAuth.password=Abf%x9ocS^iKr3tgrd \
-httpListenAddr=":35710" \
-promscrape.config=$current_path/prometheus.yml \
-promscrape.httpSDCheckInterval=15s \
> $current_path/victoria-metrics.log 2>&1 &
```
启动完成后可以通过脚本指定的端口`35710`访问Web页面即为启动成功。
![](assets/victoria_metrics_web.png)
##### 采集配置
Victoria Metrics的配置文件完全兼容Prometheus可以直接替换使用。
```yaml
global:
scrape_interval: 15s
scrape_timeout: 15s
scrape_configs:
# Victoria Metrics 本身的指标
- job_name: 'Victoria Metrics Single Cluster'
metrics_path: '/metrics'
basic_auth:
username: EsCFVuNkiDWv7PKmcF
password: Abf%x9ocS^iKr3tgrd
static_configs:
- targets:
- '132.126.207.34:35710'
# Loki日志采集
- job_name: 'Loki'
metrics_path: '/metrics'
static_configs:
- targets:
- '132.126.207.34:33100'
# 集群主机管理,根据需要启用
- job_name: 'Node Exporter'
metrics_path: '/metrics'
static_configs:
- targets:
- '132.126.207.124:20110'
# 应用服务的指标采集部署Victoria Metrics的时候可以注释等应用部署完成后再打开
- job_name: 'Service'
metrics_path: '/actuator/prometheus'
basic_auth:
username: AxhEbscwsJDbYMH2
password: cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4
# 可以通过service-cloud-query模块动态获取各个服务节点的IP
http_sd_configs:
- url: http://132.126.207.34:35690/hudi_services/service_cloud_query/cloud/targets
basic_auth:
username: AxhEbscwsJDbYMH2
password: cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4
```
#### Loki
Loki是一款日志汇聚工具Grafana公司出品可以通过Grafana直接查询日志相比ELK更为轻量级不需要额外的日志采集组件支持应用直接推送日志对简单的分布式应用更友好。
_根据规划Loki分为服务日志和Hudi日志选择两台机器分别部署Hudi日志较多如果所有日志划分到一个Loki可能会影响服务日志查询_
```
loki-2.9
├── stop_loki.sh 停止脚本
├── version
├── loki-linux-amd64 x86架构
├── loki-linux-arm64 arm架构
├── loki-config.yml 配置文件
├── start_loki.sh 启动脚本
└── logs
```
##### 配置文件
```yaml
# Loki不支持直接配置访问控制官方建议使用Nginx来实现
auth_enabled: false
query_range:
parallelise_shardable_queries: false
server:
# 日志推送端口
http_listen_port: 33100
grpc_listen_port: 39096
log_level: error
grpc_server_max_recv_msg_size: 1572864000
grpc_server_max_send_msg_size: 1572864000
common:
# 指向规划好的数据盘
path_prefix: /data/datalake/data/loki
storage:
filesystem:
chunks_directory: /data/datalake/data/loki/chunks
rules_directory: /data/datalake/data/loki/rules
replication_factor: 1
ring:
instance_addr: 127.0.0.1
kvstore:
store: inmemory
table_manager:
retention_deletes_enabled: true
retention_period: 72h
schema_config:
configs:
- from: 2023-12-06
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
```
#### Grafana
根据CPU架构选择`grafana-amd64-10.3`或者`grafana-arm64-10.3`部署包。
```
grafana-amd64-10.3
├── data
├── start_grafana.sh
├── stop_grafana.sh
├── npm-artifacts
├── packaging
├── plugins-bundled
├── public
├── bin
├── conf
├── storybook
├── tools
├── VERSION
├── LICENSE
├── NOTICE.md
├── README.md
└── Dockerfile
```
##### 数据源配置
根据部署的组件新增数据源其中Loki区分服务日志和Hudi日志如果没有部署两个可以不区分
![](assets/grafana_datasource.png)
### 配置数据库
`database`路径下,有数据库初始化脚本,其中`tb_app_hudi_job_config``tb_app_yarn_job_config`两张表提供了预设配置。
_建议为每一个环境创建一个单独的database避免不同集群的配置混在一起。_
```
database
├── tb_app_collect_table_info.sql
├── tb_app_collect_table_version.sql
├── tb_app_flink_job_config.sql
├── tb_app_global_config.sql
├── tb_app_hudi_compaction_job.sql
├── tb_app_hudi_compaction_metrics.sql
├── tb_app_hudi_event.sql
├── tb_app_hudi_job_config.sql
├── tb_app_hudi_sync_state.sql
└── tb_app_yarn_job_config.sql
```
### 部署配置
`service-cli/service-cli-runner/src/main/resources/application-b12.yml`为模版,补全应用部署信息
`service-web/src/main/resources/static/common/info.js#commonInfo`为模板补全web页面相关信息
### 接口机目录规划
选择一个空白目录作为根目录(下文简称「根目录」),创建下列子目录。
```
.
├── cloud 服务管理脚本
├── command 应用命令行
├── data 应用数据
├── extra 外部组件
├── logs 应用日志
└── uploader 上传服务
```
## 脚本生成
`bin/generate`
脚本复制到根目录下根据实际情况改动脚本中指向的一些目录、jdk和jar包路径执行脚本正常情况下脚本将会为`cloud``command``uploader`
目录生成脚本。
## 启动上传服务
_如果使用其他文件服务如YTP则不需要操作这一步直接使用其他文件服务即可。_
```
uploader
├── start.sh 启动脚本
├── stop.sh 停止脚本
└── update.sh 更新脚本
```
执行`start.sh`启动上传服务,默认端口为`36800`
## 编译打包项目
将文件服务的url补充到`bin/library.sh`中,执行`bin/build-all.sh`,打包编译全部模块并上传到文件服务。
## 启动应用
`cloud`路径下有已经生成好的服务启动脚本,`deploy-service`开头的脚本为启动脚本脚本默认包含更新并启动保证每次都可以使用最新的jar包启动。
应用包含多个服务,为了方便及时监控应用启动状态,建议按照如下顺序启动:
- service-gateway、service-queue
- service-web
- service-api
- 各种query
- service-cloud-query
- service-flink-query
- service-hudi-query
- service-info-query
- service-pulsar-query
- service-yarn-query
- service-zookeeper-query
- service-launcher-*
- service-executor-manager
- service-monitor
- service-scheduler
由于scheduler包含定时任务通常放在最后确认整个项目都没问题或者同步已经启动并正常运行之后再启动压缩调度模块。
可以配合web页面和grafana日志监控保证服务启动正常通常服务启动完成一个再启动下一个避免服务启动失败导致服务间调用雪崩。
# 运维
应用提供三部分运维方案:
- **命令行**提供命令行操作包括同步的启动、停止等在Web无法使用的情况下做一些简单运维操作
- **Web**:提供表信息、运行总览等,完成常见的运维操作;
- **HDFS**在上述手段出现问题或无法操作的情况下直接访问HDFS查看hudi表相关的状态。
## 命令行
为了避免页面网络不同导致无法运维操作同时也为了方便运维人员在脚本中直接操作hudi服务提供了命令行工具用于操作hudi同步任务。
### 配置操作
用于增加同步表配置信息,涉及到`tb_app_flink_job_config``tb_app_collect_table_info`两张表,由于涉及配置项较多,特别提供命令行工具做基本的配置。
_但由于开发和运维的脱钩,往往命令行工具的配置类型不能完全满足运维同事组织同步表的模式,具体的表配置,还是由运维同事自行发挥,或者统一开发再进行配置操作。_
#### 查看配置信息
其中搜索模式包括`REGEX正则表达式``CONTAINS包含``UN_CONTAINS不包含``EQUALS全等``UN_EQUALS不全等`
```
NAME
job-all - 显示 Flink job 配置信息
SYNOPSYS
job-all [[--type] type] [[--pattern] string]
OPTIONS
--type type
搜索模式
[Optional, default = CONTAINS]
--pattern string
搜索值 (除了 REGEX 模式, 其余可使用英文逗号分隔多个值)
[Optional, default = ]
```
#### 新增配置信息
```
NAME
job-add - 新增 Flink job 配置
SYNOPSYS
job-add [--type] table-type [--database] string [--database-type] source-type [--schema] string [--table] string [--hdfs] string [--pulsar-address] string
OPTIONS
--type table-type
表类型 (BIG 为大表, SMALL 为小表, ACCT 为 ACCT 表)
[Mandatory]
--database string
数据源
[Mandatory]
--database-type source-type
数据源类型
[Mandatory]
--schema string
Scheme
[Mandatory]
--table string
表名
[Mandatory]
--hdfs string
HDFS 路径前缀
[Mandatory]
--pulsar-address string
Pulsar Address
[Mandatory]
```
#### 批量新增配置信息
`source`指向的文件,按行分隔,每行用空格分隔字段,字段内容按「新增配置信息」的字段填写。
```
NAME
job-add-batch - 批量新增 Flink job 配置
SYNOPSYS
job-add-batch [--source] string
OPTIONS
--source string
配置文件路径
[Mandatory]
```
### 任务操作
针对hudi同步任务的操作。
#### 查看同步任务状态
默认查询全部任务,也可以通过关键词筛选。
```
NAME
yarn-all - 查询表同步任务状态
SYNOPSYS
yarn-all [[--pattern] string] [--show-flink-job-id] [--running] [--un-running]
OPTIONS
--pattern string
搜索值 (包含查询)
[Optional, default = ]
--show-flink-job-id 展示结果列表的 Flink job id
[Optional, default = false]
--running 展示运行的任务
[Optional, default = false]
--un-running 展示非运行的任务
[Optional, default = false]
```
![](assets/hudi_command_yarn_all.png)
#### 启停同步任务
`--ignore-check`参数在其他脚本,如定时任务,的时候比较方便,可以跳过二次确认。
```
NAME
yarn-run - 启动表同步任务
SYNOPSYS
yarn-run [--flink-job-id] long [--ignore-check]
OPTIONS
--flink-job-id long
Flink job id
[Mandatory]
--ignore-check Ignore double check
[Optional, default = false]
```
![](assets/hudi_command_yarn_run.png)
```
NAME
yarn-run-batch - 批量启动表同步任务
SYNOPSYS
yarn-run-batch [--flink-job-ids] long[] [--ignore-check]
OPTIONS
--flink-job-ids long[]
Flink job id
[Mandatory]
--ignore-check Ignore double check
[Optional, default = false]
```
```
NAME
yarn-run-un-running - 启动所有未运行的表同步任务
SYNOPSYS
yarn-run-un-running [--ignore-check]
OPTIONS
--ignore-check Ignore double check
[Optional, default = false]
```
#### 批量启停同步任务
```
NAME
yarn-kill - 停止表同步任务
SYNOPSYS
yarn-kill [--flink-job-id] long [--ignore-check]
OPTIONS
--flink-job-id long
Flink job id
[Mandatory]
--ignore-check Ignore double check
[Optional, default = false]
```
![](assets/hudi_command_yarn_kill.png)
```
NAME
yarn-kill-batch - 批量停止表同步任务
SYNOPSYS
yarn-kill-batch [--flink-job-ids] long[] [--ignore-check]
OPTIONS
--flink-job-ids long[]
Flink job id
[Mandatory]
--ignore-check Ignore double check
[Optional, default = false]
```
```
NAME
yarn-kill-running - 停止所有正在运行的表同步任务
SYNOPSYS
yarn-kill-running [--ignore-check]
OPTIONS
--ignore-check Ignore double check
[Optional, default = false]
```
## Web
### 概览
概览页面可以看到应用的主要运行情况,从上到下分别有:
- 表数量
- 逻辑表(根据上游源表名去重得到)
- 湖底表根据目标hudi表路径去重得到
- 嗨福表根据hive表名去重得到
- Flink运行同步任务数量
- flink任务数
- flink任务下对应表总数
- hudi同步集群情况
- hudi压缩集群情况
- 跨天情况
- 重点表跨天情况
- 普通表跨天情况
- 压缩调度定时策略
![](assets/hudi_web_override.png)
### 表任务
可以按表级别查询到相关配置、运行、周边信息,完成常见的表运维跟踪。
![](assets/hudi_web_tables.png)
#### Flink任务详情
点击`Flink job id`项目可以打开Flink任务详情页面。
![](assets/hudi_web_flink_job_detail.png)
#### 表任务详情
点击`别名`项目可以打开表任务详情页面,在这个页面主要可以查看表本身的配置信息,尤其是字段信息。
![](assets/hudi_web_table_meta_detail.png)
##### 同步情况
查看Flink同步任务的详情和历史情况方便直接跳转日志查看。
![](assets/hudi_web_table_meta_detail_sync.png)
##### 压缩情况
查看Flink同步任务的详情和历史情况方便直接跳转日志查看由于压缩任务运行在多集群历史任务保留多久会受到各个集群任务量、配置的限制如果超出集群保留任务列表的限制比如b1集群任务列表整个集群最后10000条就会在这里查询不到在使用的时候需要注意这一点。
![](assets/hudi_web_flink_job_detail_compaction.png)
##### 历史压缩
记录在数据库中的历史压缩情况,由各个服务端维护,方便查询历史压缩情况,不受集群保留历史任务限制。
![](assets/hudi_web_flink_job_detail_compaction_history.png)
##### 时间线
查询实时hudi表时间线可以更清晰地看到压缩情况和同步情况对于查询压缩任务启停时间点以及压缩任务包含文件数非常有帮助。
![](assets/hudi_web_flink_job_detail_timeline.png)
##### Pulsar队列
用于查询Pulsar Topic的相关信息查看Reader积压情况生产者是否在线等。
![](assets/hudi_web_table_meta_detail_pulsar.png)
##### Hudi表结构
常用于查看实际的表结构是否和配置表相同判断hudi表有没有更新表结构。
![](assets/hudi_web_table_meta_detail_struct.png)
### 压缩队列
查看各个压缩队列的任务详情,方便查找问题任务。
![](assets/hudi_web_queue.png)
### 跨天
查看一些简单的跨天信息。
![](assets/hudi_web_version.png)
### 同步集群
常用页面用于监控同步集群中的同步任务的启停状态Yarn页面不能自动刷新状态显示不清晰这个界面基本可以替代Yarn页面页面默认筛选hudi同步任务也可以方便跳转同步任务对应的Yarn页面和日志。
![](assets/hudi_web_sync.png)
### 压缩集群
和「同步集群」页面功能一致,这个页面还能聚合各个集群在同一个表格中,方便同时查看多个集群内的压缩任务。
![](assets/hudi_web_compaction.png)
### Cloud
查看服务部署实时状态和方便跳转服务日志。
![](assets/hudi_web_cloud_service.png)
### 小工具
提供一些实际运维过程中需要的小工具。
#### 查询时间线
根据指定的HDFS路径查询常有一些情况需要查询备份目录下的hudi表情况这些备份表不在配置体系里面可以通过这个工具直接查看指定路径下的hudi表时间线、表结构。
![](assets/hudi_web_tools_timeline.png)
#### 提交压缩任务
手动提交压缩任务,可以指定压缩任务执行的集群。
![](assets/hudi_web_tools_submit_compaction.png)
#### 批量提交压缩任务
批量提交压缩任务一行一个hudi表使用空格分开`flink_job_id``alias`
![](assets/hudi_web_tools_submit_compaction_batch.png)
#### 停止所有压缩任务
停止压缩任务往往遍布各个集群,压缩任务又可能会存在各个压缩队列里,这个工具可以直接清除各个压缩队列里的任务、停止各个集群里正在运行的压缩任务。
![](assets/hudi_web_tools_stop_all.png)
### 离线检索
使用Flink运行一些需要耗时较长的查询任务和hudi同步任务一样提交到Yarn集群上可以做一些牛逼操作查询结果在页面下方的任务列表中查看。
_提交Flink任务有一定的延时尤其在调度压缩任务的时候可能会由于资源不足导致提交失败要观察一些不要多次提交避免消耗集群资源。_
#### 查询记录
可以查询包含指定字符串通常是主键的记录横跨Pulsar、hudi日志、hudi数据通常用于丢数据时找到数据丢在哪一个环节解析出记录的时间线
_有时候Pulsar查询可能会有一点问题导致整个任务失败建议Pulsar查询单独启动一个任务。_
![](assets/hudi_web_offline_query.png)
#### 检索最后操作时间
查询hudi表最后操作时间逐条检索data文件的记录这个时间应该会比较准确通常用于想知道某个表的准确最新更新时间。
![](assets/hudi_web_offline_query_latest_time.png)
![](assets/hudi_web_offline_query_latest_time_running.png)
![](assets/hudi_web_offline_query_latest_time_result.png)
## HDFS
由于运维工具的与实时配置库深度绑定有时候针对临时环境、备份hudi表、测试环境等没有运维工具辅助可以直接查看hudi表实际结构来做简单的运维判断这一节内容介绍一些和hudi表有关的结构辅助做一些简单的运维判断。
一个完整的hudi表有以下部分构成
```
dws_acct_item_gz
├── .hoodie
│ ├── .aux
│ ├── .schema
│ ├── .temp
│ ├── 19700101000000001.deltacommit
│ ├── 19700101000000001.deltacommit.inflight
│ ├── 19700101000000001.deltacommit.requested
│ ├── 20240312151719746.deltacommit
│ ├── 20240312151719746.deltacommit.inflight
│ ├── 20240312151719746.deltacommit.requested
│ ├── 20240312153252615.commit
│ ├── 20240312153252615.compaction.inflight
│ ├── 20240312153252615.compaction.requested
│ ├── archived
│ └── hoodie.properties
└── 200
├── .00000911-5f8f-4a72-9687-d78a0ec7ec5f_19700101000000001.log.1_71-200-0
├── .00000911-5f8f-4a72-9687-d78a0ec7ec5f_20240312153252615.log.1_71-200-0
└── 00000911-5f8f-4a72-9687-d78a0ec7ec5f_4-10-0_19700101000000001.parquet
```
其中,`.hoodie`文件夹通常被称为「时间线」记录了hudi表的各种操作时间点、顺序和操作内容`requested`结尾的时间线文件为**操作请求**,表示一个操作的开始,文件内容往往为将要操作的内容;`inflight`结尾的时间线文件为**操作进行中标志**,意思是这个操作正在进行中;没有结尾的文件为**操作完成标志**,表示这个操作已经完成,文件内容往往为一些操作结果,特别的,压缩操作的完成标志,结尾为`commit`
时间线可以帮助运维人员了解hudi表的操作的详细时间点当压缩或者同步出现问题的时候往往会体现在时间线上时间线的操作类型还有很多可以在源码中了解。
分区文件夹中存放的是hudi表的实际数据其中文件命中带有`log`的文件为增量文件也称为日志文件存放的是checkpoint之间写入磁盘的增量数据经过压缩操作之后增量数据就会被整合成为`parquet`结尾的数据文件;类似`5f8f-4a72-9687-d78a0ec7ec5f`这样的字符串被称为`file_id`通常一个file_id会对应多个日志文件和一个数据文件类似`19700101000000001``20240312153252615`这样的字符串被称为「时间点」和时间线上的操作对应多个时间点对应的文件会同时存在这是hudi配置里定义的保留文件版本用于hudi错误回退。
## 案例
### 丢主键为xxx的某条数据
### xxx表的数据没有到最新
# 开发
主要分为三部分,包含 Hudi 运行代码、运维服务和部署工具。
```plantuml
@startuml
skinparam Dpi 500
title 系统架构图
entity 外部应用 as external_apps
cloud Hadoop集群 as hadoop {
rectangle Hudi同步任务 as hudi_task
}
rectangle Hudi服务群 as hudi_services
rectangle 汇聚平台 as odcp
entity 业务平台 as datasource
datasource -> odcp:数据采集
odcp -right-> hudi_task:数据加工/推送
hudi_services -u.> hudi_task:Hudi同步任务提交
hudi_task -> external_apps:提供数据
```
```plantuml
@startuml
'hide circle
title 功能架构图
'rectangle 业务应用 as source {
' database 业务数据源
'}
'rectangle 汇聚平台 as odcp {
' rectangle 数据采集 as o1
' rectangle 数据转换 as o2
' rectangle 数据加工 as o3
' rectangle 数据管理 as o4
'
' o1 .[hidden] o2
' o2 .[hidden] o3
' o3 .[hidden] o4
'}
@enduml
```
```plantuml
@startuml
'skinparam Linetype ortho
skinparam Dpi 500
title 技术架构图
cloud Yarn集群 as yarn {
rectangle Hudi同步压缩任务 as sc
rectangle "..." as other
sc .[hidden] other
}
cloud Yarn集群 as yarn2
database HDFS as hdfs
database HDFS as hdfs2
database Zookeeper as zk
rectangle 汇聚平台 {
database "MySQL" as db
queue "Pulsar集群" as pulsar
db -u[hidden]- pulsar
}
rectangle "Hudi服务群" as service {
rectangle 调度服务 as schedule
rectangle "查询服务" as query
rectangle web控制台 as web
web --> query:查询
schedule <--> query:调度详情
}
rectangle 外部应用 as app {
rectangle Flink as flink
rectangle Spark as spark
rectangle Hive as hive
flink .[hidden] spark
spark .[hidden] hive
}
schedule -u-> yarn:"提交Hudi同步/压缩任务"
pulsar -u-> sc:数据提供
sc -l-> hdfs:数据输出
hdfs <-u- flink:查询
hdfs <-u- spark:查询
hdfs <-u- hive:查询
pulsar -> query:元数据查询
query <- db: 配置信息
query <-- hdfs2: Hudi信息
query <-- zk: 锁信息
query <-- yarn2: 集群信息
@enduml
```
## Hudi 运行代码
### sync
sync模块包含hudi运行的全部业务逻辑。
项目采用flink作为hudi的运行平台。flink的运行模式为`flink on yarn application`这个模式下每次调度服务向yarn提交任务都会在yarn上启动一个单独的flink集群一个flink集群由一个jobmanager和多个taskmanager组成再在这个独立的flink集群中运行sync模块。
```plantuml
@startuml
'skinparam Linetype ortho
'skinparam Dpi 500
title sync模块和相关模块的关系
database HDFS as hdfs {
component sync模块 as s1
}
rectangle Hudi服务群 as hudi_services {
package 调度服务 as scheduler {
component flink运行依赖 as f1
component hudi运行依赖 as h1
f1 .[hidden] h1
}
}
cloud yarn集群 as yarn {
package Hudi任务 as hudi_task {
component flink运行依赖 as f2
component hudi运行依赖 as h2
component sync模块 as s2
f2 .[hidden] h2
h2 .[hidden] s2
}
}
f1 .. f2:运行依赖提交到集群
h1 .. h2:运行依赖提交到集群
s1 .. s2:从hdfs将sync模块下载到任务中
@enduml
```
### Synchronizer
数据同步模块,用于同步数据的业务代码,`com.lanyuanxiaoyao.service.sync.Synchronizer#main`包含所有的业务流程。
```plantuml
@startuml
title 主要逻辑业务流程
start
:接收传入参数;
if (注册zk锁节点) is (成功) then
:构造flink基本参数;
:判断同步任务类型构造不同的hudi参数;
:启动flink任务;
else (失败)
stop
endif
partition flink任务流程 {
repeat :接收pulsar消息;
:转换pulsar json消息为Record对象;
if (是否为跨天消息) is (是) then
else (不是)
:记录操作时间;
endif
repeat while (消息是否合法) is (否) not (是)
:构造Hudi Schema;
:Record对象转换为flink row;
}
stop
@enduml
```
### Compaction
数据压缩模块,用于压缩数据的业务代码,`com.lanyuanxiaoyao.service.sync.Compaction#main`包含所有的业务流程。
## 运维服务
运维服务包含运行时维护一些查询信息相关的服务
## 部署工具