1
0

Add victoria metrics reporter

This commit is contained in:
v-zhangjc9
2022-06-30 17:50:53 +08:00
parent eb4b741c38
commit 215a794fd3
7 changed files with 376 additions and 1 deletions

View File

@@ -54,6 +54,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
@@ -1857,6 +1858,30 @@ public class HoodieWriteConfig extends HoodieConfig {
return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
}
public String getVictoriaEndpoint() {
return getString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT);
}
public int getVictoriaTimeout() {
return getInt(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT);
}
public String getVictoriaTags() {
return getString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS);
}
public boolean getVictoriaBasicAuthEnable() {
return getBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE);
}
public String getVictoriaBasicAuthUsername() {
return getString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME);
}
public String getVictoriaBasicAuthPassword() {
return getString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD);
}
/**
* memory configs.
*/

View File

@@ -181,6 +181,8 @@ public class HoodieMetricsConfig extends HoodieConfig {
HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH,
HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.VICTORIA,
HoodieMetricsVictoriaConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
return hoodieMetricsConfig;
}
}

View File

@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.config.metrics;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import java.util.Properties;
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX;
/**
* Configs for Victoria metrics reporter type.
* <p>
* {@link org.apache.hudi.metrics.MetricsReporterType#VICTORIA}
*/
@ConfigClassProperty(name = "Metrics Configurations for Victoria",
groupName = ConfigGroups.Names.METRICS,
description = "Enables reporting on Hudi metrics using Victoria. "
+ " Hudi publishes metrics on every commit, clean, rollback etc.")
public class HoodieMetricsVictoriaConfig extends HoodieConfig {
public static final String VICTORIA_PREFIX = METRIC_PREFIX + ".victoria";
public static final ConfigProperty<String> VICTORIA_ENDPOINT = ConfigProperty
.key(VICTORIA_PREFIX + ".endpoint")
.defaultValue("http://localhost:8428/api/v1/import/prometheus")
.sinceVersion("0.12.0")
.withDocumentation("Victoria metrics endpoint. eg: http://localhost:8428/api/v1/import/prometheus.");
public static final ConfigProperty<Boolean> VICTORIA_BASIC_AUTH_ENABLE = ConfigProperty
.key(VICTORIA_PREFIX + ".auth.basic.enable")
.defaultValue(false)
.sinceVersion("0.13.0")
.withDocumentation("Enable basic authentication.");
public static final ConfigProperty<String> VICTORIA_BASIC_AUTH_USERNAME = ConfigProperty
.key(VICTORIA_PREFIX + ".auth.basic.username")
.defaultValue("")
.sinceVersion("0.13.0")
.withDocumentation("Basic authentication username");
public static final ConfigProperty<String> VICTORIA_BASIC_AUTH_PASSWORD = ConfigProperty
.key(VICTORIA_PREFIX + ".auth.basic.password")
.defaultValue("")
.sinceVersion("0.13.0")
.withDocumentation("Basic authentication password");
public static final ConfigProperty<Integer> VICTORIA_TIMEOUT = ConfigProperty
.key(VICTORIA_PREFIX + ".timeout")
.defaultValue(60000)
.sinceVersion("0.12.0")
.withDocumentation("Http push timeout. Default 1 minute.");
public static final ConfigProperty<String> VICTORIA_TAGS = ConfigProperty
.key(VICTORIA_PREFIX + ".tags")
.defaultValue("")
.sinceVersion("0.12.0")
.withDocumentation("Extra tags for every metric.");
private HoodieMetricsVictoriaConfig() {
super();
}
public static HoodieMetricsVictoriaConfig.Builder newBuilder() {
return new HoodieMetricsVictoriaConfig.Builder();
}
public static class Builder {
private final HoodieMetricsVictoriaConfig hoodieMetricsVictoriaConfig = new HoodieMetricsVictoriaConfig();
public Builder fromProperties(Properties props) {
this.hoodieMetricsVictoriaConfig.getProps().putAll(props);
return this;
}
public HoodieMetricsVictoriaConfig.Builder withVictoriaEndpoint(String endpoint) {
hoodieMetricsVictoriaConfig.setValue(VICTORIA_ENDPOINT, endpoint);
return this;
}
public HoodieMetricsVictoriaConfig.Builder withVictoriaTimeout(Integer timeout) {
hoodieMetricsVictoriaConfig.setValue(VICTORIA_TIMEOUT, String.valueOf(timeout));
return this;
}
public HoodieMetricsVictoriaConfig.Builder withVictoriaTags(String tags) {
hoodieMetricsVictoriaConfig.setValue(VICTORIA_TAGS, tags);
return this;
}
public HoodieMetricsVictoriaConfig build() {
hoodieMetricsVictoriaConfig.setDefaults(HoodieMetricsVictoriaConfig.class.getName());
return hoodieMetricsVictoriaConfig;
}
}
}

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.metrics.prometheus.PrometheusReporter;
import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter;
import com.codahale.metrics.MetricRegistry;
import org.apache.hudi.metrics.victoria.VictoriaMetricsReporter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -81,6 +82,9 @@ public class MetricsReporterFactory {
case CLOUDWATCH:
reporter = new CloudWatchMetricsReporter(config, registry);
break;
case VICTORIA:
reporter = new VictoriaMetricsReporter(config, registry);
break;
default:
LOG.error("Reporter type[" + type + "] is not supported.");
break;

View File

@@ -22,5 +22,5 @@ package org.apache.hudi.metrics;
* Types of the reporter supported, hudi also supports user defined reporter.
*/
public enum MetricsReporterType {
GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH
GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH, VICTORIA
}

View File

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics.victoria;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metrics.MetricsReporter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Implementation of victoria metrics reporter, which could receive prometheus format data
*/
public class VictoriaMetricsReporter extends MetricsReporter {
private static final Logger LOG = LogManager.getLogger(VictoriaMetricsReporter.class);
private final static String TAG_SPLIT = ";";
private final static String TAG_OPERATOR = "=";
private final VictoriaReporter victoriaReporter;
public VictoriaMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
String endpoint = config.getVictoriaEndpoint();
int timeout = config.getVictoriaTimeout();
String tagsText = config.getVictoriaTags();
boolean basicEnable = config.getVictoriaBasicAuthEnable();
String basicUsername = config.getVictoriaBasicAuthUsername();
String basicPassword = config.getVictoriaBasicAuthPassword();
Map<String, String> tags = new HashMap<>(10);
if (tagsText != null && !tagsText.isEmpty()) {
for (String item : tagsText.split(TAG_SPLIT)) {
String[] parsed = item.split(TAG_OPERATOR);
tags.put(parsed[0], parsed[1]);
}
}
victoriaReporter = new VictoriaReporter(
registry,
MetricFilter.ALL,
TimeUnit.SECONDS,
TimeUnit.SECONDS,
endpoint,
timeout,
tags,
basicEnable,
basicUsername,
basicPassword
);
}
@Override
public void start() {
victoriaReporter.start(10, TimeUnit.SECONDS);
}
@Override
public void report() {
victoriaReporter.report(null, null, null, null, null);
}
@Override
public Closeable getReporter() {
return victoriaReporter;
}
@Override
public void stop() {
victoriaReporter.stop();
}
}

