feat(hudi-query): 增加判断hudi表和hdfs路径是否存在的方法
This commit is contained in:
@@ -55,4 +55,13 @@ public interface HudiService {
|
||||
|
||||
@Get("/table/schema_hdfs")
|
||||
ImmutableMap<String, Object> schema(@Query("hdfs") String hdfs);
|
||||
|
||||
@Get("/hdfs/exists_hudi_table_by_flink_job_id_alias")
|
||||
Boolean existsHudiTable(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
|
||||
|
||||
@Get("/hdfs/exists_hudi_table_by_hdfs")
|
||||
Boolean existsHudiTable(@Query("hdfs") String hdfs);
|
||||
|
||||
@Get("/hdfs/exists_path")
|
||||
Boolean existsPath(@Query("hdfs") String hdfs);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.lanyuanxiaoyao.service.hudi.controller;
|
||||
|
||||
import com.lanyuanxiaoyao.service.hudi.service.HdfsService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* hudi目标库的hdfs操作
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-24
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("hdfs")
|
||||
public class HdfsController {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HdfsController.class);
|
||||
|
||||
private final HdfsService hdfsService;
|
||||
|
||||
public HdfsController(HdfsService hdfsService) {
|
||||
this.hdfsService = hdfsService;
|
||||
}
|
||||
|
||||
@GetMapping("exists_hudi_table_by_flink_job_id_alias")
|
||||
public Boolean existsHudiTable(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias
|
||||
) {
|
||||
return hdfsService.existsHudiTable(flinkJobId, alias);
|
||||
}
|
||||
|
||||
@GetMapping("exists_hudi_table_by_hdfs")
|
||||
public Boolean existsHudiTable(@RequestParam("hdfs") String hdfs) {
|
||||
return hdfsService.existsHudiTable(hdfs);
|
||||
}
|
||||
|
||||
@GetMapping("exists_path")
|
||||
public Boolean existsPath(@RequestParam("hdfs") String hdfs) {
|
||||
return hdfsService.existsPath(hdfs);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package com.lanyuanxiaoyao.service.hudi.service;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-24
|
||||
*/
|
||||
@Service
|
||||
public class HdfsService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HdfsService.class);
|
||||
|
||||
private final InfoService infoService;
|
||||
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
public HdfsService(InfoService infoService) {
|
||||
this.infoService = infoService;
|
||||
}
|
||||
|
||||
@Cacheable(value = "exists-hudi-table", sync = true, key = "#flinkJobId.toString()+#alias")
|
||||
public Boolean existsHudiTable(Long flinkJobId, String alias) {
|
||||
try {
|
||||
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
|
||||
if (ObjectUtil.isNotNull(meta)) {
|
||||
return existsHudiTable(meta.getHudi().getTargetHdfsPath());
|
||||
}
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Cacheable(value = "exists-hudi-table", sync = true)
|
||||
public Boolean existsHudiTable(String hdfs) {
|
||||
if (!existsPath(hdfs)) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
HoodieTableMetaClient.builder()
|
||||
.setConf(new Configuration())
|
||||
.setBasePath(hdfs)
|
||||
.build();
|
||||
return true;
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Cacheable(value = "exists-path", sync = true)
|
||||
public Boolean existsPath(String hdfs) {
|
||||
try(FileSystem fileSystem = FileSystem.get(new Configuration())) {
|
||||
return fileSystem.exists(new Path(hdfs));
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user