2024-06-17 18:48:49 +08:00
2024-03-15 10:17:03 +08:00
2024-04-18 16:23:14 +08:00

概述

围绕Hudi同步和压缩流程建立了一整套的工具和流程。

部署

由于部份配置需要在源码增删,无法通过配置文件修改,如果部署新集群建议对master建立分支进行操作避免配置混淆

配置准备

主机规划

通常来说,应用部署需要逻辑上规划有

  • 一台接口机用于操作脚本、存放jar包等。
  • 一台组件机;用于部署应用配套的外部组件。
  • 剩下的作为应用机;用于部署应用服务节点。

上述为逻辑规划,原则上对于机器配置没有特别要求,可以混用;如接口机上也用于部署外部组件,同时也作为服务节点部署服务。

组件机上规划有指标和日志汇聚hudi任务多的情况下对IO和磁盘有一定的负载要求建议单独部署。

外部资源确认

  • Hadoop3
    用于存放Hudi数据、Flink任务业务jar包等
    • Yarn集群节点到HDFS的增删改查权限
    • 应用部署主机到HDFS的增删改查权限
  • Zookeeper
    用于hudi同步压缩服务注册通常可以使用Hadoop3集群附带的zookeeper
    • Yarn集群节点到zookeeper的网络连通
    • 应用部署主机到zookeeper的网络连通
  • Yarn
    用于运行hudi同步压缩任务
    • 应用部署主机提交任务的权限
  • MySQL
    配置数据库;目前和汇聚平台配置库耦合,不能改,只能直接使用汇聚配置数据库
    • 应用部署节点到MySQL的访问连通
  • Pulsar
    数据源
    • Yarn集群节点到pulsar的读取连通
    • 应用部署主机到pulsar的client连通和admin连通
  • 应用部署主机
    部署配套应用服务
    • JDK 1.8实现OpenJDK标准即可
    • Kerberos
    • 接口机到应用机的ssh免密用于服务启停部署

Hadoop配置

config目录下以「hudi数据所有集群名称+hudi同步运行集群名称」的格式建立配置文件夹b2b12,并将下列配置文件放在这里

  • core-site.xml
  • hdfs-site.xml
  • yarn-site.xml
  • viewfs-mount-table.xml如果需要使用联邦core-site.xml也需要将引用改为相对路径引用

外部组件部署

组件安装包放在研发云制品库:汇聚平台项目-制品库 odcp-snapshot-generic-local/cluster-tools下面

Victoria Metrics

Victoria Metrics是一款替代Prometheus的开源指标采集应用支持分布式部署和指标主动推送完全兼容Prometheus API提供了接近数倍于Prometheus的性能。

目前项目采用单节点模式部署,性能还有富裕,后期可根据性能需求,改为分布式部署。

victoria_metrics-1.98
 ├── prometheus.yml                配置文件
 ├── start_victoria_metrics.sh     启动脚本
 ├── stop_victoria_metrics.sh      停止脚本
 ├── version
 ├── victoria-metrics-prod-amd64   x86架构
 └── victoria-metrics-prod-arm64   arm架构
启动脚本

启动脚本需要根据部署的主机区分CPU架构修改脚本内容引用合适的可执行文件。

用户名为EsCFVuNkiDWv7PKmcF,密码为Abf%x9ocS^iKr3tgrd用于保护Victoria Metrics的Web页面。

nohup $current_path/victoria-metrics-prod-amd64 \
  -search.maxQueryLen=1MB \
  -promscrape.maxScrapeSize=100MB \
  -loggerLevel=ERROR \
  -storageDataPath=$current_path/data \
  -retentionPeriod=7d \
  -httpAuth.username=EsCFVuNkiDWv7PKmcF \
  -httpAuth.password=Abf%x9ocS^iKr3tgrd \
  -httpListenAddr=":35710" \
  -promscrape.config=$current_path/prometheus.yml \
  -promscrape.httpSDCheckInterval=15s \
  > $current_path/victoria-metrics.log 2>&1 &

启动完成后可以通过脚本指定的端口35710访问Web页面即为启动成功。

采集配置

Victoria Metrics的配置文件完全兼容Prometheus可以直接替换使用。

global:
  scrape_interval: 15s
  scrape_timeout: 15s

