feature(hudi-query): 增加查询 Hudi 表结构
This commit is contained in:
@@ -0,0 +1,43 @@
|
||||
package com.lanyuanxiaoyao.service.hudi.controller;
|
||||
|
||||
import com.lanyuanxiaoyao.service.hudi.service.TableService;
|
||||
import org.eclipse.collections.api.map.ImmutableMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* Hudi 表
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-07-07
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("table")
|
||||
public class TableController {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TableController.class);
|
||||
|
||||
private final TableService tableService;
|
||||
|
||||
public TableController(TableService tableService) {
|
||||
this.tableService = tableService;
|
||||
}
|
||||
|
||||
@GetMapping("schema")
|
||||
public ImmutableMap<String, Object> schema(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias
|
||||
) throws Exception {
|
||||
return tableService.schema(flinkJobId, alias);
|
||||
}
|
||||
|
||||
@GetMapping("schema_hdfs")
|
||||
public ImmutableMap<String, Object> schema(
|
||||
@RequestParam("hdfs") String hdfs
|
||||
) throws Exception {
|
||||
return tableService.schema(hdfs);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.lanyuanxiaoyao.service.hudi.service;
|
||||
|
||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||
import org.apache.hudi.org.apache.avro.Schema;
|
||||
import org.eclipse.collections.api.map.ImmutableMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
|
||||
import org.springframework.retry.annotation.Retryable;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* Hudi 表服务
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-07-07
|
||||
*/
|
||||
@Service
|
||||
public class TableService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TableService.class);
|
||||
|
||||
private final ObjectMapper mapper;
|
||||
private final InfoService infoService;
|
||||
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
public TableService(Jackson2ObjectMapperBuilder builder, InfoService infoService) {
|
||||
this.mapper = builder.build();
|
||||
this.infoService = infoService;
|
||||
}
|
||||
|
||||
@Cacheable(value = "table-schema", sync = true)
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableMap<String, Object> schema(Long flinkJobId, String alias) throws Exception {
|
||||
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
|
||||
return schema(meta.getHudi().getTargetHdfsPath());
|
||||
}
|
||||
|
||||
@Cacheable(value = "table-schema", sync = true)
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableMap<String, Object> schema(String hdfs) throws Exception {
|
||||
HoodieTableMetaClient client = HoodieTableMetaClient.builder()
|
||||
.setConf(new Configuration())
|
||||
.setBasePath(hdfs)
|
||||
.build();
|
||||
TableSchemaResolver schemaUtil = new TableSchemaResolver(client);
|
||||
Schema schema = schemaUtil.getTableAvroSchema(true);
|
||||
return mapper.readValue(schema.toString(), new TypeReference<ImmutableMap<String, Object>>() {
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user