feature(hudi-query): 增加时间线压缩计划读取

This commit is contained in:
2023-07-05 19:07:48 +08:00
parent 7c2d6e5336
commit 317cb5d56f
7 changed files with 213 additions and 111 deletions

View File

@@ -1,38 +0,0 @@
package com.lanyuanxiaoyao.service.hudi.controller;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.hudi.service.CompactionService;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * Compaction operations: HTTP endpoints for reading Hudi compaction plans.
 *
 * @author lanyuanxiaoyao
 * @date 2023-05-11
 */
@RestController
@RequestMapping("compaction")
public class CompactionController {

    private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);

    /** Backing service that performs the actual compaction-plan lookup. */
    private final CompactionService compactionService;

    public CompactionController(CompactionService compactionService) {
        this.compactionService = compactionService;
    }

    /**
     * Returns the compaction plan of the table resolved via (flinkJobId, alias)
     * at the given instant timestamp.
     *
     * @param flinkJobId Flink job id used to look up the table metadata
     * @param alias      table alias used to look up the table metadata
     * @param timestamp  compaction instant timestamp on the Hudi timeline
     * @return the deserialized compaction plan
     * @throws IOException if the plan cannot be read from storage
     */
    @GetMapping("plan")
    public HudiCompactionPlan compactionPlan(
            @RequestParam("flink_job_id") Long flinkJobId,
            @RequestParam("alias") String alias,
            @RequestParam("timestamp") String timestamp
    ) throws IOException {
        HudiCompactionPlan plan = compactionService.getCompactionPlan(flinkJobId, alias, timestamp);
        return plan;
    }
}

View File

@@ -1,6 +1,7 @@
package com.lanyuanxiaoyao.service.hudi.controller;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.hudi.service.TimelineService;
import java.io.IOException;
@@ -79,6 +80,23 @@ public class TimelineController {
);
}
/**
 * Reads the compaction plan of the table resolved via (flinkJobId, alias)
 * at the given timeline instant.
 *
 * @param flinkJobId Flink job id used to look up the table metadata
 * @param alias      table alias used to look up the table metadata
 * @param instant    compaction instant timestamp on the Hudi timeline
 * @return the deserialized compaction plan
 * @throws IOException if the plan cannot be read from storage
 */
@GetMapping("read_compaction_plan")
public HudiCompactionPlan readCompactionPlan(
        @RequestParam("flink_job_id") Long flinkJobId,
        @RequestParam("alias") String alias,
        @RequestParam("instant") String instant
) throws IOException {
    HudiCompactionPlan plan = timelineService.readCompactionPlan(flinkJobId, alias, instant);
    return plan;
}
/**
 * Reads the compaction plan stored under the given HDFS base path at the
 * given timeline instant (bypasses table-metadata lookup).
 *
 * @param hdfs    HDFS base path of the Hudi table
 * @param instant compaction instant timestamp on the Hudi timeline
 * @return the deserialized compaction plan
 * @throws IOException if the plan cannot be read from storage
 */
