feature(hudi-query): 增加查询 Hudi 表全部时间线的功能
查询全部时间线默认包含已归档的时间线
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
#!/bin/bash
|
||||
mvn -pl service-configuration clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
|
||||
mvn -pl service-info-query clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml
|
||||
sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) scp /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-info-query/target/service-info-query-1.0.0-SNAPSHOT.jar iap@132.122.1.162:/apps/iap/tmp/lanyuanxiaoyao
|
||||
sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) ssh -o 'StrictHostKeyChecking no' iap@132.122.1.162 'curl ftp://yyy:QeY\!68\)4nH1@132.121.122.15:2222 -T /apps/iap/tmp/lanyuanxiaoyao/service-info-query-1.0.0-SNAPSHOT.jar'
|
||||
mvn -pl service-configuration,service-forest clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
|
||||
mvn -pl service-hudi-query clean package -D skipTests -s ~/.m2/settings-development.xml -P b2s119
|
||||
sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) scp /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-hudi-query/target/service-hudi-query-1.0.0-SNAPSHOT.jar iap@132.122.1.162:/apps/iap/tmp/lanyuanxiaoyao
|
||||
sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) ssh -o 'StrictHostKeyChecking no' iap@132.122.1.162 'curl ftp://yyy:QeY\!68\)4nH1@132.121.122.15:2222 -T /apps/iap/tmp/lanyuanxiaoyao/service-hudi-query-1.0.0-SNAPSHOT.jar'
|
||||
@@ -0,0 +1,81 @@
|
||||
package com.lanyuanxiaoyao.service.configuration.entity.hudi;
|
||||
|
||||
import java.util.Comparator;
|
||||
import org.eclipse.collections.api.factory.Maps;
|
||||
import org.eclipse.collections.api.map.ImmutableMap;
|
||||
|
||||
/**
|
||||
* Instant
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-05-01
|
||||
*/
|
||||
public class HudiInstant implements Comparable<HudiInstant> {
|
||||
private static final ImmutableMap<String, String> COMPARABLE_ACTIONS = Maps.immutable.of("compaction", "commit");
|
||||
private static final Comparator<HudiInstant> ACTION_COMPARATOR =
|
||||
Comparator.comparing(instant -> getComparableAction(instant.getAction()));
|
||||
private static final Comparator<HudiInstant> COMPARATOR =
|
||||
Comparator.comparing(HudiInstant::getTimestamp)
|
||||
.thenComparing(ACTION_COMPARATOR)
|
||||
.thenComparing(HudiInstant::getState);
|
||||
|
||||
private static String getComparableAction(String action) {
|
||||
return COMPARABLE_ACTIONS.getOrDefault(action, action);
|
||||
}
|
||||
|
||||
private String action;
|
||||
// REQUESTED, INFLIGHT, COMPLETED, INVALID
|
||||
private String state;
|
||||
private String timestamp;
|
||||
private String fileName;
|
||||
// active or archive
|
||||
private String type;
|
||||
|
||||
public HudiInstant() {
|
||||
}
|
||||
|
||||
public HudiInstant(String action, String state, String timestamp, String fileName, String type) {
|
||||
this.action = action;
|
||||
this.state = state;
|
||||
this.timestamp = timestamp;
|
||||
this.fileName = fileName;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public String getAction() {
|
||||
return action;
|
||||
}
|
||||
|
||||
public String getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public String getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public String getFileName() {
|
||||
return fileName;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HudiInstant{" +
|
||||
"action='" + action + '\'' +
|
||||
", state='" + state + '\'' +
|
||||
", timestamp='" + timestamp + '\'' +
|
||||
", fileName='" + fileName + '\'' +
|
||||
", type='" + type + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(HudiInstant o) {
|
||||
return COMPARATOR.compare(this, o);
|
||||
}
|
||||
}
|
||||
@@ -11,11 +11,158 @@
|
||||
|
||||
<artifactId>service-hudi-query</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.lanyuanxiaoyao</groupId>
|
||||
<artifactId>service-configuration</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.lanyuanxiaoyao</groupId>
|
||||
<artifactId>service-forest</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
<version>1.19.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
<version>1.19.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey.contribs</groupId>
|
||||
<artifactId>jersey-apache-client4</artifactId>
|
||||
<version>1.19.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.squareup.okio</groupId>
|
||||
<artifactId>okio</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.fasterxml.woodstox</groupId>
|
||||
<artifactId>woodstox-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hudi</groupId>
|
||||
<artifactId>hudi-flink1.13-bundle</artifactId>
|
||||
<version>0.12.0-eshore-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-resources-plugin</artifactId>
|
||||
<version>3.2.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-config-file</id>
|
||||
<phase>validate</phase>
|
||||
<goals>
|
||||
<goal>copy-resources</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/classes</outputDirectory>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>${project.parent.basedir}/config/${build-tag}</directory>
|
||||
<includes>
|
||||
<include>*.xml</include>
|
||||
</includes>
|
||||
</resource>
|
||||
</resources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.3.0</version>
|
||||
<configuration>
|
||||
<createDependencyReducedPom>false</createDependencyReducedPom>
|
||||
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
|
||||
<transformers>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
|
||||
<resource>META-INF/spring.handlers</resource>
|
||||
</transformer>
|
||||
<transformer
|
||||
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
|
||||
<resource>META-INF/spring.factories</resource>
|
||||
</transformer>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
|
||||
<resource>META-INF/spring.schemas</resource>
|
||||
</transformer>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>com.lanyuanxiaoyao.service.hudi.HudiQueryApplication</mainClass>
|
||||
</transformer>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
|
||||
<resource>reference.conf</resource>
|
||||
</transformer>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
|
||||
</transformers>
|
||||
<filters>
|
||||
<filter>
|
||||
<artifact>*:*</artifact>
|
||||
<excludes>
|
||||
<exclude>META-INF/*.SF</exclude>
|
||||
<exclude>META-INF/*.DSA</exclude>
|
||||
<exclude>META-INF/*.RSA</exclude>
|
||||
<exclude>core-default.xml</exclude>
|
||||
<exclude>hdfs-default.xml</exclude>
|
||||
<exclude>yarn-default.xml</exclude>
|
||||
<exclude>log4j-surefire*.properties</exclude>
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<version>${spring-boot.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package com.lanyuanxiaoyao.service.hudi;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.ComponentScans;
|
||||
import org.springframework.retry.annotation.EnableRetry;
|
||||
|
||||
/**
|
||||
@@ -14,14 +16,20 @@ import org.springframework.retry.annotation.EnableRetry;
|
||||
* @date 2023-04-27
|
||||
*/
|
||||
@EnableDiscoveryClient
|
||||
@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
|
||||
@ComponentScans({
|
||||
@ComponentScan("com.lanyuanxiaoyao.service")
|
||||
})
|
||||
@SpringBootApplication(
|
||||
scanBasePackages = {"com.lanyuanxiaoyao.service"},
|
||||
exclude = {GsonAutoConfiguration.class}
|
||||
)
|
||||
@EnableConfigurationProperties
|
||||
@EnableRetry
|
||||
public class HudiQueryApplication {
|
||||
public class HudiQueryApplication implements ApplicationRunner {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HudiQueryApplication.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(HudiQueryApplication.class, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.lanyuanxiaoyao.service.hudi.controller;
|
||||
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
|
||||
import com.lanyuanxiaoyao.service.hudi.service.TimelineService;
|
||||
import java.io.IOException;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* 时间线
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-05-01
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("timeline")
|
||||
public class TimelineController {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TimelineController.class);
|
||||
|
||||
private final TimelineService timelineService;
|
||||
|
||||
public TimelineController(TimelineService timelineService) {
|
||||
this.timelineService = timelineService;
|
||||
}
|
||||
|
||||
@GetMapping("list")
|
||||
public ImmutableList<HudiInstant> allInstants(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) throws IOException {
|
||||
return timelineService.timeline(flinkJobId, alias);
|
||||
}
|
||||
|
||||
@GetMapping("list_hdfs")
|
||||
public ImmutableList<HudiInstant> allInstants(@RequestParam("hdfs") String hdfs) throws IOException {
|
||||
return timelineService.timeline(hdfs);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package com.lanyuanxiaoyao.service.hudi.service;
|
||||
|
||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.retry.annotation.Retryable;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 时间线查询
|
||||
*
|
||||
* @author lanayuanxiaoyao
|
||||
* @date 2023-05-01
|
||||
*/
|
||||
@Service
|
||||
public class TimelineService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TimelineService.class);
|
||||
|
||||
private final InfoService infoService;
|
||||
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
public TimelineService(InfoService infoService) {
|
||||
this.infoService = infoService;
|
||||
}
|
||||
|
||||
@Cacheable(value = "timeline", sync = true, key = "#flinkJobId.toString()+#alias")
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableList<HudiInstant> timeline(Long flinkJobId, String alias) throws IOException {
|
||||
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
|
||||
return timeline(meta.getHudi().getTargetHdfsPath());
|
||||
}
|
||||
|
||||
@Cacheable(value = "timeline", sync = true, key = "#hdfs")
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableList<HudiInstant> timeline(String hdfs) throws IOException {
|
||||
HoodieTableMetaClient client = HoodieTableMetaClient.builder()
|
||||
.setConf(new Configuration())
|
||||
.setBasePath(hdfs)
|
||||
.build();
|
||||
ImmutableList<HudiInstant> activeInstants = HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline)
|
||||
.collect(instant -> covert("active", instant));
|
||||
ImmutableList<HudiInstant> archiveInstants = HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getArchivedTimeline)
|
||||
.collect(instant -> covert("archive", instant));
|
||||
return activeInstants.newWithAll(archiveInstants)
|
||||
.toSortedList(HudiInstant::compareTo)
|
||||
.toImmutable();
|
||||
}
|
||||
|
||||
private HudiInstant covert(String type, HoodieInstant instant) {
|
||||
return new HudiInstant(
|
||||
instant.getAction(),
|
||||
instant.getState().name(),
|
||||
instant.getTimestamp(),
|
||||
instant.getFileName(),
|
||||
type
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package com.lanyuanxiaoyao.service.hudi.utils;
|
||||
|
||||
import cn.hutool.core.util.ReUtil;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Hudi 工具
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-05-01
|
||||
*/
|
||||
public class HoodieUtils {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HoodieUtils.class);
|
||||
|
||||
public static ImmutableList<HoodieInstant> getAllInstants(HoodieTableMetaClient client, Function<HoodieTableMetaClient, HoodieDefaultTimeline> getTimeline) throws IOException {
|
||||
FileSystem fileSystem = client.getRawFs();
|
||||
// 直接使用 toString 方法得到的值是被缓存的
|
||||
String hdfs = client.getBasePathV2().toUri().toString();
|
||||
Path metadataPath = new Path(hdfs + "/.hoodie");
|
||||
return getAllInstants(getTimeline.apply(client), fileSystem, metadataPath)
|
||||
.toSortedList(HoodieInstant::compareTo)
|
||||
.toImmutable();
|
||||
}
|
||||
|
||||
private static ImmutableList<HoodieInstant> getAllInstants(HoodieDefaultTimeline timeline, FileSystem fileSystem, Path metadataPath) throws IOException {
|
||||
Set<String> committedTimestamps = timeline.getCommitsTimeline()
|
||||
.filterCompletedInstants()
|
||||
.getInstants()
|
||||
.map(HoodieInstant::getTimestamp)
|
||||
.collect(Collectors.toSet());
|
||||
List<String> compactionRequestedTimestamps = Arrays.stream(fileSystem.listStatus(metadataPath))
|
||||
.filter(status -> status.getPath().toString().endsWith(HoodieTimeline.REQUESTED_COMPACTION_EXTENSION))
|
||||
.map(status -> status.getPath().getName())
|
||||
.map(name -> ReUtil.get("^(\\d+)\\..+", name, 1))
|
||||
.filter(committedTimestamps::contains)
|
||||
.collect(Collectors.toList());
|
||||
return Lists.immutable.ofAll(timeline.getInstants()
|
||||
.map(instant -> {
|
||||
if (compactionRequestedTimestamps.contains(instant.getTimestamp())) {
|
||||
return new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, instant.getTimestamp());
|
||||
}
|
||||
return instant;
|
||||
})
|
||||
.sorted(Comparator.comparingLong(i -> Long.parseLong(i.getTimestamp())))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user