feature(web): 增加时间线 Rollback 和 Clean 的查询

This commit is contained in:
2023-07-06 19:26:49 +08:00
parent f0c4031365
commit a5a9f600f1
8 changed files with 571 additions and 26 deletions

View File

@@ -0,0 +1,130 @@
package com.lanyuanxiaoyao.service.configuration.entity.hudi;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
/**
 * Web-facing read model for a Hudi cleaner plan, mirroring
 * {@code org.apache.hudi.avro.model.HoodieCleanerPlan} with immutable
 * collection views. The no-arg constructor exists for serialization
 * frameworks; instances are otherwise populated once and not mutated.
 *
 * @author lanyuanxiaoyao
 * @date 2023-07-06
 */
public final class HudiCleanerPlan {
    // Schema version recorded in the cleaner plan file.
    private Integer version;
    // Name of the cleaning policy in effect.
    private String policy;
    // Earliest instant that must be retained; may be null when absent in the plan.
    private Instant earliestInstantToRetain;
    // Partition path -> file infos scheduled for deletion.
    private ImmutableMap<String, ImmutableList<Info>> filePathsToBeDeletedPerPartition;
    // Partition path -> plain file paths scheduled for deletion.
    private ImmutableMap<String, ImmutableList<String>> filesToBeDeletedPerPartition;
    // Whole partitions scheduled for deletion.
    private ImmutableList<String> partitionsToBeDeleted;

    /** Serialization-only constructor. */
    public HudiCleanerPlan() {
    }

    /** Creates a fully populated cleaner plan view. */
    public HudiCleanerPlan(Integer version, String policy, Instant earliestInstantToRetain, ImmutableMap<String, ImmutableList<Info>> filePathsToBeDeletedPerPartition, ImmutableMap<String, ImmutableList<String>> filesToBeDeletedPerPartition, ImmutableList<String> partitionsToBeDeleted) {
        this.version = version;
        this.policy = policy;
        this.earliestInstantToRetain = earliestInstantToRetain;
        this.filePathsToBeDeletedPerPartition = filePathsToBeDeletedPerPartition;
        this.filesToBeDeletedPerPartition = filesToBeDeletedPerPartition;
        this.partitionsToBeDeleted = partitionsToBeDeleted;
    }

    public Integer getVersion() {
        return version;
    }

    public String getPolicy() {
        return policy;
    }

    public Instant getEarliestInstantToRetain() {
        return earliestInstantToRetain;
    }

    public ImmutableMap<String, ImmutableList<Info>> getFilePathsToBeDeletedPerPartition() {
        return filePathsToBeDeletedPerPartition;
    }

    public ImmutableMap<String, ImmutableList<String>> getFilesToBeDeletedPerPartition() {
        return filesToBeDeletedPerPartition;
    }

    public ImmutableList<String> getPartitionsToBeDeleted() {
        return partitionsToBeDeleted;
    }

    @Override
    public String toString() {
        // Built with StringBuilder; output is identical to the usual concatenated form.
        return new StringBuilder("HudiCleanerPlan{")
                .append("version=").append(version)
                .append(", policy='").append(policy).append('\'')
                .append(", earliestInstantToRetain=").append(earliestInstantToRetain)
                .append(", filePathsToBeDeletedPerPartition=").append(filePathsToBeDeletedPerPartition)
                .append(", filesToBeDeletedPerPartition=").append(filesToBeDeletedPerPartition)
                .append(", partitionsToBeDeleted=").append(partitionsToBeDeleted)
                .append('}')
                .toString();
    }

    /** Timeline instant reference (action/state/timestamp) inside a cleaner plan. */
    public static final class Instant {
        private String action;
        private String state;
        private String timestamp;

        /** Serialization-only constructor. */
        public Instant() {
        }

        public Instant(String action, String state, String timestamp) {
            this.action = action;
            this.state = state;
            this.timestamp = timestamp;
        }

