From f34de3fb2738c8c36c937eba8df2a6848fafa886 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Fri, 22 May 2020 09:14:21 -0700 Subject: [PATCH] [HUDI-836] Implement datadog metrics reporter (#1572) - Adds support for emitting metrics to datadog - Tests, configs.. --- .../hudi/config/HoodieMetricsConfig.java | 3 + .../config/HoodieMetricsDatadogConfig.java | 127 +++++++++++++ .../apache/hudi/config/HoodieWriteConfig.java | 44 +++++ .../java/org/apache/hudi/metrics/Metrics.java | 2 +- .../hudi/metrics/MetricsReporterFactory.java | 4 + .../hudi/metrics/MetricsReporterType.java | 2 +- .../metrics/datadog/DatadogHttpClient.java | 127 +++++++++++++ .../datadog/DatadogMetricsReporter.java | 93 ++++++++++ .../hudi/metrics/datadog/DatadogReporter.java | 171 ++++++++++++++++++ .../datadog/TestDatadogHttpClient.java | 152 ++++++++++++++++ .../datadog/TestDatadogMetricsReporter.java | 77 ++++++++ .../metrics/datadog/TestDatadogReporter.java | 105 +++++++++++ packaging/hudi-spark-bundle/pom.xml | 2 +- 13 files changed, 906 insertions(+), 3 deletions(-) create mode 100644 hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java create mode 100644 hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java create mode 100644 hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java create mode 100644 hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java index 4792d6f07..42555ce4c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java @@ -130,6 +130,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig { DEFAULT_JMX_HOST); setDefaultOnCondition(props, !props.containsKey(JMX_PORT), JMX_PORT, String.valueOf(DEFAULT_JMX_PORT)); + MetricsReporterType reporterType = MetricsReporterType.valueOf(props.getProperty(METRICS_REPORTER_TYPE)); + setDefaultOnCondition(props, reporterType == MetricsReporterType.DATADOG, + HoodieMetricsDatadogConfig.newBuilder().fromProperties(props).build()); return config; } } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java new file mode 100644 index 000000000..e6dcc282c --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import java.util.Properties; + +import static org.apache.hudi.config.HoodieMetricsConfig.METRIC_PREFIX; + +/** + * Configs for Datadog reporter type. + *
+ * {@link org.apache.hudi.metrics.MetricsReporterType#DATADOG}
+ */
+@Immutable
+public class HoodieMetricsDatadogConfig extends DefaultHoodieConfig {
+
+ public static final String DATADOG_PREFIX = METRIC_PREFIX + ".datadog";
+ public static final String DATADOG_REPORT_PERIOD_SECONDS = DATADOG_PREFIX + ".report.period.seconds";
+ public static final int DEFAULT_DATADOG_REPORT_PERIOD_SECONDS = 30;
+ public static final String DATADOG_API_SITE = DATADOG_PREFIX + ".api.site";
+ public static final String DATADOG_API_KEY = DATADOG_PREFIX + ".api.key";
+ public static final String DATADOG_API_KEY_SKIP_VALIDATION = DATADOG_PREFIX + ".api.key.skip.validation";
+ public static final boolean DEFAULT_DATADOG_API_KEY_SKIP_VALIDATION = false;
+ public static final String DATADOG_API_KEY_SUPPLIER = DATADOG_PREFIX + ".api.key.supplier";
+ public static final String DATADOG_API_TIMEOUT_SECONDS = DATADOG_PREFIX + ".api.timeout.seconds";
+ public static final int DEFAULT_DATADOG_API_TIMEOUT_SECONDS = 3;
+ public static final String DATADOG_METRIC_PREFIX = DATADOG_PREFIX + ".metric.prefix";
+ public static final String DATADOG_METRIC_HOST = DATADOG_PREFIX + ".metric.host";
+ public static final String DATADOG_METRIC_TAGS = DATADOG_PREFIX + ".metric.tags";
+
+ private HoodieMetricsDatadogConfig(Properties props) {
+ super(props);
+ }
+
+ public static HoodieMetricsDatadogConfig.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public Builder withDatadogReportPeriodSeconds(int period) {
+ props.setProperty(DATADOG_REPORT_PERIOD_SECONDS, String.valueOf(period));
+ return this;
+ }
+
+ public Builder withDatadogApiSite(String apiSite) {
+ props.setProperty(DATADOG_API_SITE, apiSite);
+ return this;
+ }
+
+ public Builder withDatadogApiKey(String apiKey) {
+ props.setProperty(DATADOG_API_KEY, apiKey);
+ return this;
+ }
+
+ public Builder withDatadogApiKeySkipValidation(boolean skip) {
+ props.setProperty(DATADOG_API_KEY_SKIP_VALIDATION, String.valueOf(skip));
+ return this;
+ }
+
+ public Builder withDatadogApiKeySupplier(String apiKeySupplier) {
+ props.setProperty(DATADOG_API_KEY_SUPPLIER, apiKeySupplier);
+ return this;
+ }
+
+ public Builder withDatadogApiTimeoutSeconds(int timeout) {
+ props.setProperty(DATADOG_API_TIMEOUT_SECONDS, String.valueOf(timeout));
+ return this;
+ }
+
+ public Builder withDatadogPrefix(String prefix) {
+ props.setProperty(DATADOG_METRIC_PREFIX, prefix);
+ return this;
+ }
+
+ public Builder withDatadogHost(String host) {
+ props.setProperty(DATADOG_METRIC_HOST, host);
+ return this;
+ }
+
+ public Builder withDatadogTags(String tags) {
+ props.setProperty(DATADOG_METRIC_TAGS, tags);
+ return this;
+ }
+
+ public HoodieMetricsDatadogConfig build() {
+ HoodieMetricsDatadogConfig config = new HoodieMetricsDatadogConfig(props);
+ setDefaultOnCondition(props, !props.containsKey(DATADOG_REPORT_PERIOD_SECONDS),
+ DATADOG_REPORT_PERIOD_SECONDS,
+ String.valueOf(DEFAULT_DATADOG_REPORT_PERIOD_SECONDS));
+ setDefaultOnCondition(props, !props.containsKey(DATADOG_API_KEY_SKIP_VALIDATION),
+ DATADOG_API_KEY_SKIP_VALIDATION,
+ String.valueOf(DEFAULT_DATADOG_API_KEY_SKIP_VALIDATION));
+ setDefaultOnCondition(props, !props.containsKey(DATADOG_API_TIMEOUT_SECONDS),
+ DATADOG_API_TIMEOUT_SECONDS,
+ String.valueOf(DEFAULT_DATADOG_API_TIMEOUT_SECONDS));
+ return config;
+ }
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3f0f61942..d6527fa3a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -38,9 +39,13 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
/**
* Class storing configs for the {@link HoodieWriteClient}.
@@ -541,6 +546,45 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HoodieMetricsConfig.JMX_PORT);
}
+ public int getDatadogReportPeriodSeconds() {
+ return Integer.parseInt(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_REPORT_PERIOD_SECONDS));
+ }
+
+ public ApiSite getDatadogApiSite() {
+ return ApiSite.valueOf(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_SITE));
+ }
+
+ public String getDatadogApiKey() {
+ if (props.containsKey(HoodieMetricsDatadogConfig.DATADOG_API_KEY)) {
+ return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY);
+ } else {
+ Supplier
+ * Responsible for API endpoint routing, validating API key, and sending requests with metrics payload.
+ */
+public class DatadogHttpClient implements Closeable {
+
+ private static final Logger LOG = LogManager.getLogger(DatadogHttpClient.class);
+
+ private static final String SERIES_URL_FORMAT = "https://app.datadoghq.%s/api/v1/series";
+ private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.%s/api/v1/validate";
+ private static final String HEADER_KEY_API_KEY = "DD-API-KEY";
+
+ private final String apiKey;
+ private final String seriesUrl;
+ private final String validateUrl;
+ private final CloseableHttpClient client;
+
+ public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client) {
+ this.apiKey = apiKey;
+ this.seriesUrl = String.format(SERIES_URL_FORMAT, apiSite.getDomain());
+ this.validateUrl = String.format(VALIDATE_URL_FORMAT, apiSite.getDomain());
+ this.client = client;
+ if (!skipValidation) {
+ validateApiKey();
+ }
+ }
+
+ public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, int timeoutSeconds) {
+ this(apiSite, apiKey, skipValidation, HttpClientBuilder.create()
+ .setDefaultRequestConfig(RequestConfig.custom()
+ .setConnectTimeout(timeoutSeconds * 1000)
+ .setConnectionRequestTimeout(timeoutSeconds * 1000)
+ .setSocketTimeout(timeoutSeconds * 1000).build())
+ .build());
+ }
+
+ private void validateApiKey() {
+ ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(apiKey),
+ "API key is null or empty.");
+
+ HttpUriRequest request = new HttpGet(validateUrl);
+ request.setHeader(HEADER_KEY_API_KEY, apiKey);
+ try (CloseableHttpResponse response = client.execute(request)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ ValidationUtils.checkState(statusCode == HttpStatus.SC_OK, "API key is invalid.");
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to connect to Datadog to validate API key.", e);
+ }
+ }
+
+ public void send(String payload) {
+ HttpPost request = new HttpPost(seriesUrl);
+ request.setHeader(HEADER_KEY_API_KEY, apiKey);
+ request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
+ request.setEntity(new StringEntity(payload, ContentType.APPLICATION_JSON));
+ try (CloseableHttpResponse response = client.execute(request)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode >= 300) {
+ LOG.warn(String.format("Failed to send to Datadog. Response was %s", response));
+ } else {
+ LOG.debug(String.format("Sent metrics data (size: %d) to %s", payload.length(), seriesUrl));
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to send to Datadog.", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.close();
+ }
+
+ public enum ApiSite {
+ US("com"), EU("eu");
+
+ private final String domain;
+
+ ApiSite(String domain) {
+ this.domain = domain;
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
new file mode 100644
index 000000000..0830ef4c5
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.MetricsReporter;
+import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Hudi Datadog metrics reporter.
+ *
+ * Responsible for reading Hoodie metrics configurations and hooking up with {@link org.apache.hudi.metrics.Metrics}.
+ *
+ * Internally delegate reporting tasks to {@link DatadogReporter}.
+ */
+public class DatadogMetricsReporter extends MetricsReporter {
+
+ private final DatadogReporter reporter;
+ private final int reportPeriodSeconds;
+
+ public DatadogMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
+ reportPeriodSeconds = config.getDatadogReportPeriodSeconds();
+ ApiSite apiSite = config.getDatadogApiSite();
+ String apiKey = config.getDatadogApiKey();
+ ValidationUtils.checkState(!StringUtils.isNullOrEmpty(apiKey),
+ "Datadog cannot be initialized: API key is null or empty.");
+ boolean skipValidation = config.getDatadogApiKeySkipValidation();
+ int timeoutSeconds = config.getDatadogApiTimeoutSeconds();
+ String prefix = config.getDatadogMetricPrefix();
+ ValidationUtils.checkState(!StringUtils.isNullOrEmpty(prefix),
+ "Datadog cannot be initialized: Metric prefix is null or empty.");
+ Option
+ * Responsible for collecting and composing metrics payload.
+ *
+ * Internally use {@link DatadogHttpClient} to interact with Datadog APIs.
+ */
+public class DatadogReporter extends ScheduledReporter {
+
+ private static final Logger LOG = LogManager.getLogger(DatadogReporter.class);
+
+ private final DatadogHttpClient client;
+ private final String prefix;
+ private final Option
+ * Refer to Datadog API reference https://docs.datadoghq.com/api/?lang=bash#post-timeseries-points
+ */
+ static class PayloadBuilder {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private final ObjectNode payload;
+ private final ArrayNode series;
+ private MetricType type;
+
+ PayloadBuilder() {
+ payload = MAPPER.createObjectNode();
+ series = payload.putArray("series");
+ }
+
+ PayloadBuilder withMetricType(MetricType type) {
+ this.type = type;
+ return this;
+ }
+
+ PayloadBuilder addGauge(String metric, long timestamp, long gaugeValue) {
+ ValidationUtils.checkState(type == MetricType.gauge);
+ ObjectNode seriesItem = MAPPER.createObjectNode().put("metric", metric);
+ seriesItem.putArray("points").addArray().add(timestamp).add(gaugeValue);
+ series.add(seriesItem);
+ return this;
+ }
+
+ PayloadBuilder withHost(String host) {
+ series.forEach(seriesItem -> ((ObjectNode) seriesItem).put("host", host));
+ return this;
+ }
+
+ PayloadBuilder withTags(List> tags = tagList.isEmpty() ? Option.empty() : Option.of(tagList);
+
+ reporter = new DatadogReporter(
+ registry,
+ new DatadogHttpClient(apiSite, apiKey, skipValidation, timeoutSeconds),
+ prefix,
+ host,
+ tags,
+ MetricFilter.ALL,
+ TimeUnit.SECONDS,
+ TimeUnit.SECONDS
+ );
+ }
+
+ @Override
+ public void start() {
+ reporter.start(reportPeriodSeconds, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void report() {
+ reporter.report();
+ }
+
+ @Override
+ public Closeable getReporter() {
+ return reporter;
+ }
+
+ @Override
+ public void stop() {
+ reporter.stop();
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
new file mode 100644
index 000000000..a388aecda
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * A reporter which publishes metric values to Datadog API.
+ *
> tags;
+ private final Clock clock;
+
+ protected DatadogReporter(
+ MetricRegistry registry,
+ DatadogHttpClient client,
+ String prefix,
+ Option
> tags,
+ MetricFilter filter,
+ TimeUnit rateUnit,
+ TimeUnit durationUnit) {
+ super(registry, "hudi-datadog-reporter", filter, rateUnit, durationUnit);
+ this.client = client;
+ this.prefix = prefix;
+ this.host = host;
+ this.tags = tags;
+ this.clock = Clock.defaultClock();
+ }
+
+ @Override
+ public void report(
+ SortedMap