@GetMapping("read_compaction_plan_hdfs")
public HudiCompactionPlan readCompactionPlan(
        @RequestParam("hdfs") String hdfs,
        @RequestParam("instant") String instant
) throws IOException {
    HudiCompactionPlan plan = timelineService.readCompactionPlan(hdfs, instant);
    return plan;
}
@GetMapping("list_pending_compaction")
public ImmutableList<HudiInstant> pendingCompactionInstants(
@RequestParam("flink_job_id") Long flinkJobId,

View File

@@ -1,68 +0,0 @@
package com.lanyuanxiaoyao.service.hudi.service;
import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CompactionUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
/**
 * Compaction-related operations: resolves a table's HDFS path and reads its
 * Hudi compaction plan, mapping it to the immutable API model.
 *
 * @author lanyuanxiaoyao
 * @date 2023-05-11
 */
@Service
public class CompactionService {

    private static final Logger logger = LoggerFactory.getLogger(CompactionService.class);

    /** Provides table metadata (including the table's target HDFS path). */
    private final InfoService infoService;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    public CompactionService(InfoService infoService) {
        this.infoService = infoService;
    }

    /**
     * Reads the compaction plan of the table resolved via (flinkJobId, alias)
     * at the given instant timestamp.
     *
     * <p>Results are cached per (flinkJobId, alias, timestamp) and the read is
     * retried on failure via {@code @Retryable}.
     *
     * @param flinkJobId Flink job id used to look up the table metadata
     * @param alias      table alias used to look up the table metadata
     * @param timestamp  compaction instant timestamp on the Hudi timeline
     * @return an immutable view of the compaction plan
     * @throws IOException if the plan cannot be read from storage
     */
    @Cacheable(value = "compaction_plan", sync = true, key = "#flinkJobId.toString()+#alias+#timestamp")
    @Retryable(Throwable.class)
    public HudiCompactionPlan getCompactionPlan(Long flinkJobId, String alias, String timestamp) throws IOException {
        TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
        HoodieTableMetaClient client = HoodieTableMetaClient.builder()
                .setConf(new Configuration())
                .setBasePath(meta.getHudi().getTargetHdfsPath())
                .build();
        try {
            HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(client, timestamp);
            return toHudiCompactionPlan(plan);
        } catch (IOException e) {
            // Log before rethrowing so retries/failures leave a trace even if the caller swallows.
            logger.error("Read compaction plan failure", e);
            throw e;
        }
    }

    /**
     * Maps the Avro {@code HoodieCompactionPlan} to the immutable API model,
     * substituting empty collections when the Avro fields are null.
     */
    private static HudiCompactionPlan toHudiCompactionPlan(HoodieCompactionPlan plan) {
        return new HudiCompactionPlan(
                ObjectUtil.isNotNull(plan.getOperations())
                        ? Lists.immutable.ofAll(plan.getOperations())
                                .collect(o -> new HudiCompactionPlan.Operation(
                                        o.getBaseInstantTime(),
                                        Lists.immutable.ofAll(o.getDeltaFilePaths()),
                                        o.getDataFilePath(),
                                        o.getFileId(),
                                        o.getPartitionPath(),
                                        Maps.immutable.ofAll(o.getMetrics()),
                                        o.getBootstrapFilePath()
                                ))
                        : Lists.immutable.empty(),
                ObjectUtil.isNotNull(plan.getExtraMetadata())
                        ? Maps.immutable.ofAll(plan.getExtraMetadata())
                        : Maps.immutable.empty(),
                plan.getVersion()
        );
    }
}

View File

@@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.configuration.utils.ComparatorUtil;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
@@ -15,9 +16,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
@@ -139,6 +142,39 @@ public class TimelineService {
return new PageResponse<>(result.toList(), hudiInstants.size());
}
/**
 * Reads the compaction plan of the table resolved via (flinkJobId, alias)
 * at the given instant, by first resolving the table's target HDFS path.
 *
 * <p>An explicit cache key is set (matching this file's other {@code @Cacheable}
 * usages) so entries are keyed deterministically rather than by the default
 * parameter-tuple key shared with the two-argument overload.
 *
 * <p>NOTE(review): the delegation below is a self-invocation, so the
 * {@code @Cacheable}/{@code @Retryable} proxies on the two-argument overload do
 * not apply to this call path; caching/retry here relies on this method's own
 * annotations — confirm this is intended.
 *
 * @param flinkJobId Flink job id used to look up the table metadata
 * @param alias      table alias used to look up the table metadata
 * @param instant    compaction instant timestamp on the Hudi timeline
 * @return an immutable view of the compaction plan
 * @throws IOException if the plan cannot be read from storage
 */
@Cacheable(value = "read_compaction_plan", sync = true, key = "#flinkJobId.toString()+#alias+#instant")
@Retryable(Throwable.class)
public HudiCompactionPlan readCompactionPlan(Long flinkJobId, String alias, String instant) throws IOException {
    TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
    return readCompactionPlan(meta.getHudi().getTargetHdfsPath(), instant);
}
/**
 * Reads the compaction plan stored under the given HDFS base path at the given
 * instant and maps it to the immutable API model, substituting empty
 * collections when the Avro fields are null.
 *
 * <p>An explicit cache key is set (matching this file's other {@code @Cacheable}
 * usages) so entries are keyed deterministically rather than by the default
 * parameter-tuple key shared with the three-argument overload.
 *
 * @param hdfs    HDFS base path of the Hudi table
 * @param instant compaction instant timestamp on the Hudi timeline
 * @return an immutable view of the compaction plan
 * @throws IOException if the plan cannot be read from storage
 */
@Cacheable(value = "read_compaction_plan", sync = true, key = "#hdfs+#instant")
@Retryable(Throwable.class)
public HudiCompactionPlan readCompactionPlan(String hdfs, String instant) throws IOException {
    HoodieTableMetaClient client = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(hdfs)
            .build();
    HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(client, instant);
    return new HudiCompactionPlan(
            ObjectUtil.isNotNull(plan.getOperations())
                    ? Lists.immutable.ofAll(plan.getOperations())
                            .collect(o -> new HudiCompactionPlan.Operation(
                                    o.getBaseInstantTime(),
                                    Lists.immutable.ofAll(o.getDeltaFilePaths()),
                                    o.getDataFilePath(),
                                    o.getFileId(),
                                    o.getPartitionPath(),
                                    Maps.immutable.ofAll(o.getMetrics()),
                                    o.getBootstrapFilePath()
                            ))
                    : Lists.immutable.empty(),
            ObjectUtil.isNotNull(plan.getExtraMetadata())
                    ? Maps.immutable.ofAll(plan.getExtraMetadata())
                    : Maps.immutable.empty(),
            plan.getVersion()
    );
}
@Cacheable(value = "pending_compaction_timeline", sync = true, key = "#flinkJobId.toString()+#alias")
@Retryable(Throwable.class)
public ImmutableList<HudiInstant> pendingCompactionTimeline(Long flinkJobId, String alias) throws IOException {