feature(hudi-query): 增加未完成压缩时间线和压缩计划内容查询

未完成压缩时间线使用频繁,主要用于压缩任务列表查询
This commit is contained in:
2023-05-11 21:58:13 +08:00
parent f42f963550
commit 843eff2656
6 changed files with 249 additions and 4 deletions

View File

@@ -0,0 +1,38 @@
package com.lanyuanxiaoyao.service.hudi.controller;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.hudi.service.CompactionService;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints for Hudi compaction operations.
 *
 * @author lanyuanxiaoyao
 * @date 2023-05-11
 */
@RestController
@RequestMapping("compaction")
public class CompactionController {
    private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);

    private final CompactionService compactionService;

    public CompactionController(CompactionService compactionService) {
        this.compactionService = compactionService;
    }

    /**
     * Fetches the compaction plan recorded for a given instant timestamp.
     *
     * @param flinkJobId id of the Flink job that owns the table
     * @param alias      table alias used to resolve the table metadata
     * @param timestamp  instant timestamp identifying the compaction plan
     * @return the compaction plan resolved by {@link CompactionService}
     * @throws IOException if the plan cannot be read
     */
    @GetMapping("plan")
    public HudiCompactionPlan compactionPlan(
            @RequestParam("flink_job_id") Long flinkJobId,
            @RequestParam("alias") String alias,
            @RequestParam("timestamp") String timestamp
    ) throws IOException {
        HudiCompactionPlan plan = compactionService.getCompactionPlan(flinkJobId, alias, timestamp);
        return plan;
    }
}

View File

@@ -46,4 +46,12 @@ public class TimelineController {
) throws IOException {
return timelineService.timeline(hdfs, Lists.immutable.ofAll(filterType));
}
/**
 * Lists the compaction instants on the table's active timeline that have not
 * completed yet.
 *
 * @param flinkJobId id of the Flink job that owns the table
 * @param alias      table alias used to resolve the table metadata
 * @return non-completed compaction instants, as reported by the timeline service
 * @throws IOException if the timeline cannot be read
 */
@GetMapping("list_pending_compaction")
public ImmutableList<HudiInstant> pendingCompactionInstants(
        @RequestParam("flink_job_id") Long flinkJobId,
        @RequestParam("alias") String alias
) throws IOException {
    ImmutableList<HudiInstant> pending = timelineService.pendingCompactionTimeline(flinkJobId, alias);
    return pending;
}
}

View File

@@ -0,0 +1,68 @@
package com.lanyuanxiaoyao.service.hudi.service;
import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CompactionUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
/**
 * Service for reading compaction plan information from a Hudi table's timeline.
 *
 * @author lanyuanxiaoyao
 * @date 2023-05-11
 */
@Service
public class CompactionService {
    private static final Logger logger = LoggerFactory.getLogger(CompactionService.class);

    private final InfoService infoService;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    public CompactionService(InfoService infoService) {
        this.infoService = infoService;
    }