View File

@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics.victoria;
import com.codahale.metrics.*;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class VictoriaReporter extends ScheduledReporter {
private static final Logger LOG = LogManager.getLogger(VictoriaReporter.class);
private final DropwizardExports metricExports;
private final CollectorRegistry collectorRegistry;
private final String endpoint;
private final Map<String, String> tags;
private final boolean basicAuthEnable;
private final String basicAuthUsername;
private final String basicAuthPassword;
private final RequestConfig requestConfig;
protected VictoriaReporter(MetricRegistry registry,
MetricFilter filter,
TimeUnit rateUnit,
TimeUnit durationUnit,
String endpoint,
Integer timout,
Map<String, String> tags,
boolean basicAuthEnable,
String basicAuthUsername,
String basicAuthPassword) {
super(registry, "hudi-push-victoria-reporter", filter, rateUnit, durationUnit);
this.endpoint = endpoint;
this.tags = tags;
this.basicAuthEnable = basicAuthEnable;
this.basicAuthUsername = basicAuthUsername;
this.basicAuthPassword = basicAuthPassword;
requestConfig = RequestConfig.custom()
.setConnectTimeout(timout)
.setSocketTimeout(timout)
.build();
collectorRegistry = new CollectorRegistry();
metricExports = new DropwizardExports(registry);
metricExports.register(collectorRegistry);
}
@Override
public void report(SortedMap<String, Gauge> gauges,
SortedMap<String, Counter> counters,
SortedMap<String, Histogram> histograms,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
try (StringWriter writer = new StringWriter()) {
TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
String query = tags.entrySet()
.stream()
.map(entry -> String.format("extra_label=%s=%s", entry.getKey(), entry.getValue()))
.collect(Collectors.joining("&"));
HttpClientBuilder builder = HttpClientBuilder.create();
if (basicAuthEnable) {
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(
new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM),
new UsernamePasswordCredentials(basicAuthUsername, basicAuthPassword)
);
builder.setDefaultCredentialsProvider(provider);
}
try(CloseableHttpClient client = builder.build()) {
HttpPost post = new HttpPost(String.format("%s?%s", endpoint, query));
post.setConfig(requestConfig);
HttpResponse response = client.execute(post);
int code = response.getStatusLine().getStatusCode();
if (code < 200 || code >= 300) {
HttpEntity entity = response.getEntity();
LOG.warn("Fail to push metrics: " + (entity == null ? "" : EntityUtils.toString(entity)));
}
}
} catch (IOException e) {
LOG.error("Fail to push metrics", e);
}
}
@Override
public void start(long period, TimeUnit unit) {
super.start(period, unit);
}
@Override
public void stop() {
super.stop();
collectorRegistry.unregister(metricExports);
}
}