feature(hudi-query): 完成 Hudi 表时间线查询
对于归档中的时间线查询似乎不会直接给出合并后的时间线状态,需要后续对时间线的内容进行单独处理
This commit is contained in:
@@ -3,6 +3,8 @@ package com.lanyuanxiaoyao.service.hudi.controller;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
|
||||
import com.lanyuanxiaoyao.service.hudi.service.TimelineService;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -29,12 +31,34 @@ public class TimelineController {
|
||||
}
|
||||
|
||||
@GetMapping("list")
|
||||
public ImmutableList<HudiInstant> allInstants(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) throws IOException {
|
||||
return timelineService.timeline(flinkJobId, alias);
|
||||
public ImmutableList<HudiInstant> allInstants(
|
||||
@RequestParam("flink_job_id") Long flinkJobId,
|
||||
@RequestParam("alias") String alias,
|
||||
@RequestParam(value = "filter_type", required = false) List<String> filterType,
|
||||
@RequestParam(value = "filter_action", required = false) List<String> filterAction,
|
||||
@RequestParam(value = "filter_state", required = false) List<String> filterState
|
||||
) throws IOException {
|
||||
return timelineService.timeline(
|
||||
flinkJobId,
|
||||
alias,
|
||||
Lists.immutable.ofAll(filterType),
|
||||
Lists.immutable.ofAll(filterAction),
|
||||
Lists.immutable.ofAll(filterState)
|
||||
);
|
||||
}
|
||||
|
||||
@GetMapping("list_hdfs")
|
||||
public ImmutableList<HudiInstant> allInstants(@RequestParam("hdfs") String hdfs) throws IOException {
|
||||
return timelineService.timeline(hdfs);
|
||||
public ImmutableList<HudiInstant> allInstants(
|
||||
@RequestParam("hdfs") String hdfs,
|
||||
@RequestParam(value = "filter_type", required = false) List<String> filterType,
|
||||
@RequestParam(value = "filter_action", required = false) List<String> filterAction,
|
||||
@RequestParam(value = "filter_state", required = false) List<String> filterState
|
||||
) throws IOException {
|
||||
return timelineService.timeline(
|
||||
hdfs,
|
||||
Lists.immutable.ofAll(filterType),
|
||||
Lists.immutable.ofAll(filterAction),
|
||||
Lists.immutable.ofAll(filterState)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.lanyuanxiaoyao.service.hudi.service;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
@@ -8,7 +9,9 @@ import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.eclipse.collections.api.list.MutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
@@ -34,23 +37,40 @@ public class TimelineService {
|
||||
|
||||
@Cacheable(value = "timeline", sync = true, key = "#flinkJobId.toString()+#alias")
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableList<HudiInstant> timeline(Long flinkJobId, String alias) throws IOException {
|
||||
public ImmutableList<HudiInstant> timeline(Long flinkJobId, String alias, ImmutableList<String> filterType, ImmutableList<String> filterAction, ImmutableList<String> filterState) throws IOException {
|
||||
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
|
||||
return timeline(meta.getHudi().getTargetHdfsPath());
|
||||
return timeline(meta.getHudi().getTargetHdfsPath(), filterType, filterAction, filterState);
|
||||
}
|
||||
|
||||
private static final String INSTANT_TYPE_ACTIVE = "active";
|
||||
private static final String INSTANT_TYPE_ARCHIVE = "archive";
|
||||
|
||||
@Cacheable(value = "timeline", sync = true, key = "#hdfs")
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableList<HudiInstant> timeline(String hdfs) throws IOException {
|
||||
public ImmutableList<HudiInstant> timeline(String hdfs, ImmutableList<String> filterType, ImmutableList<String> filterAction, ImmutableList<String> filterState) throws IOException {
|
||||
HoodieTableMetaClient client = HoodieTableMetaClient.builder()
|
||||
.setConf(new Configuration())
|
||||
.setBasePath(hdfs)
|
||||
.build();
|
||||
ImmutableList<HudiInstant> activeInstants = HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline)
|
||||
.collect(instant -> covert("active", instant));
|
||||
ImmutableList<HudiInstant> archiveInstants = HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getArchivedTimeline)
|
||||
.collect(instant -> covert("archive", instant));
|
||||
return activeInstants.newWithAll(archiveInstants)
|
||||
MutableList<HudiInstant> instants = Lists.mutable.empty();
|
||||
if (ObjectUtil.isEmpty(filterType)) {
|
||||
filterType = Lists.immutable.of(INSTANT_TYPE_ARCHIVE, INSTANT_TYPE_ACTIVE);
|
||||
}
|
||||
if (filterType.contains(INSTANT_TYPE_ARCHIVE)) {
|
||||
HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getArchivedTimeline)
|
||||
.collect(instant -> covert(INSTANT_TYPE_ARCHIVE, instant))
|
||||
.select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction()))
|
||||
.select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState()))
|
||||
.forEach(instants::add);
|
||||
}
|
||||
if (filterType.contains(INSTANT_TYPE_ACTIVE)) {
|
||||
HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline)
|
||||
.collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant))
|
||||
.select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction()))
|
||||
.select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState()))
|
||||
.forEach(instants::add);
|
||||
}
|
||||
return instants
|
||||
.toSortedList(HudiInstant::compareTo)
|
||||
.toImmutable();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user