From 317cb5d56fbb4a55e1e3cc9be589fe0f09a37cfe Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 5 Jul 2023 19:07:48 +0800 Subject: [PATCH] =?UTF-8?q?feature(hudi-query):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E7=BA=BF=E5=8E=8B=E7=BC=A9=E8=AE=A1=E5=88=92?= =?UTF-8?q?=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/forest/service/HudiService.java | 9 +- .../hudi/controller/CompactionController.java | 38 ----- .../hudi/controller/TimelineController.java | 18 +++ .../hudi/service/CompactionService.java | 68 --------- .../service/hudi/service/TimelineService.java | 36 +++++ .../web/controller/HudiController.java | 17 ++- web/components/common.js | 138 ++++++++++++++++++ 7 files changed, 213 insertions(+), 111 deletions(-) delete mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/CompactionController.java delete mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/CompactionService.java diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java index 503feb5..9091cb3 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java @@ -23,9 +23,12 @@ public interface HudiService { @Get("/timeline/list_hdfs") PageResponse timelineHdfsList(@Query Map queryMap); + @Get("/timeline/read_compaction_plan") + HudiCompactionPlan readCompactionPlan(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("instant") String instant); + + @Get("/timeline/read_compaction_plan_hdfs") + HudiCompactionPlan readCompactionPlanHdfs(@Query("hdfs") String hdfs, @Query("instant") String instant); + @Get("/timeline/list_pending_compaction") ImmutableList timelinePendingCompactionList(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); - - @Get("/compaction/plan") - HudiCompactionPlan compactionPlan(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("timestamp") String timestamp); } diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/CompactionController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/CompactionController.java deleted file mode 100644 index e6a8b88..0000000 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/CompactionController.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.lanyuanxiaoyao.service.hudi.controller; - -import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan; -import com.lanyuanxiaoyao.service.hudi.service.CompactionService; -import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -/** - * 压缩操作 - * - * @author lanyuanxiaoyao - * @date 2023-05-11 - */ -@RestController -@RequestMapping("compaction") -public class CompactionController { - private static final Logger logger = LoggerFactory.getLogger(CompactionController.class); - - private final CompactionService compactionService; - - public CompactionController(CompactionService compactionService) { - this.compactionService = compactionService; - } - - @GetMapping("plan") - public HudiCompactionPlan compactionPlan( - @RequestParam("flink_job_id") Long flinkJobId, - @RequestParam("alias") String alias, - @RequestParam("timestamp") String timestamp - ) throws IOException { - return compactionService.getCompactionPlan(flinkJobId, alias, timestamp); - } -} diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java index 2e2dcf5..41062d8 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java @@ -1,6 +1,7 @@ package com.lanyuanxiaoyao.service.hudi.controller; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.hudi.service.TimelineService; import java.io.IOException; @@ -79,6 +80,23 @@ public class TimelineController { ); } + @GetMapping("read_compaction_plan") + public HudiCompactionPlan readCompactionPlan( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam("instant") String instant + ) throws IOException { + return timelineService.readCompactionPlan(flinkJobId, alias, instant); + } + + @GetMapping("read_compaction_plan_hdfs") + public HudiCompactionPlan readCompactionPlan( + @RequestParam("hdfs") String hdfs, + @RequestParam("instant") String instant + ) throws IOException { + return timelineService.readCompactionPlan(hdfs, instant); + } + @GetMapping("list_pending_compaction") public ImmutableList pendingCompactionInstants( @RequestParam("flink_job_id") Long flinkJobId, diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/CompactionService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/CompactionService.java deleted file mode 100644 index 6639873..0000000 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/CompactionService.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.lanyuanxiaoyao.service.hudi.service; - -import cn.hutool.core.util.ObjectUtil; -import com.eshore.odcp.hudi.connector.entity.TableMeta; -import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan; -import com.lanyuanxiaoyao.service.forest.service.InfoService; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.CompactionUtils; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.factory.Maps; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.cache.annotation.Cacheable; -import org.springframework.retry.annotation.Retryable; -import org.springframework.stereotype.Service; - -/** - * 压缩相关操作 - * - * @author lanyuanxiaoyao - * @date 2023-05-11 - */ -@Service -public class CompactionService { - private static final Logger logger = LoggerFactory.getLogger(CompactionService.class); - - private final InfoService infoService; - - @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public CompactionService(InfoService infoService) { - this.infoService = infoService; - } - - @Cacheable(value = "compaction_plan", sync = true, key = "#flinkJobId.toString()+#alias+#timestamp") - @Retryable(Throwable.class) - public HudiCompactionPlan getCompactionPlan(Long flinkJobId, String alias, String timestamp) throws IOException { - TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); - HoodieTableMetaClient client = HoodieTableMetaClient.builder() - .setConf(new Configuration()) - .setBasePath(meta.getHudi().getTargetHdfsPath()) - .build(); - try { - HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(client, timestamp); - return new HudiCompactionPlan( - ObjectUtil.isNotNull(plan.getOperations()) - ? Lists.immutable.ofAll(plan.getOperations()) - .collect(o -> new HudiCompactionPlan.Operation( - o.getBaseInstantTime(), - Lists.immutable.ofAll(o.getDeltaFilePaths()), - o.getDataFilePath(), - o.getFileId(), - o.getPartitionPath(), - Maps.immutable.ofAll(o.getMetrics()), - o.getBootstrapFilePath() - )) - : Lists.immutable.empty(), - ObjectUtil.isNotNull(plan.getExtraMetadata()) ? Maps.immutable.ofAll(plan.getExtraMetadata()) : Maps.immutable.empty(), - plan.getVersion() - ); - } catch (IOException e) { - logger.error("Read compaction plan failure", e); - throw e; - } - } -} diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java index ad31a30..e9f292f 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java @@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil; import com.lanyuanxiaoyao.service.forest.service.InfoService; @@ -15,9 +16,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CompactionUtils; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.list.ImmutableList; @@ -139,6 +142,39 @@ public class TimelineService { return new PageResponse<>(result.toList(), hudiInstants.size()); } + @Cacheable(value = "read_compaction_plan", sync = true) + @Retryable(Throwable.class) + public HudiCompactionPlan readCompactionPlan(Long flinkJobId, String alias, String instant) throws IOException { + TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); + return readCompactionPlan(meta.getHudi().getTargetHdfsPath(), instant); + } + + @Cacheable(value = "read_compaction_plan", sync = true) + @Retryable(Throwable.class) + public HudiCompactionPlan readCompactionPlan(String hdfs, String instant) throws IOException { + HoodieTableMetaClient client = HoodieTableMetaClient.builder() + .setConf(new Configuration()) + .setBasePath(hdfs) + .build(); + HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(client, instant); + return new HudiCompactionPlan( + ObjectUtil.isNotNull(plan.getOperations()) + ? Lists.immutable.ofAll(plan.getOperations()) + .collect(o -> new HudiCompactionPlan.Operation( + o.getBaseInstantTime(), + Lists.immutable.ofAll(o.getDeltaFilePaths()), + o.getDataFilePath(), + o.getFileId(), + o.getPartitionPath(), + Maps.immutable.ofAll(o.getMetrics()), + o.getBootstrapFilePath() + )) + : Lists.immutable.empty(), + ObjectUtil.isNotNull(plan.getExtraMetadata()) ? Maps.immutable.ofAll(plan.getExtraMetadata()) : Maps.immutable.empty(), + plan.getVersion() + ); + } + @Cacheable(value = "pending_compaction_timeline", sync = true, key = "#flinkJobId.toString()+#alias") @Retryable(Throwable.class) public ImmutableList pendingCompactionTimeline(Long flinkJobId, String alias) throws IOException { diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java index 42abf18..48e7f16 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/HudiController.java @@ -6,11 +6,9 @@ import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.forest.service.HudiService; -import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil; import java.util.List; import java.util.function.Function; import org.eclipse.collections.api.factory.Maps; -import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.map.ImmutableMap; import org.eclipse.collections.api.map.MutableMap; import org.slf4j.Logger; @@ -102,4 +100,19 @@ public class HudiController extends BaseController { PageResponse response = hudiService.timelineHdfsList(queryMap); return responseCrudData(response.getData(), response.getTotal()); } + + @GetMapping("read_compaction_plan") + public AmisResponse readCompactionPlan( + @RequestParam(value = "flink_job_id", required = false) Long flinkJobId, + @RequestParam(value = "alias", required = false) String alias, + @RequestParam(value = "hdfs", required = false) String hdfs, + @RequestParam("instant") String instant + ) throws Exception { + if (StrUtil.isNotBlank(hdfs)) { + return responseDetail(hudiService.readCompactionPlanHdfs(hdfs, instant)); + } else if (ObjectUtil.isNotNull(flinkJobId) && StrUtil.isNotBlank(alias)) { + return responseDetail(hudiService.readCompactionPlan(flinkJobId, alias, instant)); + } + throw new Exception("Flink job id and alias or hdfs cannot be blank"); + } } diff --git a/web/components/common.js b/web/components/common.js index 03554ec..ffccb84 100644 --- a/web/components/common.js +++ b/web/components/common.js @@ -8,6 +8,16 @@ function crudCommonOptions() { } } +function readOnlyDialogOptions() { + return { + actions: [], + showCloseButton: false, + closeOnEsc: true, + closeOnOutside: true, + disabled: true, + } +} + function paginationCommonOptions(perPage = true, maxButtons = 5) { let option = { type: 'pagination', @@ -774,6 +784,134 @@ function timelineColumns() { { name: 'fileName', label: '文件名', + type: 'wrapper', + size: 'none', + body: [ + { + type: 'tpl', + tpl: '${fileName}' + }, + { + visibleOn: "action === 'compaction'", + type: 'action', + icon: 'fa fa-eye', + level: 'link', + tooltip: '查看压缩计划', + size: 'sm', + actionType: 'dialog', + dialog: { + title: '压缩计划详情', + actions: [], + size: 'lg', + body: { + type: 'crud', + api: { + method: 'get', + url: '${base}/hudi/read_compaction_plan', + data: { + hdfs: '${hdfs|default:undefined}', + flink_job_id: '${flinkJobId|default:undefined}', + alias: '${tableMeta.alias|default:undefined}', + instant: '${timestamp|default:undefined}', + }, + adaptor: (payload, response) => { + return { + items: (payload['data']['detail']['operations'] ? payload['data']['detail']['operations'] : []) + .map(operation => { + if (operation['deltaFilePaths']) { + operation.deltaFilePaths = operation.deltaFilePaths + .map(p => { + return { + path: p + } + }) + } + return operation + }) + } + } + }, + ...crudCommonOptions(), + loadDataOnce: true, + columns: [ + { + name: 'fileId', + label: '文件 ID', + searchable: true, + }, + { + name: 'baseInstantTime', + label: '版本时间点', + width: 120, + align: 'center', + }, + { + name: 'partitionPath', + label: '分区', + width: 50, + align: 'center', + }, + { + label: '读/写(MB)/数', + type: 'tpl', + tpl: '${metrics[\'TOTAL_IO_READ_MB\']} / ${metrics[\'TOTAL_IO_WRITE_MB\']} / ${metrics[\'TOTAL_LOG_FILES\']}', + align: 'center', + width: 90, + }, + { + type: 'operation', + label: '操作', + width: 150, + buttons: [ + { + label: '数据文件名', + type: 'action', + level: 'link', + actionType: 'copy', + content: '${dataFilePath}', + tooltip: '复制 ${dataFilePath}', + }, + { + label: '日志文件', + type: 'action', + level: 'link', + actionType: 'dialog', + dialog: { + title: '操作日志文件列表', + size: 'md', + ...readOnlyDialogOptions(), + body: { + type: 'crud', + source: '${deltaFilePaths}', + mode: 'list', + ...crudCommonOptions(), + loadDataOnce: true, + title: null, + listItem: { + title: '${path}' + } + } + } + } + ] + } + ] + } + } + }, + /*{ + type: 'tpl', + tpl: '${hdfs}' + }, + { + type: 'tpl', + tpl: '${flinkJobId}' + }, + { + type: 'tpl', + tpl: '${tableMeta.alias}' + },*/ + ] }, { name: 'type',