From 9073a7706c4013430c5fd7383882e1a1005cd597 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Mon, 1 May 2023 15:37:55 +0800 Subject: [PATCH] =?UTF-8?q?feature(forest):=20=E5=A2=9E=E5=8A=A0=20forest?= =?UTF-8?q?=20=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 为模块间的 HTTP 请求创建一个公共模块,集中模块间的互操作 --- pom.xml | 54 +++++++++++++++++-- service-forest/pom.xml | 27 ++++++++++ .../configuration/ForestsConfiguration.java | 36 +++++++++++++ .../SpringCloudDiscoveryInterceptor.java | 54 +++++++++++++++++++ .../service/forest/service/InfoService.java | 48 +++++++++++++++++ 5 files changed, 216 insertions(+), 3 deletions(-) create mode 100644 service-forest/pom.xml create mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/configuration/ForestsConfiguration.java create mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/configuration/SpringCloudDiscoveryInterceptor.java create mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java diff --git a/pom.xml b/pom.xml index 6fa87d1..11534d0 100644 --- a/pom.xml +++ b/pom.xml @@ -16,6 +16,7 @@ service-yarn-query service-flink-query service-web + service-forest @@ -23,8 +24,45 @@ 8 UTF-8 + b1e11 + + 2.6.8 + 2021.0.3 + + + b1e1 + + b1e1 + + + + b1e11 + + b1e11 + + + + b2e1 + + b2e1 + + + + b5s119 + + b5s119 + + + + b2s119 + + b2s119 + + + + com.eshore.odcp.hudi.connector @@ -127,17 +165,27 @@ pulsar-client-admin 2.8.0 + + org.apache.hadoop + hadoop-client + 3.1.2 + + + org.apache.hadoop + hadoop-yarn-client + 3.1.2 + org.springframework.boot spring-boot-dependencies - 2.6.8 + ${spring-boot.version} pom import org.springframework.cloud spring-cloud-dependencies - 2021.0.3 + ${spring-cloud.version} pom import @@ -150,7 +198,7 @@ org.springframework.boot spring-boot-maven-plugin - 2.6.8 + ${spring-boot.version} diff --git a/service-forest/pom.xml b/service-forest/pom.xml new file mode 100644 index 0000000..84eb000 --- /dev/null +++ b/service-forest/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + com.lanyuanxiaoyao + hudi-service + 1.0.0-SNAPSHOT + + + service-forest + + + + com.lanyuanxiaoyao + service-configuration + 1.0.0-SNAPSHOT + + + com.dtflys.forest + forest-spring-boot-starter + 1.5.30 + + + + \ No newline at end of file diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/configuration/ForestsConfiguration.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/configuration/ForestsConfiguration.java new file mode 100644 index 0000000..5777e34 --- /dev/null +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/configuration/ForestsConfiguration.java @@ -0,0 +1,36 @@ +package com.lanyuanxiaoyao.service.forest.configuration; + +import cn.hutool.core.collection.ListUtil; +import com.dtflys.forest.converter.json.ForestJacksonConverter; +import com.dtflys.forest.springboot.annotation.ForestScan; +import com.dtflys.forest.springboot.properties.ForestConfigurationProperties; +import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author lanyuanxiaoyao + * @date 2023-04-24 + */ +@ForestScan(basePackages = { + "com.lanyuanxiaoyao.service.forest.service" +}) +@Configuration +public class ForestsConfiguration { + @Bean + public ForestJacksonConverter forestJacksonConverter() { + ForestJacksonConverter converter = new ForestJacksonConverter(); + converter.getMapper().registerModule(new EclipseCollectionsModule()); + return converter; + } + + /*@Bean + public ForestConfigurationProperties forestConfigurationProperties() { + ForestConfigurationProperties properties = new ForestConfigurationProperties(); + properties.setBackend("httpclient"); + properties.setInterceptors(ListUtil.of(SpringCloudDiscoveryInterceptor.class)); + properties.setLogEnabled(false); + properties.setTimeout(60000); + return properties; + }*/ +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/configuration/SpringCloudDiscoveryInterceptor.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/configuration/SpringCloudDiscoveryInterceptor.java new file mode 100644 index 0000000..cac04b4 --- /dev/null +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/configuration/SpringCloudDiscoveryInterceptor.java @@ -0,0 +1,54 @@ +package com.lanyuanxiaoyao.service.forest.configuration; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.URLUtil; +import com.dtflys.forest.auth.BasicAuth; +import com.dtflys.forest.http.ForestAddress; +import com.dtflys.forest.http.ForestRequest; +import com.dtflys.forest.interceptor.Interceptor; +import java.net.URL; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.stereotype.Component; + +/** + * @author lanyuanxiaoyao + * @date 2023-04-24 + */ +@Component +public class SpringCloudDiscoveryInterceptor implements Interceptor { + private static final Logger logger = LoggerFactory.getLogger(SpringCloudDiscoveryInterceptor.class); + + private final DiscoveryClient client; + + public SpringCloudDiscoveryInterceptor(DiscoveryClient client) { + this.client = client; + } + + @Override + public boolean beforeExecute(ForestRequest request) { + // Load + URL url = URLUtil.url(request.getUrl()); + String host = url.getHost(); + if (StrUtil.isNotBlank(host)) { + ImmutableList instances = Lists.immutable.ofAll(client.getInstances(host)); + if (ObjectUtil.isNotEmpty(instances)) { + int index = RandomUtil.randomInt(instances.size()); + ServiceInstance instance = instances.get(index); + request.setAddress(new ForestAddress(instance.getScheme(), instance.getHost(), instance.getPort())); + } + } + + // Basic auth + BasicAuth basicAuth = new BasicAuth("AxhEbscwsJDbYMH2", "cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4"); + basicAuth.enhanceAuthorization(request); + + return Interceptor.super.beforeExecute(request); + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java new file mode 100644 index 0000000..1d19de9 --- /dev/null +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -0,0 +1,48 @@ +package com.lanyuanxiaoyao.service.forest.service; + +import com.dtflys.forest.annotation.BaseRequest; +import com.dtflys.forest.annotation.ForestClient; +import com.dtflys.forest.annotation.Get; +import com.dtflys.forest.annotation.Query; +import com.eshore.odcp.hudi.connector.entity.FlinkJob; +import com.eshore.odcp.hudi.connector.entity.SyncState; +import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobAndMetas; +import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; +import java.util.Map; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * Info 接口 + * + * @author lanyuanxiaoyao + * @date 2023-04-24 + */ +@ForestClient +@BaseRequest(baseURL = "http://service-info-query") +public interface InfoService { + @Get("/info/job_id_alias") + PageResponse jobIdAndAlias(@Query Map queryMap); + + @Get("/info/job_metas") + ImmutableList jobAndMetas(); + + @Get("/info/flink_job/list") + ImmutableList flinkJobList(); + + @Get("/info/flink_job/detail") + FlinkJob flinkJobDetail(@Query("flink_job_id") Long flinkJobId); + + @Get("/info/table_meta/list") + ImmutableList tableMetaList(); + + @Get("/info/table_meta/list") + ImmutableList tableMetaList(@Query("flink_job_id") Long flinkJobId); + + @Get("/info/table_meta/detail") + TableMeta tableMetaDetail(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/info/sync_state/detail") + SyncState syncStateDetail(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); +}