scrape_configs:
  # Victoria Metrics 本身的指标
  - job_name: 'Victoria Metrics Single Cluster'
    metrics_path: '/metrics'
    basic_auth:
      username: EsCFVuNkiDWv7PKmcF
      password: Abf%x9ocS^iKr3tgrd
    static_configs:
      - targets:
          - '132.126.207.34:35710'
  # Loki日志采集
  - job_name: 'Loki'
    metrics_path: '/metrics'
    static_configs:
      - targets:
          - '132.126.207.34:33100'
  # 集群主机管理,根据需要启用
  - job_name: 'Node Exporter'
    metrics_path: '/metrics'
    static_configs:
      - targets:
          - '132.126.207.124:20110'
  # 应用服务的指标采集部署Victoria Metrics的时候可以注释等应用部署完成后再打开
  - job_name: 'Service'
    metrics_path: '/actuator/prometheus'
    basic_auth:
      username: AxhEbscwsJDbYMH2
      password: cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4
    # 可以通过service-cloud-query模块动态获取各个服务节点的IP
    http_sd_configs:
      - url: http://132.126.207.34:35690/hudi_services/service_cloud_query/cloud/targets
        basic_auth:
          username: AxhEbscwsJDbYMH2
          password: cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4

Loki

Loki是一款日志汇聚工具Grafana公司出品可以通过Grafana直接查询日志相比ELK更为轻量级不需要额外的日志采集组件支持应用直接推送日志对简单的分布式应用更友好。

根据规划Loki分为服务日志和Hudi日志选择两台机器分别部署Hudi日志较多如果所有日志划分到一个Loki可能会影响服务日志查询

loki-2.9
 ├── stop_loki.sh       停止脚本
 ├── version
 ├── loki-linux-amd64   x86架构
 ├── loki-linux-arm64   arm架构
 ├── loki-config.yml    配置文件
 ├── start_loki.sh      启动脚本
 └── logs
配置文件
# Loki不支持直接配置访问控制官方建议使用Nginx来实现
auth_enabled: false

query_range:
  parallelise_shardable_queries: false

server:
  # 日志推送端口
  http_listen_port: 33100
  grpc_listen_port: 39096
  log_level: error
  grpc_server_max_recv_msg_size: 1572864000
  grpc_server_max_send_msg_size: 1572864000

common:
  # 指向规划好的数据盘
  path_prefix: /data/datalake/data/loki
  storage:
    filesystem:
      chunks_directory: /data/datalake/data/loki/chunks
      rules_directory: /data/datalake/data/loki/rules
  replication_factor: 1
  ring:
    instance_addr: 127.0.0.1
    kvstore:
      store: inmemory

table_manager:
  retention_deletes_enabled: true
  retention_period: 72h

schema_config:
  configs:
    - from: 2023-12-06
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

Grafana

根据CPU架构选择grafana-amd64-10.3或者grafana-arm64-10.3部署包。

grafana-amd64-10.3
 ├── data
 ├── start_grafana.sh
 ├── stop_grafana.sh
 ├── npm-artifacts
 ├── packaging
 ├── plugins-bundled
 ├── public
 ├── bin
 ├── conf
 ├── storybook
 ├── tools
 ├── VERSION
 ├── LICENSE
 ├── NOTICE.md
 ├── README.md
 └── Dockerfile
数据源配置

根据部署的组件新增数据源其中Loki区分服务日志和Hudi日志如果没有部署两个可以不区分

配置数据库

database路径下,有数据库初始化脚本,其中tb_app_hudi_job_configtb_app_yarn_job_config两张表提供了预设配置。

建议为每一个环境创建一个单独的database避免不同集群的配置混在一起。

database
 ├── tb_app_collect_table_info.sql
 ├── tb_app_collect_table_version.sql
 ├── tb_app_flink_job_config.sql
 ├── tb_app_global_config.sql
 ├── tb_app_hudi_compaction_job.sql
 ├── tb_app_hudi_compaction_metrics.sql
 ├── tb_app_hudi_event.sql
 ├── tb_app_hudi_job_config.sql
 ├── tb_app_hudi_sync_state.sql
 └── tb_app_yarn_job_config.sql

部署配置

service-cli/service-cli-runner/src/main/resources/application-b12.yml为模版,补全应用部署信息

service-web/src/main/resources/static/common/info.js#commonInfo为模板补全web页面相关信息

接口机目录规划

选择一个空白目录作为根目录(下文简称「根目录」),创建下列子目录。

.
 ├── cloud      服务管理脚本
 ├── command    应用命令行
 ├── data       应用数据
 ├── extra      外部组件
 ├── logs       应用日志
 └── uploader   上传服务

脚本生成

