[HUDI-836] Implement datadog metrics reporter (#1572)
- Adds support for emitting metrics to datadog - Tests, configs..
This commit is contained in:
@@ -130,6 +130,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
|
||||
DEFAULT_JMX_HOST);
|
||||
setDefaultOnCondition(props, !props.containsKey(JMX_PORT), JMX_PORT,
|
||||
String.valueOf(DEFAULT_JMX_PORT));
|
||||
MetricsReporterType reporterType = MetricsReporterType.valueOf(props.getProperty(METRICS_REPORTER_TYPE));
|
||||
setDefaultOnCondition(props, reporterType == MetricsReporterType.DATADOG,
|
||||
HoodieMetricsDatadogConfig.newBuilder().fromProperties(props).build());
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.config;
|
||||
|
||||
import org.apache.hudi.common.config.DefaultHoodieConfig;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.hudi.config.HoodieMetricsConfig.METRIC_PREFIX;
|
||||
|
||||
/**
|
||||
* Configs for Datadog reporter type.
|
||||
* <p>
|
||||
* {@link org.apache.hudi.metrics.MetricsReporterType#DATADOG}
|
||||
*/
|
||||
@Immutable
|
||||
public class HoodieMetricsDatadogConfig extends DefaultHoodieConfig {
|
||||
|
||||
public static final String DATADOG_PREFIX = METRIC_PREFIX + ".datadog";
|
||||
public static final String DATADOG_REPORT_PERIOD_SECONDS = DATADOG_PREFIX + ".report.period.seconds";
|
||||
public static final int DEFAULT_DATADOG_REPORT_PERIOD_SECONDS = 30;
|
||||
public static final String DATADOG_API_SITE = DATADOG_PREFIX + ".api.site";
|
||||
public static final String DATADOG_API_KEY = DATADOG_PREFIX + ".api.key";
|
||||
public static final String DATADOG_API_KEY_SKIP_VALIDATION = DATADOG_PREFIX + ".api.key.skip.validation";
|
||||
public static final boolean DEFAULT_DATADOG_API_KEY_SKIP_VALIDATION = false;
|
||||
public static final String DATADOG_API_KEY_SUPPLIER = DATADOG_PREFIX + ".api.key.supplier";
|
||||
public static final String DATADOG_API_TIMEOUT_SECONDS = DATADOG_PREFIX + ".api.timeout.seconds";
|
||||
public static final int DEFAULT_DATADOG_API_TIMEOUT_SECONDS = 3;
|
||||
public static final String DATADOG_METRIC_PREFIX = DATADOG_PREFIX + ".metric.prefix";
|
||||
public static final String DATADOG_METRIC_HOST = DATADOG_PREFIX + ".metric.host";
|
||||
public static final String DATADOG_METRIC_TAGS = DATADOG_PREFIX + ".metric.tags";
|
||||
|
||||
private HoodieMetricsDatadogConfig(Properties props) {
|
||||
super(props);
|
||||
}
|
||||
|
||||
public static HoodieMetricsDatadogConfig.Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
private final Properties props = new Properties();
|
||||
|
||||
public Builder fromProperties(Properties props) {
|
||||
this.props.putAll(props);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogReportPeriodSeconds(int period) {
|
||||
props.setProperty(DATADOG_REPORT_PERIOD_SECONDS, String.valueOf(period));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogApiSite(String apiSite) {
|
||||
props.setProperty(DATADOG_API_SITE, apiSite);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogApiKey(String apiKey) {
|
||||
props.setProperty(DATADOG_API_KEY, apiKey);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogApiKeySkipValidation(boolean skip) {
|
||||
props.setProperty(DATADOG_API_KEY_SKIP_VALIDATION, String.valueOf(skip));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogApiKeySupplier(String apiKeySupplier) {
|
||||
props.setProperty(DATADOG_API_KEY_SUPPLIER, apiKeySupplier);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogApiTimeoutSeconds(int timeout) {
|
||||
props.setProperty(DATADOG_API_TIMEOUT_SECONDS, String.valueOf(timeout));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogPrefix(String prefix) {
|
||||
props.setProperty(DATADOG_METRIC_PREFIX, prefix);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogHost(String host) {
|
||||
props.setProperty(DATADOG_METRIC_HOST, host);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withDatadogTags(String tags) {
|
||||
props.setProperty(DATADOG_METRIC_TAGS, tags);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieMetricsDatadogConfig build() {
|
||||
HoodieMetricsDatadogConfig config = new HoodieMetricsDatadogConfig(props);
|
||||
setDefaultOnCondition(props, !props.containsKey(DATADOG_REPORT_PERIOD_SECONDS),
|
||||
DATADOG_REPORT_PERIOD_SECONDS,
|
||||
String.valueOf(DEFAULT_DATADOG_REPORT_PERIOD_SECONDS));
|
||||
setDefaultOnCondition(props, !props.containsKey(DATADOG_API_KEY_SKIP_VALIDATION),
|
||||
DATADOG_API_KEY_SKIP_VALIDATION,
|
||||
String.valueOf(DEFAULT_DATADOG_API_KEY_SKIP_VALIDATION));
|
||||
setDefaultOnCondition(props, !props.containsKey(DATADOG_API_TIMEOUT_SECONDS),
|
||||
DATADOG_API_TIMEOUT_SECONDS,
|
||||
String.valueOf(DEFAULT_DATADOG_API_TIMEOUT_SECONDS));
|
||||
return config;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.metrics.MetricsReporterType;
|
||||
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
|
||||
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
|
||||
|
||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||
@@ -38,9 +39,13 @@ import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Class storing configs for the {@link HoodieWriteClient}.
|
||||
@@ -541,6 +546,45 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return props.getProperty(HoodieMetricsConfig.JMX_PORT);
|
||||
}
|
||||
|
||||
public int getDatadogReportPeriodSeconds() {
|
||||
return Integer.parseInt(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_REPORT_PERIOD_SECONDS));
|
||||
}
|
||||
|
||||
public ApiSite getDatadogApiSite() {
|
||||
return ApiSite.valueOf(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_SITE));
|
||||
}
|
||||
|
||||
public String getDatadogApiKey() {
|
||||
if (props.containsKey(HoodieMetricsDatadogConfig.DATADOG_API_KEY)) {
|
||||
return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY);
|
||||
} else {
|
||||
Supplier<String> apiKeySupplier = ReflectionUtils.loadClass(
|
||||
props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY_SUPPLIER));
|
||||
return apiKeySupplier.get();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean getDatadogApiKeySkipValidation() {
|
||||
return Boolean.parseBoolean(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY_SKIP_VALIDATION));
|
||||
}
|
||||
|
||||
public int getDatadogApiTimeoutSeconds() {
|
||||
return Integer.parseInt(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_TIMEOUT_SECONDS));
|
||||
}
|
||||
|
||||
public String getDatadogMetricPrefix() {
|
||||
return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_METRIC_PREFIX);
|
||||
}
|
||||
|
||||
public String getDatadogMetricHost() {
|
||||
return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_METRIC_HOST);
|
||||
}
|
||||
|
||||
public List<String> getDatadogMetricTags() {
|
||||
return Arrays.stream(props.getProperty(
|
||||
HoodieMetricsDatadogConfig.DATADOG_METRIC_TAGS).split("\\s*,\\s*")).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* memory configs.
|
||||
*/
|
||||
|
||||
@@ -81,7 +81,7 @@ public class Metrics {
|
||||
public static void registerGauge(String metricName, final long value) {
|
||||
try {
|
||||
MetricRegistry registry = Metrics.getInstance().getRegistry();
|
||||
registry.register(metricName, (Gauge<Long>) () -> value);
|
||||
registry.<Gauge<Long>>register(metricName, () -> value);
|
||||
} catch (Exception e) {
|
||||
// Here we catch all exception, so the major upsert pipeline will not be affected if the
|
||||
// metrics system
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.metrics;
|
||||
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.metrics.datadog.DatadogMetricsReporter;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -44,6 +45,9 @@ public class MetricsReporterFactory {
|
||||
case JMX:
|
||||
reporter = new JmxMetricsReporter(config, registry);
|
||||
break;
|
||||
case DATADOG:
|
||||
reporter = new DatadogMetricsReporter(config, registry);
|
||||
break;
|
||||
default:
|
||||
LOG.error("Reporter type[" + type + "] is not supported.");
|
||||
break;
|
||||
|
||||
@@ -22,5 +22,5 @@ package org.apache.hudi.metrics;
|
||||
* Types of the reporter. Right now we only support Graphite. We can include JMX and CSV in the future.
|
||||
*/
|
||||
public enum MetricsReporterType {
|
||||
GRAPHITE, INMEMORY, JMX
|
||||
GRAPHITE, INMEMORY, JMX, DATADOG
|
||||
}
|
||||
|
||||
@@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.metrics.datadog;
|
||||
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
|
||||
import org.apache.http.HttpHeaders;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Datadog API HTTP client.
|
||||
* <p>
|
||||
* Responsible for API endpoint routing, validating API key, and sending requests with metrics payload.
|
||||
*/
|
||||
public class DatadogHttpClient implements Closeable {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(DatadogHttpClient.class);
|
||||
|
||||
private static final String SERIES_URL_FORMAT = "https://app.datadoghq.%s/api/v1/series";
|
||||
private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.%s/api/v1/validate";
|
||||
private static final String HEADER_KEY_API_KEY = "DD-API-KEY";
|
||||
|
||||
private final String apiKey;
|
||||
private final String seriesUrl;
|
||||
private final String validateUrl;
|
||||
private final CloseableHttpClient client;
|
||||
|
||||
public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client) {
|
||||
this.apiKey = apiKey;
|
||||
this.seriesUrl = String.format(SERIES_URL_FORMAT, apiSite.getDomain());
|
||||
this.validateUrl = String.format(VALIDATE_URL_FORMAT, apiSite.getDomain());
|
||||
this.client = client;
|
||||
if (!skipValidation) {
|
||||
validateApiKey();
|
||||
}
|
||||
}
|
||||
|
||||
public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, int timeoutSeconds) {
|
||||
this(apiSite, apiKey, skipValidation, HttpClientBuilder.create()
|
||||
.setDefaultRequestConfig(RequestConfig.custom()
|
||||
.setConnectTimeout(timeoutSeconds * 1000)
|
||||
.setConnectionRequestTimeout(timeoutSeconds * 1000)
|
||||
.setSocketTimeout(timeoutSeconds * 1000).build())
|
||||
.build());
|
||||
}
|
||||
|
||||
private void validateApiKey() {
|
||||
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(apiKey),
|
||||
"API key is null or empty.");
|
||||
|
||||
HttpUriRequest request = new HttpGet(validateUrl);
|
||||
request.setHeader(HEADER_KEY_API_KEY, apiKey);
|
||||
try (CloseableHttpResponse response = client.execute(request)) {
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
ValidationUtils.checkState(statusCode == HttpStatus.SC_OK, "API key is invalid.");
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Failed to connect to Datadog to validate API key.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void send(String payload) {
|
||||
HttpPost request = new HttpPost(seriesUrl);
|
||||
request.setHeader(HEADER_KEY_API_KEY, apiKey);
|
||||
request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
|
||||
request.setEntity(new StringEntity(payload, ContentType.APPLICATION_JSON));
|
||||
try (CloseableHttpResponse response = client.execute(request)) {
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
if (statusCode >= 300) {
|
||||
LOG.warn(String.format("Failed to send to Datadog. Response was %s", response));
|
||||
} else {
|
||||
LOG.debug(String.format("Sent metrics data (size: %d) to %s", payload.length(), seriesUrl));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to send to Datadog.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
client.close();
|
||||
}
|
||||
|
||||
public enum ApiSite {
|
||||
US("com"), EU("eu");
|
||||
|
||||
private final String domain;
|
||||
|
||||
ApiSite(String domain) {
|
||||
this.domain = domain;
|
||||
}
|
||||
|
||||
public String getDomain() {
|
||||
return domain;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.metrics.datadog;
|
||||
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.metrics.MetricsReporter;
|
||||
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
|
||||
|
||||
import com.codahale.metrics.MetricFilter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Hudi Datadog metrics reporter.
|
||||
* <p>
|
||||
* Responsible for reading Hoodie metrics configurations and hooking up with {@link org.apache.hudi.metrics.Metrics}.
|
||||
* <p>
|
||||
* Internally delegate reporting tasks to {@link DatadogReporter}.
|
||||
*/
|
||||
public class DatadogMetricsReporter extends MetricsReporter {
|
||||
|
||||
private final DatadogReporter reporter;
|
||||
private final int reportPeriodSeconds;
|
||||
|
||||
public DatadogMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
|
||||
reportPeriodSeconds = config.getDatadogReportPeriodSeconds();
|
||||
ApiSite apiSite = config.getDatadogApiSite();
|
||||
String apiKey = config.getDatadogApiKey();
|
||||
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(apiKey),
|
||||
"Datadog cannot be initialized: API key is null or empty.");
|
||||
boolean skipValidation = config.getDatadogApiKeySkipValidation();
|
||||
int timeoutSeconds = config.getDatadogApiTimeoutSeconds();
|
||||
String prefix = config.getDatadogMetricPrefix();
|
||||
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(prefix),
|
||||
"Datadog cannot be initialized: Metric prefix is null or empty.");
|
||||
Option<String> host = Option.ofNullable(config.getDatadogMetricHost());
|
||||
List<String> tagList = config.getDatadogMetricTags();
|
||||
Option<List<String>> tags = tagList.isEmpty() ? Option.empty() : Option.of(tagList);
|
||||
|
||||
reporter = new DatadogReporter(
|
||||
registry,
|
||||
new DatadogHttpClient(apiSite, apiKey, skipValidation, timeoutSeconds),
|
||||
prefix,
|
||||
host,
|
||||
tags,
|
||||
MetricFilter.ALL,
|
||||
TimeUnit.SECONDS,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
reporter.start(reportPeriodSeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void report() {
|
||||
reporter.report();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Closeable getReporter() {
|
||||
return reporter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
reporter.stop();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,171 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.metrics.datadog;
|
||||
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
|
||||
import com.codahale.metrics.Clock;
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Gauge;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricFilter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.ScheduledReporter;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.fasterxml.jackson.databind.node.TextNode;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A reporter which publishes metric values to Datadog API.
|
||||
* <p>
|
||||
* Responsible for collecting and composing metrics payload.
|
||||
* <p>
|
||||
* Internally use {@link DatadogHttpClient} to interact with Datadog APIs.
|
||||
*/
|
||||
public class DatadogReporter extends ScheduledReporter {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(DatadogReporter.class);
|
||||
|
||||
private final DatadogHttpClient client;
|
||||
private final String prefix;
|
||||
private final Option<String> host;
|
||||
private final Option<List<String>> tags;
|
||||
private final Clock clock;
|
||||
|
||||
protected DatadogReporter(
|
||||
MetricRegistry registry,
|
||||
DatadogHttpClient client,
|
||||
String prefix,
|
||||
Option<String> host,
|
||||
Option<List<String>> tags,
|
||||
MetricFilter filter,
|
||||
TimeUnit rateUnit,
|
||||
TimeUnit durationUnit) {
|
||||
super(registry, "hudi-datadog-reporter", filter, rateUnit, durationUnit);
|
||||
this.client = client;
|
||||
this.prefix = prefix;
|
||||
this.host = host;
|
||||
this.tags = tags;
|
||||
this.clock = Clock.defaultClock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void report(
|
||||
SortedMap<String, Gauge> gauges,
|
||||
SortedMap<String, Counter> counters,
|
||||
SortedMap<String, Histogram> histograms,
|
||||
SortedMap<String, Meter> meters,
|
||||
SortedMap<String, Timer> timers) {
|
||||
final long now = clock.getTime() / 1000;
|
||||
final PayloadBuilder builder = new PayloadBuilder();
|
||||
|
||||
builder.withMetricType(MetricType.gauge);
|
||||
gauges.forEach((metricName, metric) -> {
|
||||
builder.addGauge(prefix(metricName), now, (long) metric.getValue());
|
||||
});
|
||||
|
||||
host.ifPresent(builder::withHost);
|
||||
tags.ifPresent(builder::withTags);
|
||||
|
||||
client.send(builder.build());
|
||||
}
|
||||
|
||||
protected String prefix(String... components) {
|
||||
return MetricRegistry.name(prefix, components);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
try {
|
||||
super.stop();
|
||||
} finally {
|
||||
try {
|
||||
client.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error disconnecting from Datadog.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build payload that contains metrics data.
|
||||
* <p>
|
||||
* Refer to Datadog API reference https://docs.datadoghq.com/api/?lang=bash#post-timeseries-points
|
||||
*/
|
||||
static class PayloadBuilder {
|
||||
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
|
||||
private final ObjectNode payload;
|
||||
private final ArrayNode series;
|
||||
private MetricType type;
|
||||
|
||||
PayloadBuilder() {
|
||||
payload = MAPPER.createObjectNode();
|
||||
series = payload.putArray("series");
|
||||
}
|
||||
|
||||
PayloadBuilder withMetricType(MetricType type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
PayloadBuilder addGauge(String metric, long timestamp, long gaugeValue) {
|
||||
ValidationUtils.checkState(type == MetricType.gauge);
|
||||
ObjectNode seriesItem = MAPPER.createObjectNode().put("metric", metric);
|
||||
seriesItem.putArray("points").addArray().add(timestamp).add(gaugeValue);
|
||||
series.add(seriesItem);
|
||||
return this;
|
||||
}
|
||||
|
||||
PayloadBuilder withHost(String host) {
|
||||
series.forEach(seriesItem -> ((ObjectNode) seriesItem).put("host", host));
|
||||
return this;
|
||||
}
|
||||
|
||||
PayloadBuilder withTags(List<String> tags) {
|
||||
series.forEach(seriesItem -> {
|
||||
((ObjectNode) seriesItem)
|
||||
.putArray("tags")
|
||||
.addAll(tags.stream().map(TextNode::new).collect(Collectors.toList()));
|
||||
});
|
||||
return this;
|
||||
}
|
||||
|
||||
String build() {
|
||||
return payload.toString();
|
||||
}
|
||||
}
|
||||
|
||||
enum MetricType {
|
||||
gauge;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.metrics.datadog;
|
||||
|
||||
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
|
||||
|
||||
import org.apache.http.StatusLine;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.log4j.AppenderSkeleton;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class TestDatadogHttpClient {
|
||||
|
||||
@Mock
|
||||
AppenderSkeleton appender;
|
||||
|
||||
@Captor
|
||||
ArgumentCaptor<LoggingEvent> logCaptor;
|
||||
|
||||
@Mock
|
||||
CloseableHttpClient httpClient;
|
||||
|
||||
@Mock
|
||||
CloseableHttpResponse httpResponse;
|
||||
|
||||
@Mock
|
||||
StatusLine statusLine;
|
||||
|
||||
private void mockResponse(int statusCode) {
|
||||
when(statusLine.getStatusCode()).thenReturn(statusCode);
|
||||
when(httpResponse.getStatusLine()).thenReturn(statusLine);
|
||||
try {
|
||||
when(httpClient.execute(any())).thenReturn(httpResponse);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void validateApiKeyShouldThrowExceptionWhenRequestFailed() throws IOException {
|
||||
when(httpClient.execute(any())).thenThrow(IOException.class);
|
||||
|
||||
Throwable t = assertThrows(IllegalStateException.class, () -> {
|
||||
new DatadogHttpClient(ApiSite.EU, "foo", false, httpClient);
|
||||
});
|
||||
assertEquals("Failed to connect to Datadog to validate API key.", t.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void validateApiKeyShouldThrowExceptionWhenResponseNotSuccessful() {
|
||||
mockResponse(500);
|
||||
|
||||
Throwable t = assertThrows(IllegalStateException.class, () -> {
|
||||
new DatadogHttpClient(ApiSite.EU, "foo", false, httpClient);
|
||||
});
|
||||
assertEquals("API key is invalid.", t.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendPayloadShouldLogWhenRequestFailed() throws IOException {
|
||||
Logger.getRootLogger().addAppender(appender);
|
||||
when(httpClient.execute(any())).thenThrow(IOException.class);
|
||||
|
||||
DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", true, httpClient);
|
||||
ddClient.send("{}");
|
||||
|
||||
verify(appender).doAppend(logCaptor.capture());
|
||||
assertEquals("Failed to send to Datadog.", logCaptor.getValue().getRenderedMessage());
|
||||
assertEquals(Level.WARN, logCaptor.getValue().getLevel());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendPayloadShouldLogUnsuccessfulSending() {
|
||||
Logger.getRootLogger().addAppender(appender);
|
||||
mockResponse(401);
|
||||
when(httpResponse.toString()).thenReturn("unauthorized");
|
||||
|
||||
DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", true, httpClient);
|
||||
ddClient.send("{}");
|
||||
|
||||
verify(appender).doAppend(logCaptor.capture());
|
||||
assertEquals("Failed to send to Datadog. Response was unauthorized", logCaptor.getValue().getRenderedMessage());
|
||||
assertEquals(Level.WARN, logCaptor.getValue().getLevel());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendPayloadShouldLogSuccessfulSending() {
|
||||
Logger.getRootLogger().addAppender(appender);
|
||||
mockResponse(202);
|
||||
|
||||
DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", true, httpClient);
|
||||
ddClient.send("{}");
|
||||
|
||||
verify(appender).doAppend(logCaptor.capture());
|
||||
assertTrue(logCaptor.getValue().getRenderedMessage().startsWith("Sent metrics data"));
|
||||
assertEquals(Level.DEBUG, logCaptor.getValue().getLevel());
|
||||
}
|
||||
|
||||
public static List<Arguments> getApiSiteAndDomain() {
|
||||
return Arrays.asList(
|
||||
Arguments.of("US", "com"),
|
||||
Arguments.of("EU", "eu")
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("getApiSiteAndDomain")
|
||||
public void testApiSiteReturnCorrectDomain(String apiSite, String domain) {
|
||||
assertEquals(domain, ApiSite.valueOf(apiSite).getDomain());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.metrics.datadog;
|
||||
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class TestDatadogMetricsReporter {
|
||||
|
||||
@Mock
|
||||
HoodieWriteConfig config;
|
||||
|
||||
@Mock
|
||||
MetricRegistry registry;
|
||||
|
||||
@Test
|
||||
public void instantiationShouldFailWhenNoApiKey() {
|
||||
when(config.getDatadogApiKey()).thenReturn("");
|
||||
Throwable t = assertThrows(IllegalStateException.class, () -> {
|
||||
new DatadogMetricsReporter(config, registry);
|
||||
});
|
||||
assertEquals("Datadog cannot be initialized: API key is null or empty.", t.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void instantiationShouldFailWhenNoMetricPrefix() {
|
||||
when(config.getDatadogApiKey()).thenReturn("foo");
|
||||
when(config.getDatadogMetricPrefix()).thenReturn("");
|
||||
Throwable t = assertThrows(IllegalStateException.class, () -> {
|
||||
new DatadogMetricsReporter(config, registry);
|
||||
});
|
||||
assertEquals("Datadog cannot be initialized: Metric prefix is null or empty.", t.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void instantiationShouldSucceed() {
|
||||
when(config.getDatadogApiSite()).thenReturn(ApiSite.EU);
|
||||
when(config.getDatadogApiKey()).thenReturn("foo");
|
||||
when(config.getDatadogApiKeySkipValidation()).thenReturn(true);
|
||||
when(config.getDatadogMetricPrefix()).thenReturn("bar");
|
||||
when(config.getDatadogMetricHost()).thenReturn("foo");
|
||||
when(config.getDatadogMetricTags()).thenReturn(Arrays.asList("baz", "foo"));
|
||||
assertDoesNotThrow(() -> {
|
||||
new DatadogMetricsReporter(config, registry);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.metrics.datadog;
|
||||
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.metrics.datadog.DatadogReporter.MetricType;
|
||||
import org.apache.hudi.metrics.datadog.DatadogReporter.PayloadBuilder;
|
||||
|
||||
import com.codahale.metrics.MetricFilter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import org.apache.log4j.AppenderSkeleton;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class TestDatadogReporter {
|
||||
|
||||
@Mock
|
||||
AppenderSkeleton appender;
|
||||
|
||||
@Captor
|
||||
ArgumentCaptor<LoggingEvent> logCaptor;
|
||||
|
||||
@Mock
|
||||
MetricRegistry registry;
|
||||
|
||||
@Mock
|
||||
DatadogHttpClient client;
|
||||
|
||||
@Test
|
||||
public void stopShouldCloseEnclosedClient() throws IOException {
|
||||
new DatadogReporter(registry, client, "foo", Option.empty(), Option.empty(),
|
||||
MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS).stop();
|
||||
|
||||
verify(client).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void stopShouldLogWhenEnclosedClientFailToClose() throws IOException {
|
||||
Logger.getRootLogger().addAppender(appender);
|
||||
doThrow(IOException.class).when(client).close();
|
||||
|
||||
new DatadogReporter(registry, client, "foo", Option.empty(), Option.empty(),
|
||||
MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS).stop();
|
||||
|
||||
verify(appender).doAppend(logCaptor.capture());
|
||||
assertEquals("Error disconnecting from Datadog.", logCaptor.getValue().getRenderedMessage());
|
||||
assertEquals(Level.WARN, logCaptor.getValue().getLevel());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void prefixShouldPrepend() {
|
||||
DatadogReporter reporter = new DatadogReporter(
|
||||
registry, client, "foo", Option.empty(), Option.empty(),
|
||||
MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS);
|
||||
assertEquals("foo.bar", reporter.prefix("bar"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void payloadBuilderShouldBuildExpectedPayloadString() {
|
||||
String payload = new PayloadBuilder()
|
||||
.withMetricType(MetricType.gauge)
|
||||
.addGauge("foo", 0, 0)
|
||||
.addGauge("bar", 1, 999)
|
||||
.withHost("xhost")
|
||||
.withTags(Arrays.asList("tag1", "tag2"))
|
||||
.build();
|
||||
assertEquals(
|
||||
"{\"series\":["
|
||||
+ "{\"metric\":\"foo\",\"points\":[[0,0]],\"host\":\"xhost\",\"tags\":[\"tag1\",\"tag2\"]},"
|
||||
+ "{\"metric\":\"bar\",\"points\":[[1,999]],\"host\":\"xhost\",\"tags\":[\"tag1\",\"tag2\"]}]}",
|
||||
payload);
|
||||
}
|
||||
}
|
||||
@@ -291,4 +291,4 @@
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
||||
</project>
|
||||
|
||||
Reference in New Issue
Block a user