diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2cb9e8ce8..a983413f6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -54,6 +54,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
@@ -1857,6 +1858,30 @@ public class HoodieWriteConfig extends HoodieConfig {
return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
}
+ public String getVictoriaEndpoint() {
+ return getString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT);
+ }
+
+ public int getVictoriaTimeout() {
+ return getInt(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT);
+ }
+
+ public String getVictoriaTags() {
+ return getString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS);
+ }
+
+ public boolean getVictoriaBasicAuthEnable() {
+ return getBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE);
+ }
+
+ public String getVictoriaBasicAuthUsername() {
+ return getString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME);
+ }
+
+ public String getVictoriaBasicAuthPassword() {
+ return getString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD);
+ }
+
/**
* memory configs.
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index a515eb702..6e11ab927 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -181,6 +181,8 @@ public class HoodieMetricsConfig extends HoodieConfig {
HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH,
HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
+ hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.VICTORIA,
+ HoodieMetricsVictoriaConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
return hoodieMetricsConfig;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsVictoriaConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsVictoriaConfig.java
new file mode 100644
index 000000000..60c68debd
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsVictoriaConfig.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config.metrics;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import java.util.Properties;
+
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX;
+
+/**
+ * Configs for Victoria metrics reporter type.
+ *
+ * {@link org.apache.hudi.metrics.MetricsReporterType#VICTORIA}
+ */
+@ConfigClassProperty(name = "Metrics Configurations for Victoria",
+ groupName = ConfigGroups.Names.METRICS,
+ description = "Enables reporting on Hudi metrics using Victoria. "
+ + " Hudi publishes metrics on every commit, clean, rollback etc.")
+public class HoodieMetricsVictoriaConfig extends HoodieConfig {
+
+ public static final String VICTORIA_PREFIX = METRIC_PREFIX + ".victoria";
+
+ public static final ConfigProperty VICTORIA_ENDPOINT = ConfigProperty
+ .key(VICTORIA_PREFIX + ".endpoint")
+ .defaultValue("http://localhost:8428/api/v1/import/prometheus")
+ .sinceVersion("0.12.0")
+ .withDocumentation("Victoria metrics endpoint. eg: http://localhost:8428/api/v1/import/prometheus.");
+
+ public static final ConfigProperty VICTORIA_BASIC_AUTH_ENABLE = ConfigProperty
+ .key(VICTORIA_PREFIX + ".auth.basic.enable")
+ .defaultValue(false)
+ .sinceVersion("0.13.0")
+ .withDocumentation("Enable basic authentication.");
+
+ public static final ConfigProperty VICTORIA_BASIC_AUTH_USERNAME = ConfigProperty
+ .key(VICTORIA_PREFIX + ".auth.basic.username")
+ .defaultValue("")
+ .sinceVersion("0.13.0")
+ .withDocumentation("Basic authentication username");
+
+ public static final ConfigProperty VICTORIA_BASIC_AUTH_PASSWORD = ConfigProperty
+ .key(VICTORIA_PREFIX + ".auth.basic.password")
+ .defaultValue("")
+ .sinceVersion("0.13.0")
+ .withDocumentation("Basic authentication password");
+
+ public static final ConfigProperty VICTORIA_TIMEOUT = ConfigProperty
+ .key(VICTORIA_PREFIX + ".timeout")
+ .defaultValue(60000)
+ .sinceVersion("0.12.0")
+ .withDocumentation("Http push timeout. Default 1 minute.");
+
+ public static final ConfigProperty VICTORIA_TAGS = ConfigProperty
+ .key(VICTORIA_PREFIX + ".tags")
+ .defaultValue("")
+ .sinceVersion("0.12.0")
+ .withDocumentation("Extra tags for every metric.");
+
+ private HoodieMetricsVictoriaConfig() {
+ super();
+ }
+
+ public static HoodieMetricsVictoriaConfig.Builder newBuilder() {
+ return new HoodieMetricsVictoriaConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final HoodieMetricsVictoriaConfig hoodieMetricsVictoriaConfig = new HoodieMetricsVictoriaConfig();
+
+ public Builder fromProperties(Properties props) {
+ this.hoodieMetricsVictoriaConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieMetricsVictoriaConfig.Builder withVictoriaEndpoint(String endpoint) {
+ hoodieMetricsVictoriaConfig.setValue(VICTORIA_ENDPOINT, endpoint);
+ return this;
+ }
+
+ public HoodieMetricsVictoriaConfig.Builder withVictoriaTimeout(Integer timeout) {
+ hoodieMetricsVictoriaConfig.setValue(VICTORIA_TIMEOUT, String.valueOf(timeout));
+ return this;
+ }
+
+ public HoodieMetricsVictoriaConfig.Builder withVictoriaTags(String tags) {
+ hoodieMetricsVictoriaConfig.setValue(VICTORIA_TAGS, tags);
+ return this;
+ }
+
+ public HoodieMetricsVictoriaConfig build() {
+ hoodieMetricsVictoriaConfig.setDefaults(HoodieMetricsVictoriaConfig.class.getName());
+ return hoodieMetricsVictoriaConfig;
+ }
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index d81e337b2..4fed6a00b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -29,6 +29,7 @@ import org.apache.hudi.metrics.prometheus.PrometheusReporter;
import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter;
import com.codahale.metrics.MetricRegistry;
+import org.apache.hudi.metrics.victoria.VictoriaMetricsReporter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -81,6 +82,9 @@ public class MetricsReporterFactory {
case CLOUDWATCH:
reporter = new CloudWatchMetricsReporter(config, registry);
break;
+ case VICTORIA:
+ reporter = new VictoriaMetricsReporter(config, registry);
+ break;
default:
LOG.error("Reporter type[" + type + "] is not supported.");
break;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
index 3c8600159..b0b4ba16f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
@@ -22,5 +22,5 @@ package org.apache.hudi.metrics;
* Types of the reporter supported, hudi also supports user defined reporter.
*/
public enum MetricsReporterType {
- GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH
+ GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH, VICTORIA
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/victoria/VictoriaMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/victoria/VictoriaMetricsReporter.java
new file mode 100644
index 000000000..9841776cb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/victoria/VictoriaMetricsReporter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.victoria;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.MetricsReporter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of victoria metrics reporter, which could receive prometheus format data
+ */
+public class VictoriaMetricsReporter extends MetricsReporter {
+ private static final Logger LOG = LogManager.getLogger(VictoriaMetricsReporter.class);
+
+ private final static String TAG_SPLIT = ";";
+ private final static String TAG_OPERATOR = "=";
+
+ private final VictoriaReporter victoriaReporter;
+
+ public VictoriaMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
+ String endpoint = config.getVictoriaEndpoint();
+ int timeout = config.getVictoriaTimeout();
+ String tagsText = config.getVictoriaTags();
+ boolean basicEnable = config.getVictoriaBasicAuthEnable();
+ String basicUsername = config.getVictoriaBasicAuthUsername();
+ String basicPassword = config.getVictoriaBasicAuthPassword();
+ Map tags = new HashMap<>(10);
+ if (tagsText != null && !tagsText.isEmpty()) {
+ for (String item : tagsText.split(TAG_SPLIT)) {
+ String[] parsed = item.split(TAG_OPERATOR);
+ tags.put(parsed[0], parsed[1]);
+ }
+ }
+ victoriaReporter = new VictoriaReporter(
+ registry,
+ MetricFilter.ALL,
+ TimeUnit.SECONDS,
+ TimeUnit.SECONDS,
+ endpoint,
+ timeout,
+ tags,
+ basicEnable,
+ basicUsername,
+ basicPassword
+ );
+ }
+
+ @Override
+ public void start() {
+ victoriaReporter.start(10, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void report() {
+ victoriaReporter.report(null, null, null, null, null);
+ }
+
+ @Override
+ public Closeable getReporter() {
+ return victoriaReporter;
+ }
+
+ @Override
+ public void stop() {
+ victoriaReporter.stop();
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/victoria/VictoriaReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/victoria/VictoriaReporter.java
new file mode 100644
index 000000000..83205be44
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/victoria/VictoriaReporter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.victoria;
+
+import com.codahale.metrics.*;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.dropwizard.DropwizardExports;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class VictoriaReporter extends ScheduledReporter {
+ private static final Logger LOG = LogManager.getLogger(VictoriaReporter.class);
+
+ private final DropwizardExports metricExports;
+ private final CollectorRegistry collectorRegistry;
+ private final String endpoint;
+ private final Map tags;
+ private final boolean basicAuthEnable;
+ private final String basicAuthUsername;
+ private final String basicAuthPassword;
+ private final RequestConfig requestConfig;
+
+ protected VictoriaReporter(MetricRegistry registry,
+ MetricFilter filter,
+ TimeUnit rateUnit,
+ TimeUnit durationUnit,
+ String endpoint,
+ Integer timout,
+ Map tags,
+ boolean basicAuthEnable,
+ String basicAuthUsername,
+ String basicAuthPassword) {
+ super(registry, "hudi-push-victoria-reporter", filter, rateUnit, durationUnit);
+ this.endpoint = endpoint;
+ this.tags = tags;
+ this.basicAuthEnable = basicAuthEnable;
+ this.basicAuthUsername = basicAuthUsername;
+ this.basicAuthPassword = basicAuthPassword;
+
+ requestConfig = RequestConfig.custom()
+ .setConnectTimeout(timout)
+ .setSocketTimeout(timout)
+ .build();
+
+ collectorRegistry = new CollectorRegistry();
+ metricExports = new DropwizardExports(registry);
+ metricExports.register(collectorRegistry);
+ }
+
+ @Override
+ public void report(SortedMap gauges,
+ SortedMap counters,
+ SortedMap histograms,
+ SortedMap meters,
+ SortedMap timers) {
+ try (StringWriter writer = new StringWriter()) {
+ TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
+
+ String query = tags.entrySet()
+ .stream()
+ .map(entry -> String.format("extra_label=%s=%s", entry.getKey(), entry.getValue()))
+ .collect(Collectors.joining("&"));
+
+ HttpClientBuilder builder = HttpClientBuilder.create();
+ if (basicAuthEnable) {
+ CredentialsProvider provider = new BasicCredentialsProvider();
+ provider.setCredentials(
+ new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM),
+ new UsernamePasswordCredentials(basicAuthUsername, basicAuthPassword)
+ );
+ builder.setDefaultCredentialsProvider(provider);
+ }
+ try(CloseableHttpClient client = builder.build()) {
+ HttpPost post = new HttpPost(String.format("%s?%s", endpoint, query));
+ post.setConfig(requestConfig);
+ HttpResponse response = client.execute(post);
+ int code = response.getStatusLine().getStatusCode();
+ if (code < 200 || code >= 300) {
+ HttpEntity entity = response.getEntity();
+ LOG.warn("Fail to push metrics: " + (entity == null ? "" : EntityUtils.toString(entity)));
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Fail to push metrics", e);
+ }
+ }
+
+ @Override
+ public void start(long period, TimeUnit unit) {
+ super.start(period, unit);
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ collectorRegistry.unregister(metricExports);
+ }
+}