        public String getAction() {
            return action;
        }

        public String getState() {
            return state;
        }

        public String getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return new StringBuilder("Instant{")
                    .append("action='").append(action).append('\'')
                    .append(", state='").append(state).append('\'')
                    .append(", timestamp='").append(timestamp).append('\'')
                    .append('}')
                    .toString();
        }
    }

    /** One file entry scheduled for deletion within a partition. */
    public static final class Info {
        private String filePath;
        private Boolean isBootstrapBaseFile;

        /** Serialization-only constructor. */
        public Info() {
        }

        public Info(String filePath, Boolean isBootstrapBaseFile) {
            this.filePath = filePath;
            this.isBootstrapBaseFile = isBootstrapBaseFile;
        }

        public String getFilePath() {
            return filePath;
        }

        // NOTE(review): getter name does not match the field ("getBootstrapBaseFile"
        // vs "isBootstrapBaseFile"); under bean conventions the serialized property
        // becomes "bootstrapBaseFile". Kept as-is to avoid changing the JSON contract.
        public Boolean getBootstrapBaseFile() {
            return isBootstrapBaseFile;
        }

        @Override
        public String toString() {
            return new StringBuilder("Info{")
                    .append("filePath='").append(filePath).append('\'')
                    .append(", isBootstrapBaseFile=").append(isBootstrapBaseFile)
                    .append('}')
                    .toString();
        }
    }
}

View File

