From e2520fa15e7f9cdf199d7253157aa34b3d2385e6 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Mon, 1 May 2023 15:39:41 +0800 Subject: [PATCH] =?UTF-8?q?feature(hudi-query):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=20Hudi=20=E8=A1=A8=E5=85=A8=E9=83=A8?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E7=BA=BF=E7=9A=84=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 查询全部时间线默认包含已归档的时间线 --- bin/build-hudi-query.sh | 8 +- .../entity/hudi/HudiInstant.java | 81 ++++++++++ service-hudi-query/pom.xml | 151 +++++++++++++++++- .../service/hudi/HudiQueryApplication.java | 22 ++- .../hudi/controller/TimelineController.java | 40 +++++ .../service/hudi/service/TimelineService.java | 67 ++++++++ .../service/hudi/utils/HoodieUtils.java | 64 ++++++++ 7 files changed, 420 insertions(+), 13 deletions(-) create mode 100644 service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java create mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java create mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java create mode 100644 service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java diff --git a/bin/build-hudi-query.sh b/bin/build-hudi-query.sh index c207b33..daf1042 100755 --- a/bin/build-hudi-query.sh +++ b/bin/build-hudi-query.sh @@ -1,5 +1,5 @@ #!/bin/bash -mvn -pl service-configuration clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml -mvn -pl service-info-query clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml -sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) scp /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-info-query/target/service-info-query-1.0.0-SNAPSHOT.jar iap@132.122.1.162:/apps/iap/tmp/lanyuanxiaoyao -sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) ssh -o 'StrictHostKeyChecking no' iap@132.122.1.162 'curl ftp://yyy:QeY\!68\)4nH1@132.121.122.15:2222 -T /apps/iap/tmp/lanyuanxiaoyao/service-info-query-1.0.0-SNAPSHOT.jar' \ No newline at end of file +mvn -pl service-configuration,service-forest clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml +mvn -pl service-hudi-query clean package -D skipTests -s ~/.m2/settings-development.xml -P b2s119 +sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) scp /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-hudi-query/target/service-hudi-query-1.0.0-SNAPSHOT.jar iap@132.122.1.162:/apps/iap/tmp/lanyuanxiaoyao +sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) ssh -o 'StrictHostKeyChecking no' iap@132.122.1.162 'curl ftp://yyy:QeY\!68\)4nH1@132.121.122.15:2222 -T /apps/iap/tmp/lanyuanxiaoyao/service-hudi-query-1.0.0-SNAPSHOT.jar' \ No newline at end of file diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java new file mode 100644 index 0000000..6560ab1 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java @@ -0,0 +1,81 @@ +package com.lanyuanxiaoyao.service.configuration.entity.hudi; + +import java.util.Comparator; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.map.ImmutableMap; + +/** + * Instant + * + * @author lanyuanxiaoyao + * @date 2023-05-01 + */ +public class HudiInstant implements Comparable { + private static final ImmutableMap COMPARABLE_ACTIONS = Maps.immutable.of("compaction", "commit"); + private static final Comparator ACTION_COMPARATOR = + Comparator.comparing(instant -> getComparableAction(instant.getAction())); + private static final Comparator COMPARATOR = + Comparator.comparing(HudiInstant::getTimestamp) + .thenComparing(ACTION_COMPARATOR) + .thenComparing(HudiInstant::getState); + + private static String getComparableAction(String action) { + return COMPARABLE_ACTIONS.getOrDefault(action, action); + } + + private String action; + // REQUESTED, INFLIGHT, COMPLETED, INVALID + private String state; + private String timestamp; + private String fileName; + // active or archive + private String type; + + public HudiInstant() { + } + + public HudiInstant(String action, String state, String timestamp, String fileName, String type) { + this.action = action; + this.state = state; + this.timestamp = timestamp; + this.fileName = fileName; + this.type = type; + } + + public String getAction() { + return action; + } + + public String getState() { + return state; + } + + public String getTimestamp() { + return timestamp; + } + + public String getFileName() { + return fileName; + } + + public String getType() { + return type; + } + + + @Override + public String toString() { + return "HudiInstant{" + + "action='" + action + '\'' + + ", state='" + state + '\'' + + ", timestamp='" + timestamp + '\'' + + ", fileName='" + fileName + '\'' + + ", type='" + type + '\'' + + '}'; + } + + @Override + public int compareTo(HudiInstant o) { + return COMPARATOR.compare(this, o); + } +} diff --git a/service-hudi-query/pom.xml b/service-hudi-query/pom.xml index de4b056..bee2508 100644 --- a/service-hudi-query/pom.xml +++ b/service-hudi-query/pom.xml @@ -11,11 +11,158 @@ service-hudi-query + + + com.lanyuanxiaoyao + service-configuration + 1.0.0-SNAPSHOT + + + com.lanyuanxiaoyao + service-forest + 1.0.0-SNAPSHOT + + + com.google.protobuf + protobuf-java + + + + + com.sun.jersey + jersey-client + 1.19.1 + + + com.sun.jersey + jersey-core + 1.19.1 + + + com.sun.jersey.contribs + jersey-apache-client4 + 1.19.1 + + + org.apache.hadoop + hadoop-client + + + com.squareup.okio + okio + + + com.fasterxml.woodstox + woodstox-core + + + com.google.guava + guava + + + commons-io + commons-io + + + commons-logging + commons-logging + + + + + org.apache.hudi + hudi-flink1.13-bundle + 0.12.0-eshore-SNAPSHOT + + + - org.springframework.boot - spring-boot-maven-plugin + org.apache.maven.plugins + maven-resources-plugin + 3.2.0 + + + copy-config-file + validate + + copy-resources + + + ${project.build.directory}/classes + + + ${project.parent.basedir}/config/${build-tag} + + *.xml + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.3.0 + + false + true + + + META-INF/spring.handlers + + + META-INF/spring.factories + + + META-INF/spring.schemas + + + com.lanyuanxiaoyao.service.hudi.HudiQueryApplication + + + reference.conf + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + core-default.xml + hdfs-default.xml + yarn-default.xml + log4j-surefire*.properties + + + + + + + package + + shade + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/HudiQueryApplication.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/HudiQueryApplication.java index 05159b6..74bd8b4 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/HudiQueryApplication.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/HudiQueryApplication.java @@ -1,12 +1,14 @@ package com.lanyuanxiaoyao.service.hudi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.ComponentScans; import org.springframework.retry.annotation.EnableRetry; /** @@ -14,14 +16,20 @@ import org.springframework.retry.annotation.EnableRetry; * @date 2023-04-27 */ @EnableDiscoveryClient -@SpringBootApplication(exclude = {GsonAutoConfiguration.class}) -@ComponentScans({ - @ComponentScan("com.lanyuanxiaoyao.service") -}) +@SpringBootApplication( + scanBasePackages = {"com.lanyuanxiaoyao.service"}, + exclude = {GsonAutoConfiguration.class} +) @EnableConfigurationProperties @EnableRetry -public class HudiQueryApplication { +public class HudiQueryApplication implements ApplicationRunner { + private static final Logger logger = LoggerFactory.getLogger(HudiQueryApplication.class); + public static void main(String[] args) { SpringApplication.run(HudiQueryApplication.class, args); } + + @Override + public void run(ApplicationArguments args) { + } } diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java new file mode 100644 index 0000000..96d6869 --- /dev/null +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java @@ -0,0 +1,40 @@ +package com.lanyuanxiaoyao.service.hudi.controller; + +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; +import com.lanyuanxiaoyao.service.hudi.service.TimelineService; +import java.io.IOException; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * 时间线 + * + * @author lanyuanxiaoyao + * @date 2023-05-01 + */ +@RestController +@RequestMapping("timeline") +public class TimelineController { + private static final Logger logger = LoggerFactory.getLogger(TimelineController.class); + + private final TimelineService timelineService; + + public TimelineController(TimelineService timelineService) { + this.timelineService = timelineService; + } + + @GetMapping("list") + public ImmutableList allInstants(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) throws IOException { + return timelineService.timeline(flinkJobId, alias); + } + + @GetMapping("list_hdfs") + public ImmutableList allInstants(@RequestParam("hdfs") String hdfs) throws IOException { + return timelineService.timeline(hdfs); + } +} diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java new file mode 100644 index 0000000..c8c80ec --- /dev/null +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java @@ -0,0 +1,67 @@ +package com.lanyuanxiaoyao.service.hudi.service; + +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; + +/** + * 时间线查询 + * + * @author lanayuanxiaoyao + * @date 2023-05-01 + */ +@Service +public class TimelineService { + private static final Logger logger = LoggerFactory.getLogger(TimelineService.class); + + private final InfoService infoService; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public TimelineService(InfoService infoService) { + this.infoService = infoService; + } + + @Cacheable(value = "timeline", sync = true, key = "#flinkJobId.toString()+#alias") + @Retryable(Throwable.class) + public ImmutableList timeline(Long flinkJobId, String alias) throws IOException { + TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); + return timeline(meta.getHudi().getTargetHdfsPath()); + } + + @Cacheable(value = "timeline", sync = true, key = "#hdfs") + @Retryable(Throwable.class) + public ImmutableList timeline(String hdfs) throws IOException { + HoodieTableMetaClient client = HoodieTableMetaClient.builder() + .setConf(new Configuration()) + .setBasePath(hdfs) + .build(); + ImmutableList activeInstants = HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline) + .collect(instant -> covert("active", instant)); + ImmutableList archiveInstants = HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getArchivedTimeline) + .collect(instant -> covert("archive", instant)); + return activeInstants.newWithAll(archiveInstants) + .toSortedList(HudiInstant::compareTo) + .toImmutable(); + } + + private HudiInstant covert(String type, HoodieInstant instant) { + return new HudiInstant( + instant.getAction(), + instant.getState().name(), + instant.getTimestamp(), + instant.getFileName(), + type + ); + } +} diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java new file mode 100644 index 0000000..b08552c --- /dev/null +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java @@ -0,0 +1,64 @@ +package com.lanyuanxiaoyao.service.hudi.utils; + +import cn.hutool.core.util.ReUtil; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Hudi 工具 + * + * @author lanyuanxiaoyao + * @date 2023-05-01 + */ +public class HoodieUtils { + private static final Logger logger = LoggerFactory.getLogger(HoodieUtils.class); + + public static ImmutableList getAllInstants(HoodieTableMetaClient client, Function getTimeline) throws IOException { + FileSystem fileSystem = client.getRawFs(); + // 直接使用 toString 方法得到的值是被缓存的 + String hdfs = client.getBasePathV2().toUri().toString(); + Path metadataPath = new Path(hdfs + "/.hoodie"); + return getAllInstants(getTimeline.apply(client), fileSystem, metadataPath) + .toSortedList(HoodieInstant::compareTo) + .toImmutable(); + } + + private static ImmutableList getAllInstants(HoodieDefaultTimeline timeline, FileSystem fileSystem, Path metadataPath) throws IOException { + Set committedTimestamps = timeline.getCommitsTimeline() + .filterCompletedInstants() + .getInstants() + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toSet()); + List compactionRequestedTimestamps = Arrays.stream(fileSystem.listStatus(metadataPath)) + .filter(status -> status.getPath().toString().endsWith(HoodieTimeline.REQUESTED_COMPACTION_EXTENSION)) + .map(status -> status.getPath().getName()) + .map(name -> ReUtil.get("^(\\d+)\\..+", name, 1)) + .filter(committedTimestamps::contains) + .collect(Collectors.toList()); + return Lists.immutable.ofAll(timeline.getInstants() + .map(instant -> { + if (compactionRequestedTimestamps.contains(instant.getTimestamp())) { + return new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, instant.getTimestamp()); + } + return instant; + }) + .sorted(Comparator.comparingLong(i -> Long.parseLong(i.getTimestamp()))) + .collect(Collectors.toList())); + } +} +