    /**
     * Reads the compaction plan stored for the given instant timestamp and converts
     * it into the service-level {@code HudiCompactionPlan} representation.
     *
     * @param flinkJobId id of the Flink job that owns the table
     * @param alias      table alias used to resolve the table metadata
     * @param timestamp  instant timestamp identifying the compaction plan
     * @return the converted plan; operations/metadata are empty collections when
     *         the underlying Avro plan has {@code null} for them
     * @throws IOException if the plan cannot be read from the timeline
     */
    // FIX: delimit the cache-key parts. The previous plain concatenation
    // (#flinkJobId.toString()+#alias+#timestamp) could produce the same key for
    // different inputs, e.g. alias="a"/timestamp="bc" vs alias="ab"/timestamp="c",
    // returning a cached plan for the wrong request.
    @Cacheable(value = "compaction_plan", sync = true,
            key = "#flinkJobId.toString()+':'+#alias+':'+#timestamp")
    @Retryable(Throwable.class)
    public HudiCompactionPlan getCompactionPlan(Long flinkJobId, String alias, String timestamp) throws IOException {
        // Resolve the table's HDFS base path from the job/alias pair.
        TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
        HoodieTableMetaClient client = HoodieTableMetaClient.builder()
                .setConf(new Configuration())
                .setBasePath(meta.getHudi().getTargetHdfsPath())
                .build();
        try {
            HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(client, timestamp);
            return new HudiCompactionPlan(
                    // The Avro plan may carry null collections; map them to empty ones.
                    ObjectUtil.isNotNull(plan.getOperations())
                            ? Lists.immutable.ofAll(plan.getOperations())
                                    .collect(o -> new HudiCompactionPlan.Operation(
                                            o.getBaseInstantTime(),
                                            Lists.immutable.ofAll(o.getDeltaFilePaths()),
                                            o.getDataFilePath(),
                                            o.getFileId(),
                                            o.getPartitionPath(),
                                            // NOTE(review): assumes getDeltaFilePaths()/getMetrics() are
                                            // non-null on each operation — TODO confirm against the Avro schema.
                                            Maps.immutable.ofAll(o.getMetrics()),
                                            o.getBootstrapFilePath()
                                    ))
                            : Lists.immutable.empty(),
                    ObjectUtil.isNotNull(plan.getExtraMetadata())
                            ? Maps.immutable.ofAll(plan.getExtraMetadata())
                            : Maps.immutable.empty(),
                    plan.getVersion()
            );
        } catch (IOException e) {
            // Log with context, then rethrow so @Retryable can retry and callers see the failure.
            logger.error("Read compaction plan failure", e);
            throw e;
        }
    }
}

View File

@@ -1,6 +1,7 @@
package com.lanyuanxiaoyao.service.hudi.service;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
@@ -9,6 +10,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
@@ -27,7 +29,8 @@ import org.springframework.stereotype.Service;
@Service
public class TimelineService {
private static final Logger logger = LoggerFactory.getLogger(TimelineService.class);
private static final String INSTANT_TYPE_ACTIVE = "active";
private static final String INSTANT_TYPE_ARCHIVE = "archive";
private final InfoService infoService;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@@ -42,9 +45,6 @@ public class TimelineService {
return timeline(meta.getHudi().getTargetHdfsPath(), filterType);
}
private static final String INSTANT_TYPE_ACTIVE = "active";
private static final String INSTANT_TYPE_ARCHIVE = "archive";
@Cacheable(value = "timeline", sync = true, key = "#hdfs")
@Retryable(Throwable.class)
public ImmutableList<HudiInstant> timeline(String hdfs, ImmutableList<String> filterType) throws IOException {
@@ -71,6 +71,20 @@ public class TimelineService {
.toImmutable();
}
/**
 * Lists all compaction instants on the table's active timeline that have not
 * reached the COMPLETED state (i.e. still pending).
 *
 * @param flinkJobId id of the Flink job that owns the table
 * @param alias      table alias used to resolve the table metadata
 * @return pending compaction instants converted to {@code HudiInstant}
 * @throws IOException if the timeline cannot be read
 */
// FIX: delimit the cache-key parts. The previous plain concatenation
// (#flinkJobId.toString()+#alias) could produce the same key for different
// inputs, e.g. (1, "23") vs (12, "3").
@Cacheable(value = "pending_compaction_timeline", sync = true, key = "#flinkJobId.toString()+':'+#alias")
@Retryable(Throwable.class)
public ImmutableList<HudiInstant> pendingCompactionTimeline(Long flinkJobId, String alias) throws IOException {
    // Resolve the table's HDFS base path from the job/alias pair.
    TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
    HoodieTableMetaClient client = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(meta.getHudi().getTargetHdfsPath())
            .build();
    // Keep only compaction actions, drop the ones already COMPLETED.
    // NOTE(review): "covert" (defined elsewhere in this class) looks like a typo
    // of "convert" — renaming would touch other call sites, so it is kept as-is.
    return HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline)
            .select(instant -> StrUtil.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION))
            .reject(instant -> ObjectUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED))
            .collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant));
}
private HudiInstant covert(String type, HoodieInstant instant) {
return new HudiInstant(
instant.getAction(),