diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiCompactionPlan.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiCompactionPlan.java
new file mode 100644
index 0000000..8ad8e90
--- /dev/null
+++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiCompactionPlan.java
@@ -0,0 +1,113 @@
+package com.lanyuanxiaoyao.service.configuration.entity.hudi;
+
+import org.eclipse.collections.api.list.ImmutableList;
+import org.eclipse.collections.api.map.ImmutableMap;
+
+/**
+ * Hudi compaction plan: the operations, extra metadata and version of a compaction.
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-05-11
+ */
+public final class HudiCompactionPlan {
+    private ImmutableList operations;
+    private ImmutableMap extraMetadata;
+    private Integer version;
+
+    public HudiCompactionPlan() {
+    }
+
+    public HudiCompactionPlan(ImmutableList operations, ImmutableMap extraMetadata, Integer version) {
+        this.operations = operations;
+        this.extraMetadata = extraMetadata;
+        this.version = version;
+    }
+
+    public ImmutableList getOperations() {
+        return operations;
+    }
+
+    public ImmutableMap getExtraMetadata() {
+        return extraMetadata;
+    }
+
+    public Integer getVersion() {
+        return version;
+    }
+
+    @Override
+    public String toString() {
+        return "HudiCompactionPlan{" +
+                "operations=" + operations +
+                ", extraMetadata=" + extraMetadata +
+                ", version=" + version +
+                '}';
+    }
+
+    /**
+     * A single compaction operation of the plan.
+     */
+    public static final class Operation {
+        private String baseInstantTime;
+        private ImmutableList deltaFilePaths;
+        private String dataFilePath;
+        private String fileId;
+        private String partitionPath;
+        private ImmutableMap metrics;
+        private String bootstrapFilePath;
+
+        public Operation() {
+        }
+
+        public Operation(String baseInstantTime, ImmutableList deltaFilePaths, String dataFilePath, String fileId, String partitionPath, ImmutableMap metrics, String bootstrapFilePath) {
+            this.baseInstantTime = baseInstantTime;
+            this.deltaFilePaths = deltaFilePaths;
+            this.dataFilePath = dataFilePath;
+            this.fileId = fileId;
+            this.partitionPath = partitionPath;
+            this.metrics = metrics;
+            this.bootstrapFilePath = bootstrapFilePath;
+        }
+
+        public String getBaseInstantTime() {
+            return baseInstantTime;
+        }
+
+        public ImmutableList getDeltaFilePaths() {
+            return deltaFilePaths;
+        }
+
+        public String getDataFilePath() {
+            return dataFilePath;
+        }
+
+        public String getFileId() {
+            return fileId;
+        }
+
+        public String getPartitionPath() {
+            return partitionPath;
+        }
+
+        public ImmutableMap getMetrics() {
+            return metrics;
+        }
+
+        public String getBootstrapFilePath() {
+            return bootstrapFilePath;
+        }
+
+        @Override
+        public String toString() {
+            return "Operation{" +
+                    "baseInstantTime='" + baseInstantTime + '\'' +
+                    ", deltaFilePaths=" + deltaFilePaths +
+                    ", dataFilePath='" + dataFilePath + '\'' +
+                    ", fileId='" + fileId + '\'' +
+                    ", partitionPath='" + partitionPath + '\'' +
+                    ", metrics=" + metrics +
+                    ", bootstrapFilePath='" + bootstrapFilePath + '\'' +
+                    '}';
+        }
+    }
+}
diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java
index bda2712..6d1e3cf 100644
--- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java
+++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java
@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.forest.service;
 import com.dtflys.forest.annotation.BaseRequest;
 import com.dtflys.forest.annotation.Get;
 import com.dtflys.forest.annotation.Query;
+import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
 import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
 import java.util.Map;
 import org.eclipse.collections.api.list.ImmutableList;
