From bd0a56217d37743d29bd6ffd09909b6f42395ef9 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Fri, 7 Jul 2023 12:33:14 +0800 Subject: [PATCH] =?UTF-8?q?feature(hudi-query):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=20Hudi=20=E8=A1=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/forest/service/HudiService.java | 7 +++ .../hudi/controller/TableController.java | 43 ++++++++++++++ .../service/hudi/service/TableService.java | 57 +++++++++++++++++++ .../web/controller/HudiController.java | 15 +++++ web/components/common.js | 29 ++++++++++ web/components/tool-tab.js | 47 ++++++++++++++- web/index.html | 2 +- 7 files changed, 198 insertions(+), 2 deletions(-) create mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TableController.java create mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TableService.java diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java index 9d61f32..a631f7c 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java @@ -10,6 +10,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiRollbackPlan; import java.util.Map; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; /** * Hudi 操作 @@ -45,4 +46,10 @@ public interface HudiService { @Get("/timeline/list_pending_compaction") ImmutableList timelinePendingCompactionList(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/table/schema") + ImmutableMap schema(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/table/schema_hdfs") + ImmutableMap schema(@Query("hdfs") String hdfs); } diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TableController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TableController.java new file mode 100644 index 0000000..2067114 --- /dev/null +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TableController.java @@ -0,0 +1,43 @@ +package com.lanyuanxiaoyao.service.hudi.controller; + +import com.lanyuanxiaoyao.service.hudi.service.TableService; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Hudi 表 + * + * @author lanyuanxiaoyao + * @date 2023-07-07 + */ +@RestController +@RequestMapping("table") +public class TableController { + private static final Logger logger = LoggerFactory.getLogger(TableController.class); + + private final TableService tableService; + + public TableController(TableService tableService) { + this.tableService = tableService; + } + + @GetMapping("schema") + public ImmutableMap schema( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias + ) throws Exception { + return tableService.schema(flinkJobId, alias); + } + + @GetMapping("schema_hdfs") + public ImmutableMap schema( + @RequestParam("hdfs") String hdfs + ) throws Exception { + return tableService.schema(hdfs); + } +} diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TableService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TableService.java new file mode 100644 index 0000000..5bb1eb0 --- /dev/null +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TableService.java @@ -0,0 +1,57 @@ +package com.lanyuanxiaoyao.service.hudi.service; + +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.org.apache.avro.Schema; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; + +/** + * Hudi 表服务 + * + * @author lanyuanxiaoyao + * @date 2023-07-07 + */ +@Service +public class TableService { + private static final Logger logger = LoggerFactory.getLogger(TableService.class); + + private final ObjectMapper mapper; + private final InfoService infoService; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public TableService(Jackson2ObjectMapperBuilder builder, InfoService infoService) { + this.mapper = builder.build(); + this.infoService = infoService; + } + + @Cacheable(value = "table-schema", sync = true) + @Retryable(Throwable.class) + public ImmutableMap schema(Long flinkJobId, String alias) throws Exception { + TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); + return schema(meta.getHudi().getTargetHdfsPath()); + } + + @Cacheable(value = "table-schema", sync = true) + @Retryable(Throwable.class) + public ImmutableMap schema(String hdfs) throws Exception { + HoodieTableMetaClient client = HoodieTableMetaClient.builder() + .setConf(new Configuration()) + .setBasePath(hdfs) + .build(); + TableSchemaResolver schemaUtil = new TableSchemaResolver(client); + Schema schema = schemaUtil.getTableAvroSchema(true); + return mapper.readValue(schema.toString(), new TypeReference>() { + }); + } +} diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java index 3b30689..aa2967c 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java @@ -9,6 +9,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiRollbackPlan; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.web.controller.base.AmisCrudResponse; +import com.lanyuanxiaoyao.service.web.controller.base.AmisDetailResponse; import com.lanyuanxiaoyao.service.web.controller.base.AmisResponse; import com.lanyuanxiaoyao.service.web.controller.base.BaseController; import java.util.List; @@ -148,4 +149,18 @@ public class HudiController extends BaseController { } throw new Exception("Flink job id and alias or hdfs cannot be blank"); } + + @GetMapping("schema") + public AmisDetailResponse readCleanerPlan( + @RequestParam(value = "flink_job_id", required = false) Long flinkJobId, + @RequestParam(value = "alias", required = false) String alias, + @RequestParam(value = "hdfs", required = false) String hdfs + ) throws Exception { + if (StrUtil.isNotBlank(hdfs)) { + return AmisResponse.responseDetailData(hudiService.schema(hdfs)); + } else if (ObjectUtil.isNotNull(flinkJobId) && StrUtil.isNotBlank(alias)) { + return AmisResponse.responseDetailData(hudiService.schema(flinkJobId, alias)); + } + throw new Exception("Flink job id and alias or hdfs cannot be blank"); + } } diff --git a/web/components/common.js b/web/components/common.js index 0890a2a..e72addb 100644 --- a/web/components/common.js +++ b/web/components/common.js @@ -1556,6 +1556,35 @@ function tableMetaDialog() { } } }, + { + type: 'button', + label: 'Hudi 表结构', + icon: 'fa fa-table', + actionType: 'dialog', + dialog: { + title: 'Hudi 表结构', + actions: [], + body: { + type: 'service', + api: { + method: 'get', + url: '${base}/hudi/schema', + data: { + flink_job_id: '${flinkJobId|default:undefined}', + alias: '${tableMeta.alias|default:undefined}', + }, + }, + body: { + type: 'page', + body: { + type: 'json', + source: '${detail}', + levelExpand: 3, + } + } + } + } + }, ] }, ] diff --git a/web/components/tool-tab.js b/web/components/tool-tab.js index f9900bb..1c1a310 100644 --- a/web/components/tool-tab.js +++ b/web/components/tool-tab.js @@ -48,9 +48,54 @@ function toolTab() { { type: 'input-text', name: 'hdfs', - label: '表HDFS路经', + label: 'HDFS路经', required: true, clearable: true, + description: '输入表HDFS路径', + } + ] + }, + { + type: 'form', + title: '查询表结构', + actions: [ + { + type: 'submit', + label: '查询', + actionType: 'dialog', + dialog: { + title: 'Hudi 表结构', + actions: [], + size: 'lg', + body: { + type: 'service', + api: { + method: 'get', + url: '${base}/hudi/schema', + data: { + hdfs: '${hdfs|default:undefined}', + }, + }, + body: { + type: 'page', + body: { + type: 'json', + source: '${detail}', + levelExpand: 3, + } + } + } + } + } + ], + body: [ + { + type: 'input-text', + name: 'hdfs', + label: 'HDFS路经', + required: true, + clearable: true, + description: '输入表HDFS路径', } ] }, diff --git a/web/index.html b/web/index.html index 99fecb8..3f18f5f 100644 --- a/web/index.html +++ b/web/index.html @@ -91,8 +91,8 @@ enableAMISDebug: debug, }, ); + console.log('Source', amisJSON) if (debug) { - console.log('Source', amisJSON) } })()