bin/generate 脚本复制到根目录下根据实际情况改动脚本中指向的一些目录、jdk和jar包路径执行脚本正常情况下脚本将会为cloudcommanduploader 目录生成脚本。

启动上传服务

如果使用其他文件服务如YTP则不需要操作这一步直接使用其他文件服务即可。

uploader
  ├── start.sh    启动脚本
  ├── stop.sh     停止脚本
  └── update.sh   更新脚本

执行start.sh启动上传服务,默认端口为36800

编译打包项目

将文件服务的url补充到bin/library.sh中,执行bin/build-all.sh,打包编译全部模块并上传到文件服务。

启动应用

cloud路径下有已经生成好的服务启动脚本,deploy-service开头的脚本为启动脚本脚本默认包含更新并启动保证每次都可以使用最新的jar包启动。

应用包含多个服务,为了方便及时监控应用启动状态,建议按照如下顺序启动:

  • service-gateway、service-queue
  • service-web
  • service-api
  • 各种query
    • service-cloud-query
    • service-flink-query
    • service-hudi-query
    • service-info-query
    • service-pulsar-query
    • service-yarn-query
    • service-zookeeper-query
  • service-launcher-*
  • service-executor-manager
  • service-monitor
  • service-scheduler

由于scheduler包含定时任务通常放在最后确认整个项目都没问题或者同步已经启动并正常运行之后再启动压缩调度模块。

可以配合web页面和grafana日志监控保证服务启动正常通常服务启动完成一个再启动下一个避免服务启动失败导致服务间调用雪崩。

运维

应用提供三部分运维方案:

  • 命令行提供命令行操作包括同步的启动、停止等在Web无法使用的情况下做一些简单运维操作
  • Web:提供表信息、运行总览等,完成常见的运维操作;
  • HDFS在上述手段出现问题或无法操作的情况下直接访问HDFS查看hudi表相关的状态。

命令行

为了避免页面网络不同导致无法运维操作同时也为了方便运维人员在脚本中直接操作hudi服务提供了命令行工具用于操作hudi同步任务。

配置操作

用于增加同步表配置信息,涉及到tb_app_flink_job_configtb_app_collect_table_info两张表,由于涉及配置项较多,特别提供命令行工具做基本的配置。

但由于开发和运维的脱钩,往往命令行工具的配置类型不能完全满足运维同事组织同步表的模式,具体的表配置,还是由运维同事自行发挥,或者统一开发再进行配置操作。

查看配置信息

其中搜索模式包括REGEX正则表达式CONTAINS包含UN_CONTAINS不包含EQUALS全等UN_EQUALS不全等

NAME
	job-all - 显示 Flink job 配置信息

SYNOPSYS
	job-all [[--type] type]  [[--pattern] string]

OPTIONS
	--type  type
		搜索模式
		[Optional, default = CONTAINS]

	--pattern  string
		搜索值 (除了 REGEX 模式, 其余可使用英文逗号分隔多个值)
		[Optional, default = ]

新增配置信息

NAME
	job-add - 新增 Flink job 配置

SYNOPSYS
	job-add [--type] table-type  [--database] string  [--database-type] source-type  [--schema] string  [--table] string  [--hdfs] string  [--pulsar-address] string

OPTIONS
	--type  table-type
		表类型 (BIG 为大表, SMALL 为小表, ACCT 为 ACCT 表)
		[Mandatory]

	--database  string
		数据源
		[Mandatory]

	--database-type  source-type
		数据源类型
		[Mandatory]

	--schema  string
		Scheme
		[Mandatory]

	--table  string
		表名
		[Mandatory]

	--hdfs  string
		HDFS 路径前缀
		[Mandatory]

	--pulsar-address  string
		Pulsar Address
		[Mandatory]

批量新增配置信息

source指向的文件,按行分隔,每行用空格分隔字段,字段内容按「新增配置信息」的字段填写。

NAME
	job-add-batch - 批量新增 Flink job 配置

SYNOPSYS
	job-add-batch [--source] string

OPTIONS
	--source  string
		配置文件路径
		[Mandatory]

任务操作

针对hudi同步任务的操作。

查看同步任务状态

默认查询全部任务,也可以通过关键词筛选。

NAME
	yarn-all - 查询表同步任务状态

SYNOPSYS
	yarn-all [[--pattern] string]  [--show-flink-job-id]  [--running]  [--un-running]