@@ -20,4 +21,16 @@ public interface HudiService {
 
     @Get("/timeline/list_hdfs")
     ImmutableList timelineHdfsList(@Query Map queryMap);
+
+    /**
+     * Lists the pending compaction instants of a table (GET /timeline/list_pending_compaction).
+     */
+    @Get("/timeline/list_pending_compaction")
+    ImmutableList timelinePendingCompactionList(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
+
+    /**
+     * Fetches the compaction plan of a table at the given instant (GET /compaction/plan).
+     */
+    @Get("/compaction/plan")
+    HudiCompactionPlan compactionPlan(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("timestamp") String timestamp);
 }
diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/CompactionController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/CompactionController.java
new file mode 100644
index 0000000..e6a8b88
--- /dev/null
+++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/CompactionController.java
@@ -0,0 +1,47 @@
+package com.lanyuanxiaoyao.service.hudi.controller;
+
+import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
+import com.lanyuanxiaoyao.service.hudi.service.CompactionService;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Compaction operations.
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-05-11
+ */
+@RestController
+@RequestMapping("compaction")
+public class CompactionController {
+    private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
+
+    private final CompactionService compactionService;
+
+    public CompactionController(CompactionService compactionService) {
+        this.compactionService = compactionService;
+    }
+
+    /**
+     * Returns the compaction plan of a table at the given instant.
+     *
+     * @param flinkJobId Flink job id identifying the table
+     * @param alias      table alias identifying the table
+     * @param timestamp  instant time of the compaction
+     * @return the compaction plan
+     * @throws IOException if the plan cannot be read
+     */
+    @GetMapping("plan")
+    public HudiCompactionPlan compactionPlan(
+            @RequestParam("flink_job_id") Long flinkJobId,
+            @RequestParam("alias") String alias,
+            @RequestParam("timestamp") String timestamp
+    ) throws IOException {
+        return compactionService.getCompactionPlan(flinkJobId, alias, timestamp);
+    }
+}
diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java
index ac94095..6ce37b4 100644
--- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java
+++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java
@@ -46,4 +46,15 @@ public class TimelineController {
     ) throws IOException {
         return timelineService.timeline(hdfs, Lists.immutable.ofAll(filterType));
     }
+
+    /**
+     * Lists the compaction instants of a table that are not yet completed.
+     */
+    @GetMapping("list_pending_compaction")
+    public ImmutableList pendingCompactionInstants(
+            @RequestParam("flink_job_id") Long flinkJobId,
+            @RequestParam("alias") String alias
+    ) throws IOException {
+        return timelineService.pendingCompactionTimeline(flinkJobId, alias);
+    }
 }
diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/CompactionService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/CompactionService.java
new file mode 100644
index 0000000..6639873
--- /dev/null
+++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/CompactionService.java
@@ -0,0 +1,82 @@
+package com.lanyuanxiaoyao.service.hudi.service;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.eshore.odcp.hudi.connector.entity.TableMeta;
+import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
+import com.lanyuanxiaoyao.service.forest.service.InfoService;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.factory.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.retry.annotation.Retryable;
+import org.springframework.stereotype.Service;
+
+/**
+ * Compaction related operations.
+ *
+ * @author lanyuanxiaoyao
+ * @date 2023-05-11
+ */
+@Service
+public class CompactionService {
+    private static final Logger logger = LoggerFactory.getLogger(CompactionService.class);
+
+    private final InfoService infoService;
+
+    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
+    public CompactionService(InfoService infoService) {
+        this.infoService = infoService;
+    }
+
+    /**
+     * Reads the compaction plan stored at {@code timestamp} for the table identified by
+     * {@code flinkJobId} and {@code alias} and converts it to a {@link HudiCompactionPlan}.
+     * <p>
+     * Null collections in the Hudi plan (operations, delta file paths, metrics, extra metadata)
+     * are mapped to empty immutable collections. The cache key separates its parts with ':' so
+     * that e.g. (1, "23") and (12, "3") cannot share an entry.
+     *
+     * @param flinkJobId Flink job id used to look up the table metadata
+     * @param alias      table alias used to look up the table metadata
+     * @param timestamp  instant time of the compaction
+     * @return the converted compaction plan
+     * @throws IOException if reading the plan fails
+     */
+    @Cacheable(value = "compaction_plan", sync = true, key = "#flinkJobId.toString()+':'+#alias+':'+#timestamp")
+    @Retryable(Throwable.class)
+    public HudiCompactionPlan getCompactionPlan(Long flinkJobId, String alias, String timestamp) throws IOException {
+        TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
+        HoodieTableMetaClient client = HoodieTableMetaClient.builder()
+                .setConf(new Configuration())
+                .setBasePath(meta.getHudi().getTargetHdfsPath())
+                .build();
+        try {
+            HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(client, timestamp);
+            return new HudiCompactionPlan(
+                    ObjectUtil.isNotNull(plan.getOperations())
+                            ? Lists.immutable.ofAll(plan.getOperations())
+                            .collect(o -> new HudiCompactionPlan.Operation(
+                                    o.getBaseInstantTime(),
+                                    ObjectUtil.isNotNull(o.getDeltaFilePaths()) ? Lists.immutable.ofAll(o.getDeltaFilePaths()) : Lists.immutable.empty(),
+                                    o.getDataFilePath(),
+                                    o.getFileId(),
+                                    o.getPartitionPath(),
+                                    ObjectUtil.isNotNull(o.getMetrics()) ? Maps.immutable.ofAll(o.getMetrics()) : Maps.immutable.empty(),
+                                    o.getBootstrapFilePath()
+                            ))
+                            : Lists.immutable.empty(),
+                    ObjectUtil.isNotNull(plan.getExtraMetadata()) ? Maps.immutable.ofAll(plan.getExtraMetadata()) : Maps.immutable.empty(),
+                    plan.getVersion()
+            );
+        } catch (IOException e) {
+            logger.error("Read compaction plan failure", e);
+            throw e;
+        }
+    }
+}
diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java
index 236593a..c6dcd3c 100644
--- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java
+++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java
@@ -1,6 +1,7 @@
 package com.lanyuanxiaoyao.service.hudi.service;
 
 import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
 import com.eshore.odcp.hudi.connector.entity.TableMeta;
 import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
 import com.lanyuanxiaoyao.service.forest.service.InfoService;
