fix(executor): 简化三方依赖引用

This commit is contained in:
v-zhangjc9
2024-04-30 18:29:50 +08:00
parent e894c58d7b
commit 69b5ab558e
3 changed files with 41 additions and 21 deletions

View File

@@ -1,9 +1,5 @@
package com.lanyuanxiaoyao.service.executor.metrics;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import java.io.IOException;
@@ -13,6 +9,17 @@ import java.util.stream.Collectors;
import org.apache.flink.metrics.prometheus.AbstractPrometheusReporter;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.shaded.hadoop2.org.apache.http.HttpEntity;
import org.apache.flink.shaded.hadoop2.org.apache.http.HttpResponse;
import org.apache.flink.shaded.hadoop2.org.apache.http.auth.AuthScope;
import org.apache.flink.shaded.hadoop2.org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.flink.shaded.hadoop2.org.apache.http.client.CredentialsProvider;
import org.apache.flink.shaded.hadoop2.org.apache.http.client.config.RequestConfig;
import org.apache.flink.shaded.hadoop2.org.apache.http.client.methods.HttpPost;
import org.apache.flink.shaded.hadoop2.org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.flink.shaded.hadoop2.org.apache.http.impl.client.CloseableHttpClient;
import org.apache.flink.shaded.hadoop2.org.apache.http.impl.client.HttpClientBuilder;
import org.apache.flink.shaded.hadoop2.org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,19 +32,22 @@ public class VictoriaMetricsReporter extends AbstractPrometheusReporter implemen
private static final Logger logger = LoggerFactory.getLogger(VictoriaMetricsReporter.class);
private final String endpoint;
private final Integer timout;
private final Map<String, String> tags;
private final Boolean enableBasicAuth;
private final String basicAuthUsername;
private final String basicAuthPassword;
private final RequestConfig requestConfig;
public VictoriaMetricsReporter(String endpoint, Integer timout, Map<String, String> tags, Boolean enableBasicAuth, String basicAuthUsername, String basicAuthPassword) {
this.endpoint = endpoint;
this.timout = timout;
this.tags = tags;
this.enableBasicAuth = enableBasicAuth;
this.basicAuthUsername = basicAuthUsername;
this.basicAuthPassword = basicAuthPassword;
this.requestConfig = RequestConfig.custom()
.setConnectTimeout(timout)
.setSocketTimeout(timout)
.build();
}
@Override
@@ -47,20 +57,35 @@ public class VictoriaMetricsReporter extends AbstractPrometheusReporter implemen
String query = tags.entrySet()
.stream()
.map(entry -> StrUtil.format("extra_label={}={}", entry.getKey(), entry.getValue()))
.map(entry -> String.format("extra_label=%s=%s", entry.getKey(), entry.getValue()))
.collect(Collectors.joining("&"));
HttpRequest request = HttpUtil.createPost(StrUtil.format("{}?{}", endpoint, query))
.body(writer.toString())
.timeout(timout);
HttpClientBuilder builder = HttpClientBuilder.create();
if (enableBasicAuth) {
request.basicAuth(basicAuthUsername, basicAuthPassword);
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(
new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM),
new UsernamePasswordCredentials(basicAuthUsername, basicAuthPassword)
);
builder.setDefaultCredentialsProvider(provider);
}
HttpResponse response = request.execute();
if (!response.isOk()) {
logger.warn("Fail to push metrics: {}, {}, endpoint: {}, tags: {}", response.getStatus(), response.body(), endpoint, tags);
try (CloseableHttpClient client = builder.build()) {
HttpPost post = new HttpPost(String.format("%s?%s", endpoint, query));
post.setConfig(requestConfig);
HttpResponse response = client.execute(post);
int code = response.getStatusLine().getStatusCode();
if (code < 200 || code >= 300) {
HttpEntity entity = response.getEntity();
logger.warn(
"Fail to push metrics: {}, {}, endpoint: {}, tags: {}",
response.getStatusLine().getStatusCode(),
entity == null ? "" : EntityUtils.toString(entity),
endpoint,
tags
);
}
}
} catch (IOException e) {
logger.error("Fail to write metrics, endpoint: {}, tags: {}", endpoint, tags, e);
logger.error(String.format("Fail to write metrics, endpoint: %s, tags: %s", endpoint, tags), e);
}
}
}

View File

@@ -1,6 +1,5 @@
package com.lanyuanxiaoyao.service.executor.metrics;
import cn.hutool.core.util.StrUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -36,7 +35,7 @@ public class VictoriaMetricsReporterFactory implements MetricReporterFactory {
String authPassword = metricConfig.getString(AUTH_PASSWORD.key(), AUTH_PASSWORD.defaultValue());
Map<String, String> tags = new HashMap<>(10);
if (StrUtil.isNotBlank(tagsText)) {
if (tagsText != null && !tagsText.isEmpty()) {
for (String item : tagsText.split(";")) {
String[] parsed = item.split("=");
tags.put(parsed[0], parsed[1]);