OPTIONS
	--pattern  string
		搜索值 (包含查询)
		[Optional, default = ]

	--show-flink-job-id	展示结果列表的 Flink job id
		[Optional, default = false]

	--running	展示运行的任务
		[Optional, default = false]

	--un-running	展示非运行的任务
		[Optional, default = false]

启停同步任务

--ignore-check参数在其他脚本,如定时任务,的时候比较方便,可以跳过二次确认。

NAME
	yarn-run - 启动表同步任务

SYNOPSYS
	yarn-run [--flink-job-id] long  [--ignore-check]

OPTIONS
	--flink-job-id  long
		Flink job id
		[Mandatory]

	--ignore-check	Ignore double check
		[Optional, default = false]

NAME
	yarn-run-batch - 批量启动表同步任务

SYNOPSYS
	yarn-run-batch [--flink-job-ids] long[]  [--ignore-check]

OPTIONS
	--flink-job-ids  long[]
		Flink job id
		[Mandatory]

	--ignore-check	Ignore double check
		[Optional, default = false]
NAME
	yarn-run-un-running - 启动所有未运行的表同步任务

SYNOPSYS
	yarn-run-un-running [--ignore-check]

OPTIONS
	--ignore-check	Ignore double check
		[Optional, default = false]

批量启停同步任务

NAME
	yarn-kill - 停止表同步任务

SYNOPSYS
	yarn-kill [--flink-job-id] long  [--ignore-check]

OPTIONS
	--flink-job-id  long
		Flink job id
		[Mandatory]

	--ignore-check	Ignore double check
		[Optional, default = false]

NAME
	yarn-kill-batch - 批量停止表同步任务

SYNOPSYS
	yarn-kill-batch [--flink-job-ids] long[]  [--ignore-check]

OPTIONS
	--flink-job-ids  long[]
		Flink job id
		[Mandatory]

	--ignore-check	Ignore double check
		[Optional, default = false]
NAME
	yarn-kill-running - 停止所有正在运行的表同步任务

SYNOPSYS
	yarn-kill-running [--ignore-check]

OPTIONS
	--ignore-check	Ignore double check
		[Optional, default = false]

Web

概览

概览页面可以看到应用的主要运行情况,从上到下分别有:

  • 表数量
    • 逻辑表(根据上游源表名去重得到)
    • 湖底表根据目标hudi表路径去重得到
    • 嗨福表根据hive表名去重得到
  • Flink运行同步任务数量
    • flink任务数
    • flink任务下对应表总数
  • hudi同步集群情况
  • hudi压缩集群情况
  • 跨天情况
    • 重点表跨天情况
    • 普通表跨天情况
  • 压缩调度定时策略

表任务

可以按表级别查询到相关配置、运行、周边信息,完成常见的表运维跟踪。

Flink任务详情

点击Flink job id项目可以打开Flink任务详情页面。

表任务详情

点击别名项目可以打开表任务详情页面,在这个页面主要可以查看表本身的配置信息,尤其是字段信息。

同步情况

查看Flink同步任务的详情和历史情况方便直接跳转日志查看。

压缩情况

查看Flink同步任务的详情和历史情况方便直接跳转日志查看由于压缩任务运行在多集群历史任务保留多久会受到各个集群任务量、配置的限制如果超出集群保留任务列表的限制比如b1集群任务列表整个集群最后10000条就会在这里查询不到在使用的时候需要注意这一点。

历史压缩

记录在数据库中的历史压缩情况,由各个服务端维护,方便查询历史压缩情况,不受集群保留历史任务限制。

时间线

查询实时hudi表时间线可以更清晰地看到压缩情况和同步情况对于查询压缩任务启停时间点以及压缩任务包含文件数非常有帮助。

Pulsar队列

用于查询Pulsar Topic的相关信息查看Reader积压情况生产者是否在线等。

Hudi表结构

常用于查看实际的表结构是否和配置表相同判断hudi表有没有更新表结构。

压缩队列

查看各个压缩队列的任务详情,方便查找问题任务。

跨天

查看一些简单的跨天信息。

同步集群

常用页面用于监控同步集群中的同步任务的启停状态Yarn页面不能自动刷新状态显示不清晰这个界面基本可以替代Yarn页面页面默认筛选hudi同步任务也可以方便跳转同步任务对应的Yarn页面和日志。

压缩集群

和「同步集群」页面功能一致,这个页面还能聚合各个集群在同一个表格中,方便同时查看多个集群内的压缩任务。

Cloud

查看服务部署实时状态和方便跳转服务日志。

小工具

提供一些实际运维过程中需要的小工具。

查询时间线