@@ -9,6 +10,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.eclipse.collections.api.factory.Lists;
 import org.eclipse.collections.api.list.ImmutableList;
 import org.eclipse.collections.api.list.MutableList;
@@ -27,7 +29,8 @@ import org.springframework.stereotype.Service;
 @Service
 public class TimelineService {
     private static final Logger logger = LoggerFactory.getLogger(TimelineService.class);
-
+    private static final String INSTANT_TYPE_ACTIVE = "active";
+    private static final String INSTANT_TYPE_ARCHIVE = "archive";
     private final InfoService infoService;
 
     @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@@ -42,9 +45,6 @@ public class TimelineService {
         return timeline(meta.getHudi().getTargetHdfsPath(), filterType);
     }
 
-    private static final String INSTANT_TYPE_ACTIVE = "active";
-    private static final String INSTANT_TYPE_ARCHIVE = "archive";
-
     @Cacheable(value = "timeline", sync = true, key = "#hdfs")
     @Retryable(Throwable.class)
     public ImmutableList timeline(String hdfs, ImmutableList filterType) throws IOException {
@@ -71,6 +71,31 @@ public class TimelineService {
                 .toImmutable();
     }
 
+    /**
+     * Lists the compaction instants on the active timeline that are not yet completed.
+     * <p>
+     * The cache key separates its parts with ':' so that e.g. (1, "23") and (12, "3")
+     * cannot share an entry.
+     *
+     * @param flinkJobId Flink job id used to look up the table metadata
+     * @param alias      table alias used to look up the table metadata
+     * @return the pending compaction instants
+     * @throws IOException if the timeline cannot be read
+     */
+    @Cacheable(value = "pending_compaction_timeline", sync = true, key = "#flinkJobId.toString()+':'+#alias")
+    @Retryable(Throwable.class)
+    public ImmutableList pendingCompactionTimeline(Long flinkJobId, String alias) throws IOException {
+        TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
+        HoodieTableMetaClient client = HoodieTableMetaClient.builder()
+                .setConf(new Configuration())
+                .setBasePath(meta.getHudi().getTargetHdfsPath())
+                .build();
+        return HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline)
+                .select(instant -> StrUtil.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION))
+                .reject(instant -> ObjectUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED))
+                .collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant));
+    }
+
     private HudiInstant covert(String type, HoodieInstant instant) {
         return new HudiInstant(
                 instant.getAction(),