1
0

[HUDI-210] Hudi Supports Prometheus Pushgateway (#1931)

Co-authored-by: leesf <leesf@apache.org>
This commit is contained in:
liujinhui
2020-08-09 15:29:54 +08:00
committed by GitHub
parent 3c949d2ff5
commit 6b349b7711
12 changed files with 531 additions and 2 deletions

View File

@@ -121,6 +121,22 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_dropwizard</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
</dependency>
<dependency>
<groupId>com.beust</groupId>

View File

@@ -144,6 +144,10 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
HoodieMetricsDatadogConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !props.containsKey(METRICS_REPORTER_CLASS),
METRICS_REPORTER_CLASS, DEFAULT_METRICS_REPORTER_CLASS);
setDefaultOnCondition(props, reporterType == MetricsReporterType.PROMETHEUS_PUSHGATEWAY,
HoodieMetricsPrometheusConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, reporterType == MetricsReporterType.PROMETHEUS,
HoodieMetricsPrometheusConfig.newBuilder().fromProperties(props).build());
return config;
}

View File

@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.config;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import java.util.Properties;
import static org.apache.hudi.config.HoodieMetricsConfig.METRIC_PREFIX;
public class HoodieMetricsPrometheusConfig extends DefaultHoodieConfig {
// Prometheus PushGateWay
public static final String PUSHGATEWAY_PREFIX = METRIC_PREFIX + ".pushgateway";
public static final String PUSHGATEWAY_HOST = PUSHGATEWAY_PREFIX + ".host";
public static final String DEFAULT_PUSHGATEWAY_HOST = "localhost";
public static final String PUSHGATEWAY_PORT = PUSHGATEWAY_PREFIX + ".port";
public static final int DEFAULT_PUSHGATEWAY_PORT = 9091;
public static final String PUSHGATEWAY_REPORT_PERIOD_SECONDS = PUSHGATEWAY_PREFIX + ".report.period.seconds";
public static final int DEFAULT_PUSHGATEWAY_REPORT_PERIOD_SECONDS = 30;
public static final String PUSHGATEWAY_DELETE_ON_SHUTDOWN = PUSHGATEWAY_PREFIX + ".delete.on.shutdown";
public static final boolean DEFAULT_PUSHGATEWAY_DELETE_ON_SHUTDOWN = true;
public static final String PUSHGATEWAY_JOB_NAME = PUSHGATEWAY_PREFIX + ".job.name";
public static final String DEFAULT_PUSHGATEWAY_JOB_NAME = "";
public static final String PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX = PUSHGATEWAY_PREFIX + ".random.job.name.suffix";
public static final boolean DEFAULT_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX = true;
// Prometheus HttpServer
public static final String PROMETHEUS_PREFIX = METRIC_PREFIX + ".prometheus";
public static final String PROMETHEUS_PORT = PROMETHEUS_PREFIX + ".port";
public static final int DEFAULT_PROMETHEUS_PORT = 9090;
public HoodieMetricsPrometheusConfig(Properties props) {
super(props);
}
public static HoodieMetricsPrometheusConfig.Builder newBuilder() {
return new HoodieMetricsPrometheusConfig.Builder();
}
@Override
public Properties getProps() {
return super.getProps();
}
public static class Builder {
private Properties props = new Properties();
public Builder fromProperties(Properties props) {
this.props.putAll(props);
return this;
}
public HoodieMetricsPrometheusConfig build() {
HoodieMetricsPrometheusConfig config = new HoodieMetricsPrometheusConfig(props);
setDefaultOnCondition(props, !props.containsKey(PROMETHEUS_PORT), PROMETHEUS_PORT,
String.valueOf(DEFAULT_PROMETHEUS_PORT));
setDefaultOnCondition(props, !props.containsKey(PUSHGATEWAY_HOST),
PUSHGATEWAY_HOST,
DEFAULT_PUSHGATEWAY_HOST);
setDefaultOnCondition(props, !props.containsKey(PUSHGATEWAY_PORT),
PUSHGATEWAY_PORT,
String.valueOf(DEFAULT_PUSHGATEWAY_PORT));
setDefaultOnCondition(props, !props.containsKey(PUSHGATEWAY_REPORT_PERIOD_SECONDS),
PUSHGATEWAY_REPORT_PERIOD_SECONDS,
String.valueOf(DEFAULT_PUSHGATEWAY_REPORT_PERIOD_SECONDS));
setDefaultOnCondition(props, !props.containsKey(PUSHGATEWAY_DELETE_ON_SHUTDOWN),
PUSHGATEWAY_DELETE_ON_SHUTDOWN,
String.valueOf(DEFAULT_PUSHGATEWAY_DELETE_ON_SHUTDOWN));
setDefaultOnCondition(props, !props.containsKey(PUSHGATEWAY_JOB_NAME),
PUSHGATEWAY_JOB_NAME, DEFAULT_PUSHGATEWAY_JOB_NAME);
setDefaultOnCondition(props, !props.containsKey(PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX),
PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX,
String.valueOf(DEFAULT_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX));
return config;
}
}
}