根据指定的HDFS路径查询常有一些情况需要查询备份目录下的hudi表情况这些备份表不在配置体系里面可以通过这个工具直接查看指定路径下的hudi表时间线、表结构。

提交压缩任务

手动提交压缩任务,可以指定压缩任务执行的集群。

批量提交压缩任务

批量提交压缩任务一行一个hudi表使用空格分开flink_job_idalias

停止所有压缩任务

停止压缩任务往往遍布各个集群,压缩任务又可能会存在各个压缩队列里,这个工具可以直接清除各个压缩队列里的任务、停止各个集群里正在运行的压缩任务。

离线检索

使用Flink运行一些需要耗时较长的查询任务和hudi同步任务一样提交到Yarn集群上可以做一些牛逼操作查询结果在页面下方的任务列表中查看。

提交Flink任务有一定的延时尤其在调度压缩任务的时候可能会由于资源不足导致提交失败要观察一些不要多次提交避免消耗集群资源。

查询记录

可以查询包含指定字符串通常是主键的记录横跨Pulsar、hudi日志、hudi数据通常用于丢数据时找到数据丢在哪一个环节解析出记录的时间线

有时候Pulsar查询可能会有一点问题导致整个任务失败建议Pulsar查询单独启动一个任务。

检索最后操作时间

查询hudi表最后操作时间逐条检索data文件的记录这个时间应该会比较准确通常用于想知道某个表的准确最新更新时间。

HDFS

由于运维工具的与实时配置库深度绑定有时候针对临时环境、备份hudi表、测试环境等没有运维工具辅助可以直接查看hudi表实际结构来做简单的运维判断这一节内容介绍一些和hudi表有关的结构辅助做一些简单的运维判断。

一个完整的hudi表有以下部分构成

dws_acct_item_gz
 ├── .hoodie
 │   ├── .aux
 │   ├── .schema
 │   ├── .temp
 │   ├── 19700101000000001.deltacommit
 │   ├── 19700101000000001.deltacommit.inflight
 │   ├── 19700101000000001.deltacommit.requested
 │   ├── 20240312151719746.deltacommit
 │   ├── 20240312151719746.deltacommit.inflight
 │   ├── 20240312151719746.deltacommit.requested
 │   ├── 20240312153252615.commit
 │   ├── 20240312153252615.compaction.inflight
 │   ├── 20240312153252615.compaction.requested
 │   ├── archived
 │   └── hoodie.properties
 └── 200
     ├── .00000911-5f8f-4a72-9687-d78a0ec7ec5f_19700101000000001.log.1_71-200-0
     ├── .00000911-5f8f-4a72-9687-d78a0ec7ec5f_20240312153252615.log.1_71-200-0
     └── 00000911-5f8f-4a72-9687-d78a0ec7ec5f_4-10-0_19700101000000001.parquet

其中,.hoodie文件夹通常被称为「时间线」记录了hudi表的各种操作时间点、顺序和操作内容requested结尾的时间线文件为操作请求,表示一个操作的开始,文件内容往往为将要操作的内容;inflight结尾的时间线文件为操作进行中标志,意思是这个操作正在进行中;没有结尾的文件为操作完成标志,表示这个操作已经完成,文件内容往往为一些操作结果,特别的,压缩操作的完成标志,结尾为commit

时间线可以帮助运维人员了解hudi表的操作的详细时间点当压缩或者同步出现问题的时候往往会体现在时间线上时间线的操作类型还有很多可以在源码中了解。

分区文件夹中存放的是hudi表的实际数据其中文件命中带有log的文件为增量文件也称为日志文件存放的是checkpoint之间写入磁盘的增量数据经过压缩操作之后增量数据就会被整合成为parquet结尾的数据文件;类似5f8f-4a72-9687-d78a0ec7ec5f这样的字符串被称为file_id通常一个file_id会对应多个日志文件和一个数据文件类似1970010100000000120240312153252615这样的字符串被称为「时间点」和时间线上的操作对应多个时间点对应的文件会同时存在这是hudi配置里定义的保留文件版本用于hudi错误回退。

案例

丢主键为xxx的某条数据

xxx表的数据没有到最新

开发

主要分为三部分,包含 Hudi 运行代码、运维服务和部署工具。

@startuml
skinparam Dpi 500

title 系统架构图

entity 外部应用 as external_apps

cloud Hadoop集群 as hadoop {
  rectangle Hudi同步任务 as hudi_task
}

rectangle Hudi服务群 as hudi_services

