[HUDI-209] Implement JMX metrics reporter (#1106)
This commit is contained in:
@@ -117,6 +117,10 @@
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-jmx</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.beust</groupId>
|
||||
|
||||
@@ -101,6 +101,16 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder toJmxHost(String host) {
|
||||
props.setProperty(JMX_HOST, host);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder onJmxPort(String port) {
|
||||
props.setProperty(JMX_PORT, port);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder usePrefix(String prefix) {
|
||||
props.setProperty(GRAPHITE_METRIC_PREFIX, prefix);
|
||||
return this;
|
||||
@@ -115,8 +125,10 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
|
||||
DEFAULT_GRAPHITE_SERVER_HOST);
|
||||
setDefaultOnCondition(props, !props.containsKey(GRAPHITE_SERVER_PORT), GRAPHITE_SERVER_PORT,
|
||||
String.valueOf(DEFAULT_GRAPHITE_SERVER_PORT));
|
||||
setDefaultOnCondition(props, !props.containsKey(GRAPHITE_SERVER_PORT), GRAPHITE_SERVER_PORT,
|
||||
String.valueOf(DEFAULT_GRAPHITE_SERVER_PORT));
|
||||
setDefaultOnCondition(props, !props.containsKey(JMX_HOST), JMX_HOST,
|
||||
DEFAULT_JMX_HOST);
|
||||
setDefaultOnCondition(props, !props.containsKey(JMX_PORT), JMX_PORT,
|
||||
String.valueOf(DEFAULT_JMX_PORT));
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -500,8 +500,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return props.getProperty(HoodieMetricsConfig.JMX_HOST);
|
||||
}
|
||||
|
||||
public int getJmxPort() {
|
||||
return Integer.parseInt(props.getProperty(HoodieMetricsConfig.JMX_PORT));
|
||||
public String getJmxPort() {
|
||||
return props.getProperty(HoodieMetricsConfig.JMX_PORT);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -35,4 +35,9 @@ public class InMemoryMetricsReporter extends MetricsReporter {
|
||||
public Closeable getReporter() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,45 +18,53 @@
|
||||
|
||||
package org.apache.hudi.metrics;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import javax.management.MBeanServer;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import javax.management.remote.JMXConnectorServer;
|
||||
import javax.management.remote.JMXConnectorServerFactory;
|
||||
import javax.management.remote.JMXServiceURL;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.rmi.registry.LocateRegistry;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* Implementation of Jmx reporter, which used to report jmx metric.
|
||||
*/
|
||||
public class JmxMetricsReporter extends MetricsReporter {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(JmxMetricsReporter.class);
|
||||
private final JMXConnectorServer connector;
|
||||
private static final org.apache.log4j.Logger LOG = LogManager.getLogger(JmxMetricsReporter.class);
|
||||
|
||||
public JmxMetricsReporter(HoodieWriteConfig config) {
|
||||
private final MetricRegistry registry;
|
||||
private JmxReporterServer jmxReporterServer;
|
||||
|
||||
public JmxMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
|
||||
try {
|
||||
this.registry = registry;
|
||||
// Check the host and port here
|
||||
String host = config.getJmxHost();
|
||||
int port = config.getJmxPort();
|
||||
if (host == null || port == 0) {
|
||||
throw new RuntimeException(
|
||||
String portsConfig = config.getJmxPort();
|
||||
if (host == null || portsConfig == null) {
|
||||
throw new HoodieException(
|
||||
String.format("Jmx cannot be initialized with host[%s] and port[%s].",
|
||||
host, port));
|
||||
host, portsConfig));
|
||||
}
|
||||
LocateRegistry.createRegistry(port);
|
||||
String serviceUrl =
|
||||
"service:jmx:rmi://" + host + ":" + port + "/jndi/rmi://" + host + ":" + port + "/jmxrmi";
|
||||
JMXServiceURL url = new JMXServiceURL(serviceUrl);
|
||||
this.connector = JMXConnectorServerFactory
|
||||
.newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
|
||||
int[] ports = getPortRangeFromString(portsConfig);
|
||||
boolean successfullyStartedServer = false;
|
||||
for (int port : ports) {
|
||||
jmxReporterServer = createJmxReport(host, port);
|
||||
LOG.info("Started JMX server on port " + port + ".");
|
||||
successfullyStartedServer = true;
|
||||
break;
|
||||
}
|
||||
if (!successfullyStartedServer) {
|
||||
throw new HoodieException(
|
||||
"Could not start JMX server on any configured port. Ports: " + portsConfig);
|
||||
}
|
||||
LOG.info("Configured JMXReporter with {port:" + portsConfig + "}");
|
||||
} catch (Exception e) {
|
||||
String msg = "Jmx initialize failed: ";
|
||||
LOG.error(msg, e);
|
||||
@@ -66,11 +74,10 @@ public class JmxMetricsReporter extends MetricsReporter {
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
try {
|
||||
Objects.requireNonNull(connector, "Cannot start as the jmxReporter is null.");
|
||||
connector.start();
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException(e);
|
||||
if (jmxReporterServer != null) {
|
||||
jmxReporterServer.start();
|
||||
} else {
|
||||
LOG.error("Cannot start as the jmxReporter is null.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,6 +87,42 @@ public class JmxMetricsReporter extends MetricsReporter {
|
||||
|
||||
@Override
|
||||
public Closeable getReporter() {
|
||||
return null;
|
||||
return jmxReporterServer.getReporter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
Preconditions.checkNotNull(jmxReporterServer, "jmxReporterServer is not running.");
|
||||
try {
|
||||
jmxReporterServer.stop();
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Stop jmxReporterServer fail", e);
|
||||
}
|
||||
}
|
||||
|
||||
private JmxReporterServer createJmxReport(String host, int port) {
|
||||
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
|
||||
return JmxReporterServer.forRegistry(registry)
|
||||
.host(host)
|
||||
.port(port)
|
||||
.registerWith(mBeanServer)
|
||||
.build();
|
||||
}
|
||||
|
||||
private int[] getPortRangeFromString(String portsConfig) {
|
||||
String range = portsConfig.trim();
|
||||
int dashIdx = range.indexOf('-');
|
||||
final int start;
|
||||
final int end;
|
||||
if (dashIdx == -1) {
|
||||
start = Integer.parseInt(range);
|
||||
end = Integer.parseInt(range);
|
||||
} else {
|
||||
start = Integer.parseInt(range.substring(0, dashIdx));
|
||||
end = Integer.parseInt(range.substring(dashIdx + 1));
|
||||
}
|
||||
return IntStream.rangeClosed(start, end)
|
||||
.filter(port -> (0 <= port && port <= 65535))
|
||||
.toArray();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.metrics;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.jmx.JmxReporter;
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.io.IOException;
|
||||
import java.rmi.NoSuchObjectException;
|
||||
import java.rmi.registry.LocateRegistry;
|
||||
import java.rmi.registry.Registry;
|
||||
import java.rmi.server.UnicastRemoteObject;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.remote.JMXConnectorServer;
|
||||
import javax.management.remote.JMXConnectorServerFactory;
|
||||
import javax.management.remote.JMXServiceURL;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
/**
|
||||
* A reporter which publishes metric values to a JMX server.
|
||||
*/
|
||||
public class JmxReporterServer {
|
||||
|
||||
/**
|
||||
* Returns a new {@link JmxReporterServer.Builder} for {@link JmxReporterServer}.
|
||||
*
|
||||
* @param registry the registry to report
|
||||
* @return a {@link JmxReporterServer.Builder} instance for a {@link JmxReporterServer}
|
||||
*/
|
||||
public static JmxReporterServer.Builder forRegistry(MetricRegistry registry) {
|
||||
return new JmxReporterServer.Builder(registry);
|
||||
}
|
||||
|
||||
/**
|
||||
* A builder for {@link JmxReporterServer} instances.
|
||||
*/
|
||||
public static class Builder {
|
||||
|
||||
private final MetricRegistry registry;
|
||||
private MBeanServer mBeanServer;
|
||||
private String host;
|
||||
private int port;
|
||||
|
||||
private Builder(MetricRegistry registry) {
|
||||
this.registry = registry;
|
||||
}
|
||||
|
||||
public JmxReporterServer.Builder host(String host) {
|
||||
this.host = host;
|
||||
return this;
|
||||
}
|
||||
|
||||
public JmxReporterServer.Builder port(int port) {
|
||||
this.port = port;
|
||||
return this;
|
||||
}
|
||||
|
||||
public JmxReporterServer.Builder registerWith(MBeanServer mBeanServer) {
|
||||
this.mBeanServer = mBeanServer;
|
||||
return this;
|
||||
}
|
||||
|
||||
public JmxReporterServer build() {
|
||||
Preconditions.checkNotNull(registry, "registry cannot be null!");
|
||||
Preconditions.checkNotNull(mBeanServer, "mBeanServer cannot be null!");
|
||||
Preconditions
|
||||
.checkArgument(!StringUtils.isNullOrEmpty(host), "host cannot be null or empty!");
|
||||
return new JmxReporterServer(registry, host, port, mBeanServer);
|
||||
}
|
||||
}
|
||||
|
||||
private JMXConnectorServer connector;
|
||||
private Registry rmiRegistry;
|
||||
private JmxReporter reporter;
|
||||
|
||||
protected JmxReporterServer(MetricRegistry registry, String host, int port,
|
||||
MBeanServer mBeanServer) {
|
||||
String serviceUrl =
|
||||
"service:jmx:rmi://localhost:" + port + "/jndi/rmi://" + host + ":" + port + "/jmxrmi";
|
||||
try {
|
||||
JMXServiceURL url = new JMXServiceURL(serviceUrl);
|
||||
connector = JMXConnectorServerFactory
|
||||
.newJMXConnectorServer(url, null, mBeanServer);
|
||||
rmiRegistry = LocateRegistry.createRegistry(port);
|
||||
reporter = JmxReporter.forRegistry(registry).registerWith(mBeanServer).build();
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Jmx service url created " + serviceUrl, e);
|
||||
}
|
||||
}
|
||||
|
||||
public JmxReporter getReporter() {
|
||||
return reporter;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
Preconditions.checkArgument(reporter != null && connector != null,
|
||||
"reporter or connector cannot be null!");
|
||||
try {
|
||||
connector.start();
|
||||
reporter.start();
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("connector or reporter start failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() throws IOException {
|
||||
stopConnector();
|
||||
stopReport();
|
||||
stopRmiRegistry();
|
||||
}
|
||||
|
||||
private void stopRmiRegistry() {
|
||||
if (rmiRegistry != null) {
|
||||
try {
|
||||
UnicastRemoteObject.unexportObject(rmiRegistry, true);
|
||||
} catch (NoSuchObjectException e) {
|
||||
throw new HoodieException("Could not un-export our RMI registry", e);
|
||||
} finally {
|
||||
rmiRegistry = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void stopConnector() throws IOException {
|
||||
if (connector != null) {
|
||||
try {
|
||||
connector.stop();
|
||||
} finally {
|
||||
connector = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void stopReport() {
|
||||
if (reporter != null) {
|
||||
try {
|
||||
reporter.stop();
|
||||
} finally {
|
||||
reporter = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,7 @@ import java.io.Closeable;
|
||||
* This is the main class of the metrics system.
|
||||
*/
|
||||
public class Metrics {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(Metrics.class);
|
||||
|
||||
private static volatile boolean initialized = false;
|
||||
@@ -47,11 +48,12 @@ public class Metrics {
|
||||
if (reporter == null) {
|
||||
throw new RuntimeException("Cannot initialize Reporter.");
|
||||
}
|
||||
// reporter.start();
|
||||
reporter.start();
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
try {
|
||||
reporter.report();
|
||||
reporter.stop();
|
||||
Closeables.close(reporter.getReporter(), true);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
||||
@@ -87,4 +87,11 @@ public class MetricsGraphiteReporter extends MetricsReporter {
|
||||
return GraphiteReporter.forRegistry(registry).prefixedWith(reporterPrefix).convertRatesTo(TimeUnit.SECONDS)
|
||||
.convertDurationsTo(TimeUnit.MILLISECONDS).filter(MetricFilter.ALL).build(graphite);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (graphiteReporter != null) {
|
||||
graphiteReporter.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,4 +36,9 @@ public abstract class MetricsReporter {
|
||||
public abstract void report();
|
||||
|
||||
public abstract Closeable getReporter();
|
||||
|
||||
/**
|
||||
* Stop this reporter. Should be used to stop channels, streams and release resources.
|
||||
*/
|
||||
public abstract void stop();
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ public class MetricsReporterFactory {
|
||||
reporter = new InMemoryMetricsReporter();
|
||||
break;
|
||||
case JMX:
|
||||
reporter = new JmxMetricsReporter(config);
|
||||
reporter = new JmxMetricsReporter(config, registry);
|
||||
break;
|
||||
default:
|
||||
LOG.error("Reporter type[" + type + "] is not supported.");
|
||||
|
||||
@@ -18,10 +18,8 @@
|
||||
|
||||
package org.apache.hudi.metrics;
|
||||
|
||||
import org.apache.hudi.config.HoodieMetricsConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hudi.metrics.Metrics.registerGauge;
|
||||
@@ -34,20 +32,29 @@ import static org.mockito.Mockito.when;
|
||||
*/
|
||||
public class TestHoodieJmxMetrics {
|
||||
|
||||
@Before
|
||||
public void start() {
|
||||
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
|
||||
when(config.isMetricsOn()).thenReturn(true);
|
||||
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
|
||||
when(config.getJmxHost()).thenReturn(HoodieMetricsConfig.DEFAULT_JMX_HOST);
|
||||
when(config.getJmxPort()).thenReturn(HoodieMetricsConfig.DEFAULT_JMX_PORT);
|
||||
new HoodieMetrics(config, "raw_table");
|
||||
}
|
||||
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
|
||||
|
||||
@Test
|
||||
public void testRegisterGauge() {
|
||||
registerGauge("jmx_metric", 123L);
|
||||
when(config.isMetricsOn()).thenReturn(true);
|
||||
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
|
||||
when(config.getJmxHost()).thenReturn("localhost");
|
||||
when(config.getJmxPort()).thenReturn("9889");
|
||||
new HoodieMetrics(config, "raw_table");
|
||||
registerGauge("jmx_metric1", 123L);
|
||||
assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
|
||||
.get("jmx_metric").getValue().toString());
|
||||
.get("jmx_metric1").getValue().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisterGaugeByRangerPort() {
|
||||
when(config.isMetricsOn()).thenReturn(true);
|
||||
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
|
||||
when(config.getJmxHost()).thenReturn("localhost");
|
||||
when(config.getJmxPort()).thenReturn("1000-5000");
|
||||
new HoodieMetrics(config, "raw_table");
|
||||
registerGauge("jmx_metric2", 123L);
|
||||
assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
|
||||
.get("jmx_metric2").getValue().toString());
|
||||
}
|
||||
}
|
||||
|
||||
5
pom.xml
5
pom.xml
@@ -516,6 +516,11 @@
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<version>${metrics.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-jmx</artifactId>
|
||||
<version>${metrics.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.beust</groupId>
|
||||
|
||||
Reference in New Issue
Block a user