diff --git a/utils/executor/pom.xml b/utils/executor/pom.xml index 7b77fa7..05bfe6b 100644 --- a/utils/executor/pom.xml +++ b/utils/executor/pom.xml @@ -96,10 +96,6 @@ - - cn.hutool - hutool-all - org.apache.parquet parquet-format diff --git a/utils/executor/src/main/java/com/lanyuanxiaoyao/service/executor/metrics/VictoriaMetricsReporter.java b/utils/executor/src/main/java/com/lanyuanxiaoyao/service/executor/metrics/VictoriaMetricsReporter.java index 09a5163..61b4e04 100644 --- a/utils/executor/src/main/java/com/lanyuanxiaoyao/service/executor/metrics/VictoriaMetricsReporter.java +++ b/utils/executor/src/main/java/com/lanyuanxiaoyao/service/executor/metrics/VictoriaMetricsReporter.java @@ -1,9 +1,5 @@ package com.lanyuanxiaoyao.service.executor.metrics; -import cn.hutool.core.util.StrUtil; -import cn.hutool.http.HttpRequest; -import cn.hutool.http.HttpResponse; -import cn.hutool.http.HttpUtil; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.exporter.common.TextFormat; import java.io.IOException; @@ -13,6 +9,17 @@ import java.util.stream.Collectors; import org.apache.flink.metrics.prometheus.AbstractPrometheusReporter; import org.apache.flink.metrics.reporter.InstantiateViaFactory; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.shaded.hadoop2.org.apache.http.HttpEntity; +import org.apache.flink.shaded.hadoop2.org.apache.http.HttpResponse; +import org.apache.flink.shaded.hadoop2.org.apache.http.auth.AuthScope; +import org.apache.flink.shaded.hadoop2.org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.flink.shaded.hadoop2.org.apache.http.client.CredentialsProvider; +import org.apache.flink.shaded.hadoop2.org.apache.http.client.config.RequestConfig; +import org.apache.flink.shaded.hadoop2.org.apache.http.client.methods.HttpPost; +import org.apache.flink.shaded.hadoop2.org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.flink.shaded.hadoop2.org.apache.http.impl.client.CloseableHttpClient; +import org.apache.flink.shaded.hadoop2.org.apache.http.impl.client.HttpClientBuilder; +import org.apache.flink.shaded.hadoop2.org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,19 +32,22 @@ public class VictoriaMetricsReporter extends AbstractPrometheusReporter implemen private static final Logger logger = LoggerFactory.getLogger(VictoriaMetricsReporter.class); private final String endpoint; - private final Integer timout; private final Map tags; private final Boolean enableBasicAuth; private final String basicAuthUsername; private final String basicAuthPassword; + private final RequestConfig requestConfig; public VictoriaMetricsReporter(String endpoint, Integer timout, Map tags, Boolean enableBasicAuth, String basicAuthUsername, String basicAuthPassword) { this.endpoint = endpoint; - this.timout = timout; this.tags = tags; this.enableBasicAuth = enableBasicAuth; this.basicAuthUsername = basicAuthUsername; this.basicAuthPassword = basicAuthPassword; + this.requestConfig = RequestConfig.custom() + .setConnectTimeout(timout) + .setSocketTimeout(timout) + .build(); } @Override @@ -47,20 +57,35 @@ public class VictoriaMetricsReporter extends AbstractPrometheusReporter implemen String query = tags.entrySet() .stream() - .map(entry -> StrUtil.format("extra_label={}={}", entry.getKey(), entry.getValue())) + .map(entry -> String.format("extra_label=%s=%s", entry.getKey(), entry.getValue())) .collect(Collectors.joining("&")); - HttpRequest request = HttpUtil.createPost(StrUtil.format("{}?{}", endpoint, query)) - .body(writer.toString()) - .timeout(timout); + HttpClientBuilder builder = HttpClientBuilder.create(); if (enableBasicAuth) { - request.basicAuth(basicAuthUsername, basicAuthPassword); + CredentialsProvider provider = new BasicCredentialsProvider(); + provider.setCredentials( + new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM), + new UsernamePasswordCredentials(basicAuthUsername, basicAuthPassword) + ); + builder.setDefaultCredentialsProvider(provider); } - HttpResponse response = request.execute(); - if (!response.isOk()) { - logger.warn("Fail to push metrics: {}, {}, endpoint: {}, tags: {}", response.getStatus(), response.body(), endpoint, tags); + try (CloseableHttpClient client = builder.build()) { + HttpPost post = new HttpPost(String.format("%s?%s", endpoint, query)); + post.setConfig(requestConfig); + HttpResponse response = client.execute(post); + int code = response.getStatusLine().getStatusCode(); + if (code < 200 || code >= 300) { + HttpEntity entity = response.getEntity(); + logger.warn( + "Fail to push metrics: {}, {}, endpoint: {}, tags: {}", + response.getStatusLine().getStatusCode(), + entity == null ? "" : EntityUtils.toString(entity), + endpoint, + tags + ); + } } } catch (IOException e) { - logger.error("Fail to write metrics, endpoint: {}, tags: {}", endpoint, tags, e); + logger.error(String.format("Fail to write metrics, endpoint: %s, tags: %s", endpoint, tags), e); } } } diff --git a/utils/executor/src/main/java/com/lanyuanxiaoyao/service/executor/metrics/VictoriaMetricsReporterFactory.java b/utils/executor/src/main/java/com/lanyuanxiaoyao/service/executor/metrics/VictoriaMetricsReporterFactory.java index 321f269..589742d 100644 --- a/utils/executor/src/main/java/com/lanyuanxiaoyao/service/executor/metrics/VictoriaMetricsReporterFactory.java +++ b/utils/executor/src/main/java/com/lanyuanxiaoyao/service/executor/metrics/VictoriaMetricsReporterFactory.java @@ -1,6 +1,5 @@ package com.lanyuanxiaoyao.service.executor.metrics; -import cn.hutool.core.util.StrUtil; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -36,7 +35,7 @@ public class VictoriaMetricsReporterFactory implements MetricReporterFactory { String authPassword = metricConfig.getString(AUTH_PASSWORD.key(), AUTH_PASSWORD.defaultValue()); Map tags = new HashMap<>(10); - if (StrUtil.isNotBlank(tagsText)) { + if (tagsText != null && !tagsText.isEmpty()) { for (String item : tagsText.split(";")) { String[] parsed = item.split("="); tags.put(parsed[0], parsed[1]);