From 8ba4296670ecc3c66312a36279570adbdb12a487 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 24 Jan 2024 18:23:19 +0800 Subject: [PATCH] =?UTF-8?q?feat(hudi-query):=20=E5=A2=9E=E5=8A=A0=E5=88=A4?= =?UTF-8?q?=E6=96=ADhudi=E8=A1=A8=E5=92=8Chdfs=E8=B7=AF=E5=BE=84=E6=98=AF?= =?UTF-8?q?=E5=90=A6=E5=AD=98=E5=9C=A8=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/forest/service/HudiService.java | 9 +++ .../hudi/controller/HdfsController.java | 45 +++++++++++++ .../service/hudi/service/HdfsService.java | 67 +++++++++++++++++++ 3 files changed, 121 insertions(+) create mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/HdfsController.java create mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/HdfsService.java diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java index 1a0d8ec..c8d8240 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java @@ -55,4 +55,13 @@ public interface HudiService { @Get("/table/schema_hdfs") ImmutableMap schema(@Query("hdfs") String hdfs); + + @Get("/hdfs/exists_hudi_table_by_flink_job_id_alias") + Boolean existsHudiTable(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/hdfs/exists_hudi_table_by_hdfs") + Boolean existsHudiTable(@Query("hdfs") String hdfs); + + @Get("/hdfs/exists_path") + Boolean existsPath(@Query("hdfs") String hdfs); } diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/HdfsController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/HdfsController.java new file mode 100644 index 0000000..402f4e6 --- /dev/null +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/HdfsController.java @@ -0,0 +1,45 @@ +package com.lanyuanxiaoyao.service.hudi.controller; + +import com.lanyuanxiaoyao.service.hudi.service.HdfsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * hudi目标库的hdfs操作 + * + * @author lanyuanxiaoyao + * @date 2024-01-24 + */ +@RestController +@RequestMapping("hdfs") +public class HdfsController { + private static final Logger logger = LoggerFactory.getLogger(HdfsController.class); + + private final HdfsService hdfsService; + + public HdfsController(HdfsService hdfsService) { + this.hdfsService = hdfsService; + } + + @GetMapping("exists_hudi_table_by_flink_job_id_alias") + public Boolean existsHudiTable( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias + ) { + return hdfsService.existsHudiTable(flinkJobId, alias); + } + + @GetMapping("exists_hudi_table_by_hdfs") + public Boolean existsHudiTable(@RequestParam("hdfs") String hdfs) { + return hdfsService.existsHudiTable(hdfs); + } + + @GetMapping("exists_path") + public Boolean existsPath(@RequestParam("hdfs") String hdfs) { + return hdfsService.existsPath(hdfs); + } +} diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/HdfsService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/HdfsService.java new file mode 100644 index 0000000..9d358b7 --- /dev/null +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/HdfsService.java @@ -0,0 +1,67 @@ +package com.lanyuanxiaoyao.service.hudi.service; + +import cn.hutool.core.util.ObjectUtil; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.stereotype.Service; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-24 + */ +@Service +public class HdfsService { + private static final Logger logger = LoggerFactory.getLogger(HdfsService.class); + + private final InfoService infoService; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public HdfsService(InfoService infoService) { + this.infoService = infoService; + } + + @Cacheable(value = "exists-hudi-table", sync = true, key = "#flinkJobId.toString()+#alias") + public Boolean existsHudiTable(Long flinkJobId, String alias) { + try { + TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); + if (ObjectUtil.isNotNull(meta)) { + return existsHudiTable(meta.getHudi().getTargetHdfsPath()); + } + } catch (Throwable ignored) { + } + return false; + } + + @Cacheable(value = "exists-hudi-table", sync = true) + public Boolean existsHudiTable(String hdfs) { + if (!existsPath(hdfs)) { + return false; + } + try { + HoodieTableMetaClient.builder() + .setConf(new Configuration()) + .setBasePath(hdfs) + .build(); + return true; + } catch (Throwable ignored) { + } + return false; + } + + @Cacheable(value = "exists-path", sync = true) + public Boolean existsPath(String hdfs) { + try(FileSystem fileSystem = FileSystem.get(new Configuration())) { + return fileSystem.exists(new Path(hdfs)); + } catch (IOException ignored) { + } + return false; + } +}