@@ -10,17 +10,21 @@ import org.eclipse.collections.api.map.ImmutableMap;
* @date 2023-05-11
*/
public final class HudiCompactionPlan {
private Integer version;
private ImmutableList<Operation> operations;
private ImmutableMap<String, String> extraMetadata;
private Integer version;
public HudiCompactionPlan() {
}
public HudiCompactionPlan(ImmutableList<Operation> operations, ImmutableMap<String, String> extraMetadata, Integer version) {
public HudiCompactionPlan(Integer version, ImmutableList<Operation> operations, ImmutableMap<String, String> extraMetadata) {
this.version = version;
this.operations = operations;
this.extraMetadata = extraMetadata;
this.version = version;
}
public Integer getVersion() {
return version;
}
public ImmutableList<Operation> getOperations() {
@@ -31,16 +35,12 @@ public final class HudiCompactionPlan {
return extraMetadata;
}
public Integer getVersion() {
return version;
}
@Override
public String toString() {
return "HudiCompactionPlan{" +
"operations=" + operations +
"version=" + version +
", operations=" + operations +
", extraMetadata=" + extraMetadata +
", version=" + version +
'}';
}

View File

@@ -0,0 +1,125 @@
package com.lanyuanxiaoyao.service.configuration.entity.hudi;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
/**
 * Web-facing read model for a Hudi rollback plan, mirroring
 * {@code org.apache.hudi.avro.model.HoodieRollbackPlan} with immutable
 * collection views. The no-arg constructor exists for serialization
 * frameworks; instances are otherwise populated once and not mutated.
 *
 * @author lanyuanxiaoyao
 * @date 2023-07-06
 */
public final class HudiRollbackPlan {
    // Schema version recorded in the rollback plan file.
    private Integer version;
    // The instant being rolled back; may be null when absent in the plan.
    private Info instantToRollback;
    // Per-file rollback requests contained in the plan.
    private ImmutableList<Request> rollbackRequests;

    /** Serialization-only constructor. */
    public HudiRollbackPlan() {
    }

    /** Creates a fully populated rollback plan view. */
    public HudiRollbackPlan(Integer version, Info instantToRollback, ImmutableList<Request> rollbackRequests) {
        this.version = version;
        this.instantToRollback = instantToRollback;
        this.rollbackRequests = rollbackRequests;
    }

    public Integer getVersion() {
        return version;
    }

    public Info getInstantToRollback() {
        return instantToRollback;
    }

    public ImmutableList<Request> getRollbackRequests() {
        return rollbackRequests;
    }

    @Override
    public String toString() {
        // Built with StringBuilder; output is identical to the usual concatenated form.
        return new StringBuilder("HudiRollbackPlan{")
                .append("version=").append(version)
                .append(", instantToRollback=").append(instantToRollback)
                .append(", rollbackRequests=").append(rollbackRequests)
                .append('}')
                .toString();
    }

    /** Identifies the instant (action + commit time) targeted by the rollback. */
    public static final class Info {
        private String action;
        private String commitTime;

        /** Serialization-only constructor. */
        public Info() {
        }

        public Info(String action, String commitTime) {
            this.action = action;
            this.commitTime = commitTime;
        }

        public String getAction() {
            return action;
        }

        public String getCommitTime() {
            return commitTime;
        }

        @Override
        public String toString() {
            return new StringBuilder("Info{")
                    .append("action='").append(action).append('\'')
                    .append(", commitTime='").append(commitTime).append('\'')
                    .append('}')
                    .toString();
        }
    }

    /** One file-group rollback request: which files and log blocks to delete. */
    public static final class Request {
        private String fileId;
        private String partitionPath;
        private String latestBaseInstant;
        private ImmutableList<String> filesToBeDeleted;
        // NOTE(review): "Deteled" is a typo for "Deleted"; it is baked into the
        // getter and therefore the serialized JSON property. Renaming would break
        // API consumers, so it is kept — fix only with a coordinated frontend change.
        private ImmutableMap<String, Long> logBlocksToBeDeteled;

        /** Serialization-only constructor. */
        public Request() {
        }

        public Request(String fileId, String partitionPath, String latestBaseInstant, ImmutableList<String> filesToBeDeleted, ImmutableMap<String, Long> logBlocksToBeDeteled) {
            this.fileId = fileId;
            this.partitionPath = partitionPath;
            this.latestBaseInstant = latestBaseInstant;
            this.filesToBeDeleted = filesToBeDeleted;
            this.logBlocksToBeDeteled = logBlocksToBeDeteled;
        }

        public String getFileId() {
            return fileId;
        }

        public String getPartitionPath() {
            return partitionPath;
        }

        public String getLatestBaseInstant() {
            return latestBaseInstant;
        }

        public ImmutableList<String> getFilesToBeDeleted() {
            return filesToBeDeleted;
        }

        public ImmutableMap<String, Long> getLogBlocksToBeDeteled() {
            return logBlocksToBeDeteled;
        }

        @Override
        public String toString() {
            return new StringBuilder("Request{")
                    .append("fileId='").append(fileId).append('\'')
                    .append(", partitionPath='").append(partitionPath).append('\'')
                    .append(", latestBaseInstant='").append(latestBaseInstant).append('\'')
                    .append(", filesToBeDeleted=").append(filesToBeDeleted)
                    .append(", logBlocksToBeDeteled=").append(logBlocksToBeDeteled)
                    .append('}')
                    .toString();
        }
    }
}

View File

@@ -1,8 +1,10 @@
package com.lanyuanxiaoyao.service.hudi.controller;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCleanerPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiRollbackPlan;
import com.lanyuanxiaoyao.service.hudi.service.TimelineService;
import java.io.IOException;
import java.util.List;
@@ -90,13 +92,47 @@ public class TimelineController {
}
@GetMapping("read_compaction_plan_hdfs")
public HudiCompactionPlan readCompactionPlan(
public HudiCompactionPlan readCompactionPlanHdfs(
@RequestParam("hdfs") String hdfs,
@RequestParam("instant") String instant
) throws IOException {
return timelineService.readCompactionPlan(hdfs, instant);
}
// Resolves the table via (flink_job_id, alias) and returns the rollback plan
// for the given instant. Caching and retry live on the service method.
@GetMapping("read_rollback_plan")
public HudiRollbackPlan readRollbackPlan(
@RequestParam("flink_job_id") Long flinkJobId,
@RequestParam("alias") String alias,
@RequestParam("instant") String instant
) throws IOException {
return timelineService.readRollbackPlan(flinkJobId, alias, instant);
}
// Same as read_rollback_plan but addresses the table by an explicit HDFS base path.
@GetMapping("read_rollback_plan_hdfs")
public HudiRollbackPlan readRollbackPlanHdfs(
@RequestParam("hdfs") String hdfs,
@RequestParam("instant") String instant
) throws IOException {
return timelineService.readRollbackPlan(hdfs, instant);
}
// Resolves the table via (flink_job_id, alias) and returns the cleaner plan
// for the given instant. Caching and retry live on the service method.
@GetMapping("read_cleaner_plan")
public HudiCleanerPlan readCleanerPlan(
@RequestParam("flink_job_id") Long flinkJobId,
@RequestParam("alias") String alias,
@RequestParam("instant") String instant
) throws IOException {
return timelineService.readCleanerPlan(flinkJobId, alias, instant);
}
// Same as read_cleaner_plan but addresses the table by an explicit HDFS base path.
@GetMapping("read_cleaner_plan_hdfs")
public HudiCleanerPlan readCleanerPlanHdfs(
@RequestParam("hdfs") String hdfs,
@RequestParam("instant") String instant
) throws IOException {
return timelineService.readCleanerPlan(hdfs, instant);
}
@GetMapping("list_pending_compaction")
public ImmutableList<HudiInstant> pendingCompactionInstants(
@RequestParam("flink_job_id") Long flinkJobId,

View File

@@ -5,8 +5,10 @@ import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCleanerPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiRollbackPlan;
import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils;
@@ -16,11 +18,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
@@ -142,14 +148,14 @@ public class TimelineService {
return new PageResponse<>(result.toList(), hudiInstants.size());
}
@Cacheable(value = "read_compaction_plan", sync = true)
@Cacheable(value = "read-compaction-plan", sync = true)
@Retryable(Throwable.class)
public HudiCompactionPlan readCompactionPlan(Long flinkJobId, String alias, String instant) throws IOException {
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
return readCompactionPlan(meta.getHudi().getTargetHdfsPath(), instant);
}
@Cacheable(value = "read_compaction_plan", sync = true)
@Cacheable(value = "read-compaction-plan", sync = true)
@Retryable(Throwable.class)
public HudiCompactionPlan readCompactionPlan(String hdfs, String instant) throws IOException {
HoodieTableMetaClient client = HoodieTableMetaClient.builder()
@@ -158,6 +164,7 @@ public class TimelineService {
.build();
HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(client, instant);
return new HudiCompactionPlan(
plan.getVersion(),
ObjectUtil.isNotNull(plan.getOperations())
? Lists.immutable.ofAll(plan.getOperations())
.collect(o -> new HudiCompactionPlan.Operation(
@@ -170,8 +177,73 @@ public class TimelineService {
o.getBootstrapFilePath()
))
: Lists.immutable.empty(),
ObjectUtil.isNotNull(plan.getExtraMetadata()) ? Maps.immutable.ofAll(plan.getExtraMetadata()) : Maps.immutable.empty(),
plan.getVersion()
ObjectUtil.isNotNull(plan.getExtraMetadata()) ? Maps.immutable.ofAll(plan.getExtraMetadata()) : Maps.immutable.empty()
);
}
// Looks up the table's HDFS base path from the job/alias metadata, then delegates
// to the HDFS-path overload. Cached per argument tuple; retried on any Throwable.
@Cacheable(value = "read-rollback-plan", sync = true)
@Retryable(Throwable.class)
public HudiRollbackPlan readRollbackPlan(Long flinkJobId, String alias, String instant) throws IOException {
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
return readRollbackPlan(meta.getHudi().getTargetHdfsPath(), instant);
}
/**
 * Reads the rollback plan for the given rollback instant directly from HDFS and
 * maps it to the web-facing {@link HudiRollbackPlan}.
 *
 * @param hdfs    base path of the Hudi table on HDFS
 * @param instant timestamp of the rollback instant whose plan is requested
 * @return the mapped rollback plan
 * @throws IOException if the timeline file cannot be read
 */
@Cacheable(value = "read-rollback-plan", sync = true)
@Retryable(Throwable.class)
public HudiRollbackPlan readRollbackPlan(String hdfs, String instant) throws IOException {
    HoodieTableMetaClient client = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(hdfs)
            .build();
    // RollbackUtils resolves the plan from the INFLIGHT rollback instant on the timeline.
    HoodieRollbackPlan plan = RollbackUtils.getRollbackPlan(client, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, instant));
    return new HudiRollbackPlan(
            plan.getVersion(),
            ObjectUtil.isNotNull(plan.getInstantToRollback())
                    ? new HudiRollbackPlan.Info(plan.getInstantToRollback().getAction(), plan.getInstantToRollback().getCommitTime())
                    : null,
            ObjectUtil.isNotNull(plan.getRollbackRequests())
                    ? Lists.immutable.ofAll(plan.getRollbackRequests())
                    .collect(r -> new HudiRollbackPlan.Request(
                            r.getFileId(),
                            r.getPartitionPath(),
                            r.getLatestBaseInstant(),
                            // Guard the nullable Avro collections too, consistent with
                            // the null checks above — ofAll(null) would throw.
                            ObjectUtil.isNotNull(r.getFilesToBeDeleted())
                                    ? Lists.immutable.ofAll(r.getFilesToBeDeleted())
                                    : Lists.immutable.<String>empty(),
                            ObjectUtil.isNotNull(r.getLogBlocksToBeDeleted())
                                    ? Maps.immutable.ofAll(r.getLogBlocksToBeDeleted())
                                    : Maps.immutable.<String, Long>empty()
                    ))
                    : Lists.immutable.empty()
    );
}
// Looks up the table's HDFS base path from the job/alias metadata, then delegates
// to the HDFS-path overload. Cached per argument tuple; retried on any Throwable.
@Cacheable(value = "read-cleaner-plan", sync = true)
@Retryable(Throwable.class)
public HudiCleanerPlan readCleanerPlan(Long flinkJobId, String alias, String instant) throws IOException {
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
return readCleanerPlan(meta.getHudi().getTargetHdfsPath(), instant);
}
/**
 * Reads the cleaner plan for the given clean instant directly from HDFS and maps
 * it to the web-facing {@link HudiCleanerPlan}.
 *
 * @param hdfs    base path of the Hudi table on HDFS
 * @param instant timestamp of the clean instant whose plan is requested
 * @return the mapped cleaner plan
 * @throws IOException if the timeline file cannot be read
 */
@Cacheable(value = "read-cleaner-plan", sync = true)
@Retryable(Throwable.class)
public HudiCleanerPlan readCleanerPlan(String hdfs, String instant) throws IOException {
    HoodieTableMetaClient client = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(hdfs)
            .build();
    // CleanerUtils resolves the plan from the inflight clean instant on the timeline.
    HoodieCleanerPlan plan = CleanerUtils.getCleanerPlan(client, HoodieTimeline.getCleanInflightInstant(instant));
    // Guard every nullable Avro collection before handing it to the immutable
    // factories — ofAll(null) would throw. Mirrors the null handling used for
    // earliestInstantToRetain and in readRollbackPlan/readCompactionPlan.
    return new HudiCleanerPlan(
            plan.getVersion(),
            plan.getPolicy(),
            ObjectUtil.isNotNull(plan.getEarliestInstantToRetain())
                    ? new HudiCleanerPlan.Instant(plan.getEarliestInstantToRetain().getAction(), plan.getEarliestInstantToRetain().getState(), plan.getEarliestInstantToRetain().getTimestamp())
                    : null,
            ObjectUtil.isNotNull(plan.getFilePathsToBeDeletedPerPartition())
                    ? Maps.immutable.ofAll(plan.getFilePathsToBeDeletedPerPartition())
                    .collectValues((key, value) -> Lists.immutable.ofAll(value)
                            .collect(i -> new HudiCleanerPlan.Info(
                                    i.getFilePath(),
                                    i.getIsBootstrapBaseFile()
                            )))
                    : Maps.immutable.empty(),
            ObjectUtil.isNotNull(plan.getFilesToBeDeletedPerPartition())
                    ? Maps.immutable.ofAll(plan.getFilesToBeDeletedPerPartition())
                    .collectValues((key, value) -> Lists.immutable.ofAll(value))
                    : Maps.immutable.empty(),
            ObjectUtil.isNotNull(plan.getPartitionsToBeDeleted())
                    ? Lists.immutable.ofAll(plan.getPartitionsToBeDeleted())
                    : Lists.immutable.empty()
    );
}

View File

@@ -3,8 +3,10 @@ package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCleanerPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiRollbackPlan;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.web.controller.base.AmisCrudResponse;
import com.lanyuanxiaoyao.service.web.controller.base.AmisResponse;
@@ -116,4 +118,34 @@ public class HudiController extends BaseController {
}
throw new Exception("Flink job id and alias or hdfs cannot be blank");
}
// Amis-facing rollback plan query. Accepts either an explicit HDFS path or a
// (flink_job_id, alias) pair; an explicit hdfs wins when both are supplied.
// Throws when neither addressing form is provided (matches sibling endpoints).
@GetMapping("read_rollback_plan")
public AmisResponse<HudiRollbackPlan> readRollbackPlan(
@RequestParam(value = "flink_job_id", required = false) Long flinkJobId,
@RequestParam(value = "alias", required = false) String alias,
@RequestParam(value = "hdfs", required = false) String hdfs,
@RequestParam("instant") String instant
) throws Exception {
if (StrUtil.isNotBlank(hdfs)) {
return AmisResponse.responseSuccess(hudiService.readRollbackPlanHdfs(hdfs, instant));
} else if (ObjectUtil.isNotNull(flinkJobId) && StrUtil.isNotBlank(alias)) {
return AmisResponse.responseSuccess(hudiService.readRollbackPlan(flinkJobId, alias, instant));
}
throw new Exception("Flink job id and alias or hdfs cannot be blank");
}
// Amis-facing cleaner plan query. Accepts either an explicit HDFS path or a
// (flink_job_id, alias) pair; an explicit hdfs wins when both are supplied.
// Throws when neither addressing form is provided (matches sibling endpoints).
@GetMapping("read_cleaner_plan")
public AmisResponse<HudiCleanerPlan> readCleanerPlan(
@RequestParam(value = "flink_job_id", required = false) Long flinkJobId,
@RequestParam(value = "alias", required = false) String alias,
@RequestParam(value = "hdfs", required = false) String hdfs,
@RequestParam("instant") String instant
) throws Exception {
if (StrUtil.isNotBlank(hdfs)) {
return AmisResponse.responseSuccess(hudiService.readCleanerPlanHdfs(hdfs, instant));
} else if (ObjectUtil.isNotNull(flinkJobId) && StrUtil.isNotBlank(alias)) {
return AmisResponse.responseSuccess(hudiService.readCleanerPlan(flinkJobId, alias, instant));
}
throw new Exception("Flink job id and alias or hdfs cannot be blank");
}
}

View File

@@ -899,18 +899,160 @@ function timelineColumns() {
}
}
},
/*{
type: 'tpl',
tpl: '${hdfs}'
{
visibleOn: "action === 'rollback'",
type: 'action',
icon: 'fa fa-eye',
level: 'link',
tooltip: '查看回滚计划',
size: 'sm',
actionType: 'dialog',
dialog: {
title: '回滚计划详情',
actions: [],
size: 'lg',
body: {
type: 'service',
api: {
method: 'get',
url: '${base}/hudi/read_rollback_plan',
data: {
hdfs: '${hdfs|default:undefined}',
flink_job_id: '${flinkJobId|default:undefined}',
alias: '${tableMeta.alias|default:undefined}',
instant: '${timestamp|default:undefined}',
},
},
body: [
{
type: 'property',
title: '回滚目标',
items: [
{
label: 'Action',
content: {
value: '${instantToRollback.action}',
...mappingField('instantToRollback.action', hudiTimelineActionMapping)
},
},
{label: '时间点', content: '${instantToRollback.commitTime}', span: 2},
],
},
{
type: 'crud',
source: '${rollbackRequests}',
...crudCommonOptions(),
columns: [
{
name: 'fileId',
label: '文件 ID',
searchable: true,
},
{
name: 'partitionPath',
label: '分区',
width: 50,
},
{
name: 'latestBaseInstant',
label: '数据文件版本',
},
]
}
]
}
}
},
{
type: 'tpl',
tpl: '${flinkJobId}'
visibleOn: "action === 'clean'",
type: 'action',
icon: 'fa fa-eye',
level: 'link',
tooltip: '查看清理计划',
size: 'sm',
actionType: 'dialog',
dialog: {
title: '清理计划详情',
actions: [],
size: 'lg',
body: {
type: 'service',
api: {
method: 'get',
url: '${base}/hudi/read_cleaner_plan',
data: {
hdfs: '${hdfs|default:undefined}',
flink_job_id: '${flinkJobId|default:undefined}',
alias: '${tableMeta.alias|default:undefined}',
instant: '${timestamp|default:undefined}',
},
adaptor: (payload, response) => {
if (payload.data['filePathsToBeDeletedPerPartition']) {
let map = payload.data['filePathsToBeDeletedPerPartition']
let list = []
Object.keys(map)
.forEach(key => {
list.push({
partitionPath: key,
files: map[key],
})
})
payload.data['filePathsToBeDeletedPerPartition'] = list
}
return payload
}
},
body: [
{
type: 'property',
title: '最早回滚时间点',
items: [
{label: '策略', content: '${policy}', span: 3},
{
label: '操作',
content: {
value: '${earliestInstantToRetain.action}',
...mappingField('earliestInstantToRetain.action', hudiTimelineActionMapping)
},
},
{
label: '状态',
content: {
value: '${earliestInstantToRetain.state}',
...mappingField('earliestInstantToRetain.state', hudiTimelineStateMapping)
},
},
{label: '时间点', content: '${earliestInstantToRetain.timestamp}'},
],
},
{
type: 'crud',
source: '${filePathsToBeDeletedPerPartition}',
...crudCommonOptions(),
loadDataOnce: true,
title: '分区删除文件',
columns: [
{
name: 'partitionPath',
label: '分区',
width: 50,
align: 'center',
},
{
name: 'files',
label: '清理文件',
type: 'list',
className: 'nowrap',
listItem: {
body: '${filePath}',
}
}
]
}
]
}
}
},
{
type: 'tpl',
tpl: '${tableMeta.alias}'
},*/
]
},
{

View File

@@ -29,6 +29,10 @@
margin: 0;
padding: 0;
}
.no-resize textarea {
resize: none !important;
}
</style>
</head>
<body>
@@ -72,7 +76,8 @@
]
}
}
amis.embed(
let debug = true
let server = amis.embed(
'#root',
amisJSON,
{
@@ -83,9 +88,12 @@
},
{
theme: 'ang',
// enableAMISDebug: true,
enableAMISDebug: debug,
},
)
);
if (debug) {
console.log('Source', amisJSON)
}
})()
</script>
</body>