View File

@@ -626,6 +626,34 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HoodieMetricsConfig.METRICS_REPORTER_CLASS);
}
public int getPrometheusPort() {
return Integer.parseInt(props.getProperty(HoodieMetricsPrometheusConfig.PROMETHEUS_PORT));
}
public String getPushGatewayHost() {
return props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_HOST);
}
public int getPushGatewayPort() {
return Integer.parseInt(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_PORT));
}
public int getPushGatewayReportPeriodSeconds() {
return Integer.parseInt(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_REPORT_PERIOD_SECONDS));
}
public boolean getPushGatewayDeleteOnShutdown() {
return Boolean.parseBoolean(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_DELETE_ON_SHUTDOWN));
}
public String getPushGatewayJobName() {
return props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_JOB_NAME);
}
public boolean getPushGatewayRandomJobNameSuffix() {
return Boolean.parseBoolean(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX));
}
/**
* memory configs.
*/

View File

@@ -25,6 +25,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.datadog.DatadogMetricsReporter;
import com.codahale.metrics.MetricRegistry;
import org.apache.hudi.metrics.prometheus.PrometheusReporter;
import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter;
import org.apache.hudi.metrics.userdefined.AbstractUserDefinedMetricsReporter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -66,6 +68,12 @@ public class MetricsReporterFactory {
case DATADOG:
reporter = new DatadogMetricsReporter(config, registry);
break;
case PROMETHEUS_PUSHGATEWAY:
reporter = new PushGatewayMetricsReporter(config, registry);
break;
case PROMETHEUS:
reporter = new PrometheusReporter(config, registry);
break;
case CONSOLE:
reporter = new ConsoleMetricsReporter(registry);
break;

View File

@@ -19,8 +19,8 @@
package org.apache.hudi.metrics;
/**
* Types of the reporter. Right now we only support Graphite. We can include JMX and CSV in the future.
* Types of the reporter supported, hudi also supports user defined reporter.
*/
public enum MetricsReporterType {
GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE
GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS
}

View File

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics.prometheus;
import com.codahale.metrics.MetricRegistry;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.HTTPServer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.MetricsReporter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.Closeable;
import java.net.InetSocketAddress;
/**
* Implementation of Prometheus reporter, which connects to the Http server, and get metrics
* from that server.
*/
public class PrometheusReporter extends MetricsReporter {
private static final Logger LOG = LogManager.getLogger(PrometheusReporter.class);
private HTTPServer httpServer;
private final DropwizardExports metricExports;
private final CollectorRegistry collectorRegistry;
public PrometheusReporter(HoodieWriteConfig config, MetricRegistry registry) {
int serverPort = config.getPrometheusPort();
collectorRegistry = new CollectorRegistry();
metricExports = new DropwizardExports(registry);
metricExports.register(collectorRegistry);
try {
httpServer = new HTTPServer(new InetSocketAddress(serverPort), collectorRegistry);
} catch (Exception e) {
String msg = "Could not start PrometheusReporter HTTP server on port " + serverPort;
LOG.error(msg, e);
throw new HoodieException(msg, e);
}
}
@Override
public void start() {
}
@Override
public void report() {
}
@Override
public Closeable getReporter() {
return null;
}
@Override
public void stop() {
collectorRegistry.unregister(metricExports);
if (httpServer != null) {
httpServer.stop();
}
}
}

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics.prometheus;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metrics.MetricsReporter;
import java.io.Closeable;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class PushGatewayMetricsReporter extends MetricsReporter {
private final PushGatewayReporter pushGatewayReporter;
private final int periodSeconds;
private final boolean deleteShutdown;
private final String configuredJobName;
private final boolean randomSuffix;
public PushGatewayMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
String serverHost = config.getPushGatewayHost();
int serverPort = config.getPushGatewayPort();
periodSeconds = config.getPushGatewayReportPeriodSeconds();
deleteShutdown = config.getPushGatewayDeleteOnShutdown();
configuredJobName = config.getPushGatewayJobName();
randomSuffix = config.getPushGatewayRandomJobNameSuffix();
pushGatewayReporter = new PushGatewayReporter(
registry,
MetricFilter.ALL,
TimeUnit.SECONDS,
TimeUnit.SECONDS,
getJobName(),
serverHost + ":" + serverPort,
deleteShutdown);
}
@Override
public void start() {
pushGatewayReporter.start(periodSeconds, TimeUnit.SECONDS);
}
@Override
public void report() {
pushGatewayReporter.report(null, null, null, null, null);
}
@Override
public Closeable getReporter() {
return pushGatewayReporter;
}
@Override
public void stop() {
pushGatewayReporter.stop();
}
private String getJobName() {
if (randomSuffix) {
Random random = new Random();
return configuredJobName + random.nextLong();
}
return configuredJobName;
}
}