rectangle 汇聚平台 as odcp

entity 业务平台 as datasource

datasource -> odcp:数据采集
odcp -right-> hudi_task:数据加工/推送
hudi_services -u.> hudi_task:Hudi同步任务提交
hudi_task -> external_apps:提供数据
@startuml
'hide circle

title 功能架构图

'rectangle 业务应用 as source {
'  database 业务数据源
'}

'rectangle 汇聚平台 as odcp {
'  rectangle 数据采集 as o1
'  rectangle 数据转换 as o2
'  rectangle 数据加工 as o3
'  rectangle 数据管理 as o4
'  
'  o1 .[hidden] o2
'  o2 .[hidden] o3
'  o3 .[hidden] o4
'}
@enduml
@startuml
'skinparam Linetype ortho
skinparam Dpi 500

title 技术架构图

cloud Yarn集群 as yarn {
  rectangle Hudi同步压缩任务 as sc
  rectangle "..." as other
  
  sc .[hidden] other
}

cloud Yarn集群 as yarn2

database HDFS as hdfs
database HDFS as hdfs2
database Zookeeper as zk

rectangle 汇聚平台 {
 database "MySQL" as db
 queue "Pulsar集群" as pulsar
 
 db -u[hidden]- pulsar
}

rectangle "Hudi服务群" as service {
  rectangle 调度服务 as schedule
  rectangle "查询服务" as query
  rectangle web控制台 as web

  web --> query:查询
  schedule <--> query:调度详情
}

rectangle 外部应用 as app {
  rectangle Flink as flink
  rectangle Spark as spark
  rectangle Hive as hive

  flink .[hidden] spark
  spark .[hidden] hive
}

schedule -u-> yarn:"提交Hudi同步/压缩任务"
pulsar -u-> sc:数据提供
sc -l-> hdfs:数据输出
hdfs <-u- flink:查询
hdfs <-u- spark:查询
hdfs <-u- hive:查询

pulsar -> query:元数据查询

query <- db: 配置信息
query <-- hdfs2: Hudi信息
query <-- zk: 锁信息
query <-- yarn2: 集群信息

@enduml

Hudi 运行代码

sync

sync模块包含hudi运行的全部业务逻辑。

项目采用flink作为hudi的运行平台。flink的运行模式为flink on yarn application这个模式下每次调度服务向yarn提交任务都会在yarn上启动一个单独的flink集群一个flink集群由一个jobmanager和多个taskmanager组成再在这个独立的flink集群中运行sync模块。

@startuml
'skinparam Linetype ortho
'skinparam Dpi 500

title sync模块和相关模块的关系

database HDFS as hdfs {
  component sync模块 as s1
}

rectangle Hudi服务群 as hudi_services {
  package 调度服务 as scheduler {
    component flink运行依赖 as f1
    component hudi运行依赖 as h1
    
    f1 .[hidden] h1
  }
}

cloud yarn集群 as yarn {
  package Hudi任务 as hudi_task {
    component flink运行依赖 as f2
    component hudi运行依赖 as h2
    component sync模块 as s2
    
    f2 .[hidden] h2
    h2 .[hidden] s2
  }
}

f1 .. f2:运行依赖提交到集群
h1 .. h2:运行依赖提交到集群
s1 .. s2:从hdfs将sync模块下载到任务中
@enduml

Synchronizer

数据同步模块,用于同步数据的业务代码,com.lanyuanxiaoyao.service.sync.Synchronizer#main包含所有的业务流程。

@startuml
title 主要逻辑业务流程
start
:接收传入参数;
if (注册zk锁节点) is (成功) then
  :构造flink基本参数;
  :判断同步任务类型构造不同的hudi参数;
  :启动flink任务;
else (失败)
  stop
endif
partition flink任务流程 {
  repeat :接收pulsar消息;
  :转换pulsar json消息为Record对象;
  if (是否为跨天消息) is (是) then
  else (不是)
  :记录操作时间;
  endif
  repeat while (消息是否合法) is (否) not (是)
  :构造Hudi Schema;
  :Record对象转换为flink row;
  
}
stop
@enduml

Compaction

数据压缩模块,用于压缩数据的业务代码,com.lanyuanxiaoyao.service.sync.Compaction#main包含所有的业务流程。

运维服务

运维服务包含运行时维护一些查询信息相关的服务

部署工具

Description
一个完整的基于Hudi的同步应用
Readme 32 MiB
Languages
CSS 55.2%
JavaScript 41.3%
Java 3.5%