feature(hudi-query): Hudi 时间线查询增加 instant 文件创建时间

Hudi 时间线查询增加分页参数下推，提升查询性能
This commit is contained in:
2023-06-28 16:53:03 +08:00
parent 1ed1b8105a
commit 1ce0b74e2d
14 changed files with 154 additions and 45 deletions

View File

@@ -12,21 +12,21 @@ import java.util.Map;
* @date 2023-04-26
*/
public class PageResponse<T> {
private List<T> data = new ArrayList<>(0);
private Iterable<T> data = new ArrayList<>(0);
private Long total = 0L;
private Map<String, Object> metadata = new HashMap<>();
public PageResponse() {
}
/**
 * Creates a response that carries the given data together with a total.
 * <p>
 * NOTE(review): this span comes from a change listing and shows two variants side by
 * side. The {@code List}-based signature with {@code (long) data.size()} looks like the
 * previous version; the {@code Iterable}-based signature that stores the supplied
 * {@code total} looks like its replacement. Confirm against the real file.
 *
 * @param data  the elements exposed through {@code getData()}
 * @param total the value stored in the {@code total} field
 */
public PageResponse(List<T> data) {
public PageResponse(Iterable<T> data, Long total) {
this.data = data;
this.total = (long) data.size();
this.total = total;
}
/**
 * Creates a response from the given data and a total.
 * <p>
 * NOTE(review): this span comes from a change listing and shows two variants side by
 * side. The {@code (List, Long)} form delegating to {@code this(data)} looks like the
 * previous version; the {@code (Iterable, Integer)} form looks like its replacement.
 * Confirm against the real file.
 * <p>
 * The {@code Integer} variant converts the total with {@code longValue()}, so passing
 * {@code null} as {@code total} throws a {@link NullPointerException}.
 *
 * @param data  the elements exposed through {@code getData()}
 * @param total the total, stored as a {@code Long}
 */
public PageResponse(List<T> data, Long total) {
this(data);
this.total = total;
public PageResponse(Iterable<T> data, Integer total) {
this.data = data;
this.total = total.longValue();
}
public PageResponse(List<T> data, Long total, Map<String, Object> metadata) {
@@ -34,11 +34,11 @@ public class PageResponse<T> {
this.metadata = metadata;
}
/**
 * Returns the data held by this response.
 * <p>
 * NOTE(review): both a {@code List} and an {@code Iterable} return type are shown here
 * because the span comes from a change listing; the {@code Iterable} form appears to be
 * the current one. Confirm against the real file.
 *
 * @return the {@code data} field, as stored
 */
public List<T> getData() {
public Iterable<T> getData() {
return data;
}
/**
 * Replaces the data held by this response. The {@code total} field is not touched.
 * <p>
 * NOTE(review): both a {@code List} and an {@code Iterable} parameter type are shown here
 * because the span comes from a change listing; the {@code Iterable} form appears to be
 * the current one. Confirm against the real file.
 *
 * @param data the new value of the {@code data} field
 */
public void setData(List<T> data) {
public void setData(Iterable<T> data) {
this.data = data;
}

View File

@@ -28,6 +28,7 @@ public class HudiInstant implements Comparable<HudiInstant> {
private String state;
private String timestamp;
private String fileName;
private Long fileTime = 0L;
// active or archive
private String type;
@@ -62,6 +63,13 @@ public class HudiInstant implements Comparable<HudiInstant> {
return type;
}
/**
 * Returns the file time recorded for this instant.
 *
 * @return the current value of the {@code fileTime} field
 */
public Long getFileTime() {
    return this.fileTime;
}
/**
 * Sets the file time recorded for this instant.
 *
 * @param fileTime the new value of the {@code fileTime} field
 */
public void setFileTime(Long fileTime) {
this.fileTime = fileTime;
}
@Override
public String toString() {

View File

@@ -1,4 +1,4 @@
package com.lanyuanxiaoyao.service.web.utils;
package com.lanyuanxiaoyao.service.configuration.utils;
import cn.hutool.core.util.StrUtil;
import java.util.Comparator;

View File

@@ -1,4 +1,4 @@
package com.lanyuanxiaoyao.service.web.utils;
package com.lanyuanxiaoyao.service.configuration.utils;
import cn.hutool.core.util.ObjectUtil;
import java.util.function.Supplier;

View File

@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.forest.service;
import com.dtflys.forest.annotation.BaseRequest;
import com.dtflys.forest.annotation.Get;
import com.dtflys.forest.annotation.Query;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import java.util.Map;
@@ -17,10 +18,10 @@ import org.eclipse.collections.api.list.ImmutableList;
@BaseRequest(baseURL = "http://service-hudi-query")
public interface HudiService {
/**
 * Issues a GET request to {@code /timeline/list} with the given query parameters.
 * <p>
 * NOTE(review): two return types are shown because the span comes from a change listing;
 * the {@code PageResponse} form appears to be the current one. Confirm against the real
 * file.
 *
 * @param queryMap query-string parameters sent with the request
 */
@Get("/timeline/list")
ImmutableList<HudiInstant> timelineList(@Query Map<String, Object> queryMap);
PageResponse<HudiInstant> timelineList(@Query Map<String, Object> queryMap);
/**
 * Issues a GET request to {@code /timeline/list_hdfs} with the given query parameters.
 * <p>
 * NOTE(review): two return types are shown because the span comes from a change listing;
 * the {@code PageResponse} form appears to be the current one. Confirm against the real
 * file.
 *
 * @param queryMap query-string parameters sent with the request
 */
@Get("/timeline/list_hdfs")
ImmutableList<HudiInstant> timelineHdfsList(@Query Map<String, Object> queryMap);
PageResponse<HudiInstant> timelineHdfsList(@Query Map<String, Object> queryMap);
@Get("/timeline/list_pending_compaction")
ImmutableList<HudiInstant> timelinePendingCompactionList(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.hudi.controller;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.hudi.service.TimelineService;
import java.io.IOException;
@@ -31,20 +32,51 @@ public class TimelineController {
}
/**
 * Handles GET {@code list}: returns timeline instants for the table identified by
 * {@code flink_job_id} and {@code alias}.
 * <p>
 * All work is delegated to {@code timelineService.timeline}. The optional filter lists
 * are converted with {@code Lists.immutable.ofAll} before being passed on.
 * <p>
 * NOTE(review): this span comes from a change listing and shows the previous signature
 * and single-filter service call next to the paged replacement. Confirm against the
 * real file.
 *
 * @param page         page number, {@code 1} when the request omits it
 * @param count        page size, {@code 10} when the request omits it
 * @param order        optional sort field name
 * @param direction    optional sort direction
 * @param flinkJobId   required {@code flink_job_id} request parameter
 * @param alias        required {@code alias} request parameter
 * @param filterType   optional {@code filter_type} values
 * @param filterAction optional {@code filter_action} values
 * @param filterState  optional {@code filter_state} values
 * @throws IOException declared by the service call
 */
@GetMapping("list")
public ImmutableList<HudiInstant> allInstants(
public PageResponse<HudiInstant> allInstants(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "order", required = false) String order,
@RequestParam(value = "direction", required = false) String direction,
@RequestParam("flink_job_id") Long flinkJobId,
@RequestParam("alias") String alias,
@RequestParam(value = "filter_type", required = false) List<String> filterType
@RequestParam(value = "filter_type", required = false) List<String> filterType,
@RequestParam(value = "filter_action", required = false) List<String> filterAction,
@RequestParam(value = "filter_state", required = false) List<String> filterState
) throws IOException {
return timelineService.timeline(flinkJobId, alias, Lists.immutable.ofAll(filterType));
return timelineService.timeline(
page,
count,
order,
direction,
flinkJobId,
alias,
Lists.immutable.ofAll(filterType),
Lists.immutable.ofAll(filterAction),
Lists.immutable.ofAll(filterState)
);
}
/**
 * Handles GET {@code list_hdfs}: returns timeline instants for the table located at the
 * given {@code hdfs} path.
 * <p>
 * All work is delegated to {@code timelineService.timeline}. The optional filter lists
 * are converted with {@code Lists.immutable.ofAll} before being passed on.
 * <p>
 * NOTE(review): this span comes from a change listing and shows the previous signature
 * and single-filter service call next to the paged replacement. Confirm against the
 * real file.
 *
 * @param page         page number, {@code 1} when the request omits it
 * @param count        page size, {@code 10} when the request omits it
 * @param order        optional sort field name
 * @param direction    optional sort direction
 * @param hdfs         required {@code hdfs} request parameter, passed to the service unchanged
 * @param filterType   optional {@code filter_type} values
 * @param filterAction optional {@code filter_action} values
 * @param filterState  optional {@code filter_state} values
 * @throws IOException declared by the service call
 */
@GetMapping("list_hdfs")
public ImmutableList<HudiInstant> allInstants(
public PageResponse<HudiInstant> allInstants(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "order", required = false) String order,
@RequestParam(value = "direction", required = false) String direction,
@RequestParam("hdfs") String hdfs,
@RequestParam(value = "filter_type", required = false) List<String> filterType
@RequestParam(value = "filter_type", required = false) List<String> filterType,
@RequestParam(value = "filter_action", required = false) List<String> filterAction,
@RequestParam(value = "filter_state", required = false) List<String> filterState
) throws IOException {
return timelineService.timeline(hdfs, Lists.immutable.ofAll(filterType));
return timelineService.timeline(
page,
count,
order,
direction,
hdfs,
Lists.immutable.ofAll(filterType),
Lists.immutable.ofAll(filterAction),
Lists.immutable.ofAll(filterState)
);
}
@GetMapping("list_pending_compaction")

View File

@@ -3,17 +3,26 @@ package com.lanyuanxiaoyao.service.hudi.service;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils;
import java.io.IOException;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.Cacheable;
@@ -31,6 +40,10 @@ public class TimelineService {
private static final Logger logger = LoggerFactory.getLogger(TimelineService.class);
private static final String INSTANT_TYPE_ACTIVE = "active";
private static final String INSTANT_TYPE_ARCHIVE = "archive";
// Sort-field name -> accessor that supplies the string to sort by. It is handed to
// ComparatorUtil.stringComparator together with the requested order and direction.
// Only "timestamp" is registered.
private static final ImmutableMap<String, Function<HudiInstant, String>> TIMELINE_SORT_MAP = Maps.immutable.of(
"timestamp",
HudiInstant::getTimestamp
);
private final InfoService infoService;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@@ -40,16 +53,46 @@ public class TimelineService {
@Cacheable(value = "timeline", sync = true, key = "#flinkJobId.toString()+#alias")
@Retryable(Throwable.class)
public ImmutableList<HudiInstant> timeline(Long flinkJobId, String alias, ImmutableList<String> filterType) throws IOException {
public PageResponse<HudiInstant> timeline(
Integer page,
Integer count,
String order,
String direction,
Long flinkJobId,
String alias,
ImmutableList<String> filterType,
ImmutableList<String> filterAction,
ImmutableList<String> filterState
) throws IOException {
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
return timeline(meta.getHudi().getTargetHdfsPath(), filterType);
return timeline(
page,
count,
order,
direction,
meta.getHudi().getTargetHdfsPath(),
filterType,
filterAction,
filterState
);
}
/**
 * Returns one page of the Hudi timeline for the table stored at the given HDFS path.
 * <p>
 * The collected instants are sorted, filtered by action and state, sorted again by the
 * requested field and direction, and then cut down to the requested page. Only the
 * instants on that page have their instant file looked up to fill in {@code fileTime}.
 * The returned total is the number of instants after filtering, before paging.
 * <p>
 * NOTE(review): this span comes from a change listing. It shows the previous signature,
 * {@code setConf(new Configuration())} and the plain {@code return instants} next to
 * their replacements, and the hunk header in the middle hides the lines that fill
 * {@code instants}. Confirm against the real file.
 * <p>
 * NOTE(review): the cache key is {@code #hdfs} only, while the result now depends on
 * page, count, order, direction and the filters. A cached page would be returned for
 * any later request on the same path — confirm and widen the key.
 *
 * @param page         1-based page number; values below 1 are treated as the first page
 * @param count        page size
 * @param order        sort field name passed to the comparator helper
 * @param direction    sort direction passed to the comparator helper
 * @param hdfs         base path of the Hudi table
 * @param filterType   instant types to include (used in the lines hidden by the hunk header — TODO confirm)
 * @param filterAction instant actions to include; an empty value disables the filter
 * @param filterState  instant states to include; an empty value disables the filter
 * @return the requested page and the total number of instants that passed the filters
 * @throws IOException declared by the method; raised, for example, by {@code FileSystem.get}
 */
@Cacheable(value = "timeline", sync = true, key = "#hdfs")
@Retryable(Throwable.class)
public ImmutableList<HudiInstant> timeline(String hdfs, ImmutableList<String> filterType) throws IOException {
public PageResponse<HudiInstant> timeline(
Integer page,
Integer count,
String order,
String direction,
String hdfs,
ImmutableList<String> filterType,
ImmutableList<String> filterAction,
ImmutableList<String> filterState
) throws IOException {
// One Configuration is shared by the file system handle and the Hudi meta client.
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(configuration);
HoodieTableMetaClient client = HoodieTableMetaClient.builder()
.setConf(new Configuration())
.setConf(configuration)
.setBasePath(hdfs)
.build();
MutableList<HudiInstant> instants = Lists.mutable.empty();
@@ -66,9 +109,31 @@ public class TimelineService {
.collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant))
.forEach(instants::add);
}
return instants
// Full filtered and sorted list; its size is reported as the total.
ImmutableList<HudiInstant> hudiInstants = instants
.toSortedList(HudiInstant::compareTo)
.select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction()))
.select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState()))
.toSortedList(ComparatorUtil.stringComparator(order, direction, TIMELINE_SORT_MAP))
.toImmutable();
// Cut out the requested page first, so the file lookups below run only for the
// instants that are actually returned.
ImmutableList<HudiInstant> result = hudiInstants
.drop(Math.max(page - 1, 0) * count)
.take(count)
.asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(instant -> {
Path instantPath = new Path(StrUtil.format("{}/{}", client.getMetaPath(), instant.getFileName()));
try {
if (fileSystem.exists(instantPath)) {
FileStatus status = fileSystem.getFileStatus(instantPath);
// fileTime is the modification time reported for the instant file.
instant.setFileTime(status.getModificationTime());
}
} catch (IOException e) {
// Best effort: a failed lookup is logged and the instant is returned with its fileTime unchanged.
logger.error("IOError", e);
}
return instant;
})
.toList()
.toImmutable();
return new PageResponse<>(result.toList(), hudiInstants.size());
}
@Cacheable(value = "pending_compaction_timeline", sync = true, key = "#flinkJobId.toString()+#alias")

View File

@@ -2,9 +2,10 @@ package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil;
import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil;
import java.util.List;
import java.util.function.Function;
import org.eclipse.collections.api.factory.Maps;
@@ -53,20 +54,17 @@ public class HudiController extends BaseController {
@RequestParam(value = "filter_action", required = false) List<String> filterAction,
@RequestParam(value = "filter_state", required = false) List<String> filterState
) {
MutableMap<String, Object> queryMap = Maps.mutable.empty();
queryMap.put("flink_job_id", flinkJobId);
queryMap.put("alias", alias);
MutableMap<String, Object> queryMap = buildQueryMap(page, count, order, direction, flinkJobId.toString(), alias);
if (ObjectUtil.isNotEmpty(filterType)) {
queryMap.put("filter_type", filterType);
}
ImmutableList<HudiInstant> hudiInstants = hudiService.timelineList(queryMap)
.select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction()))
.select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState()))
.toSortedList(ComparatorUtil.stringComparator(order, direction, TIMELINE_SORT_MAP))
.toImmutable();
ImmutableList<HudiInstant> result = hudiInstants
.drop(Math.max(page - 1, 0) * count)
.take(count);
return responseCrudData(result, hudiInstants.size());
if (ObjectUtil.isNotEmpty(filterAction)) {
queryMap.put("filter_action", filterAction);
}
if (ObjectUtil.isNotEmpty(filterState)) {
queryMap.put("filter_state", filterState);
}
PageResponse<HudiInstant> response = hudiService.timelineList(queryMap);
return responseCrudData(response.getData(), response.getTotal());
}
}

View File

@@ -14,7 +14,7 @@ import com.lanyuanxiaoyao.service.forest.service.FlinkService;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO;
import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO;
import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil;
import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

View File

@@ -2,7 +2,7 @@ package com.lanyuanxiaoyao.service.web.entity;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.configuration.entity.info.CompactionMetrics;
import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil;
import com.lanyuanxiaoyao.service.configuration.utils.DatetimeUtil;
import java.time.Instant;
/**

View File

@@ -2,7 +2,7 @@ package com.lanyuanxiaoyao.service.web.entity;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil;
import com.lanyuanxiaoyao.service.configuration.utils.DatetimeUtil;
import java.time.Instant;
/**

View File

@@ -5,7 +5,7 @@ import cn.hutool.core.util.ReUtil;
import com.eshore.odcp.hudi.connector.utils.NameHelper;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil;
import com.lanyuanxiaoyao.service.configuration.utils.DatetimeUtil;
import java.time.Instant;
/**

View File

@@ -4,7 +4,7 @@ import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.RunMeta;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
import com.lanyuanxiaoyao.service.web.utils.DatetimeUtil;
import com.lanyuanxiaoyao.service.configuration.utils.DatetimeUtil;
import java.time.Instant;
import java.util.Optional;

View File

@@ -906,13 +906,10 @@ function tableMetaDialog() {
direction: '${orderDir|default:undefined}',
flink_job_id: '${flinkJobId|default:undefined}',
alias: '${tableMeta.alias|default:undefined}',
filter_type: '${type|default:undefined}',
filter_type: "${type|default:active}",
filter_action: '${action|default:undefined}',
filter_state: '${state|default:undefined}',
},
defaultParams: {
filter_type: 'active',
},
},
...crudCommonOptions(),
perPage: 15,
@@ -945,6 +942,14 @@ function tableMetaDialog() {
...mappingField('state', hudiTimelineStateMapping),
filterable: filterableField(hudiTimelineStateMapping, true),
},
{
name: 'fileTime',
label: ' 文件时间',
width: 150,
align: 'center',
type: 'tpl',
tpl: "${DATETOSTR(DATE(fileTime), 'YYYY-MM-DD HH:mm:ss')}",
},
{
name: 'fileName',
label: '文件名',