View File

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics.prometheus;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.PushGateway;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
public class PushGatewayReporter extends ScheduledReporter {
private static final Logger LOG = LogManager.getLogger(PushGatewayReporter.class);
private final PushGateway pushGateway;
private final DropwizardExports metricExports;
private final CollectorRegistry collectorRegistry;
private final String jobName;
private final boolean deleteShutdown;
protected PushGatewayReporter(MetricRegistry registry,
MetricFilter filter,
TimeUnit rateUnit,
TimeUnit durationUnit,
String jobName,
String address,
boolean deleteShutdown) {
super(registry, "hudi-push-gateway-reporter", filter, rateUnit, durationUnit);
this.jobName = jobName;
this.deleteShutdown = deleteShutdown;
collectorRegistry = new CollectorRegistry();
metricExports = new DropwizardExports(registry);
pushGateway = new PushGateway(address);
metricExports.register(collectorRegistry);
}
@Override
public void report(SortedMap<String, Gauge> gauges,
SortedMap<String, Counter> counters,
SortedMap<String, Histogram> histograms,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
try {
pushGateway.pushAdd(collectorRegistry, jobName);
} catch (IOException e) {
LOG.warn("Can't push monitoring information to pushGateway", e);
}
}
@Override
public void start(long period, TimeUnit unit) {
super.start(period, unit);
}
@Override
public void stop() {
super.stop();
try {
if (deleteShutdown) {
collectorRegistry.unregister(metricExports);
pushGateway.delete(jobName);
}
} catch (IOException e) {
LOG.warn("Failed to delete metrics from pushGateway with jobName {" + jobName + "}", e);
}
}
}

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics.prometheus;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.metrics.MetricsReporterType;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPrometheusReporter {
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
@Test
public void testRegisterGauge() {
when(config.isMetricsOn()).thenReturn(true);
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.PROMETHEUS);
when(config.getPrometheusPort()).thenReturn(9090);
assertDoesNotThrow(() -> {
new HoodieMetrics(config, "raw_table");
});
}
}

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics.prometheus;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.metrics.MetricsReporterType;
import org.junit.jupiter.api.Test;
import static org.apache.hudi.metrics.Metrics.registerGauge;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPushGateWayReporter {
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
@Test
public void testRegisterGauge() {
when(config.isMetricsOn()).thenReturn(true);
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.PROMETHEUS_PUSHGATEWAY);
when(config.getPushGatewayHost()).thenReturn("localhost");
when(config.getPushGatewayPort()).thenReturn(9091);
new HoodieMetrics(config, "raw_table");
registerGauge("pushGateWayReporter_metric", 123L);
assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
.get("pushGateWayReporter_metric").getValue().toString());
}
}

22
pom.xml
View File

@@ -97,6 +97,7 @@
<hive.version>2.3.1</hive.version>
<hive.exec.classifier>core</hive.exec.classifier>
<metrics.version>4.1.1</metrics.version>
<prometheus.version>0.8.0</prometheus.version>
<spark.version>2.4.4</spark.version>
<avro.version>1.8.2</avro.version>
<scala.version>2.11.12</scala.version>
@@ -510,6 +511,27 @@
<version>${metrics.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_dropwizard</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>