diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 06d60171d..3cab43a63 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -117,6 +117,10 @@
io.dropwizard.metrics
metrics-core
+
+ io.dropwizard.metrics
+ metrics-jmx
+
com.beust
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
index a21e4cc27..b17e935ee 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
@@ -101,6 +101,16 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder toJmxHost(String host) {
+ props.setProperty(JMX_HOST, host);
+ return this;
+ }
+
+ public Builder onJmxPort(String port) {
+ props.setProperty(JMX_PORT, port);
+ return this;
+ }
+
public Builder usePrefix(String prefix) {
props.setProperty(GRAPHITE_METRIC_PREFIX, prefix);
return this;
@@ -115,8 +125,10 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
DEFAULT_GRAPHITE_SERVER_HOST);
setDefaultOnCondition(props, !props.containsKey(GRAPHITE_SERVER_PORT), GRAPHITE_SERVER_PORT,
String.valueOf(DEFAULT_GRAPHITE_SERVER_PORT));
- setDefaultOnCondition(props, !props.containsKey(GRAPHITE_SERVER_PORT), GRAPHITE_SERVER_PORT,
- String.valueOf(DEFAULT_GRAPHITE_SERVER_PORT));
+ setDefaultOnCondition(props, !props.containsKey(JMX_HOST), JMX_HOST,
+ DEFAULT_JMX_HOST);
+ setDefaultOnCondition(props, !props.containsKey(JMX_PORT), JMX_PORT,
+ String.valueOf(DEFAULT_JMX_PORT));
return config;
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 04c1dfdb5..2d323a336 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -500,8 +500,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HoodieMetricsConfig.JMX_HOST);
}
- public int getJmxPort() {
- return Integer.parseInt(props.getProperty(HoodieMetricsConfig.JMX_PORT));
+ public String getJmxPort() {
+ return props.getProperty(HoodieMetricsConfig.JMX_PORT);
}
/**
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/InMemoryMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/InMemoryMetricsReporter.java
index a0221c868..a14502457 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/InMemoryMetricsReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/InMemoryMetricsReporter.java
@@ -35,4 +35,9 @@ public class InMemoryMetricsReporter extends MetricsReporter {
public Closeable getReporter() {
return null;
}
+
+ @Override
+ public void stop() {
+
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
index 921dcea07..c7c596c45 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
@@ -18,45 +18,53 @@
package org.apache.hudi.metrics;
+import com.google.common.base.Preconditions;
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXConnectorServerFactory;
-import javax.management.remote.JMXServiceURL;
+import com.codahale.metrics.MetricRegistry;
import java.io.Closeable;
-import java.lang.management.ManagementFactory;
-import java.rmi.registry.LocateRegistry;
-import java.util.Objects;
+import java.util.stream.IntStream;
/**
* Implementation of Jmx reporter, which used to report jmx metric.
*/
public class JmxMetricsReporter extends MetricsReporter {
- private static final Logger LOG = LogManager.getLogger(JmxMetricsReporter.class);
- private final JMXConnectorServer connector;
+ private static final org.apache.log4j.Logger LOG = LogManager.getLogger(JmxMetricsReporter.class);
- public JmxMetricsReporter(HoodieWriteConfig config) {
+ private final MetricRegistry registry;
+ private JmxReporterServer jmxReporterServer;
+
+ public JmxMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
try {
+ this.registry = registry;
// Check the host and port here
String host = config.getJmxHost();
- int port = config.getJmxPort();
- if (host == null || port == 0) {
- throw new RuntimeException(
+ String portsConfig = config.getJmxPort();
+ if (host == null || portsConfig == null) {
+ throw new HoodieException(
String.format("Jmx cannot be initialized with host[%s] and port[%s].",
- host, port));
+ host, portsConfig));
}
- LocateRegistry.createRegistry(port);
- String serviceUrl =
- "service:jmx:rmi://" + host + ":" + port + "/jndi/rmi://" + host + ":" + port + "/jmxrmi";
- JMXServiceURL url = new JMXServiceURL(serviceUrl);
- this.connector = JMXConnectorServerFactory
- .newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
+ int[] ports = getPortRangeFromString(portsConfig);
+ boolean successfullyStartedServer = false;
+ for (int port : ports) {
+ jmxReporterServer = createJmxReport(host, port);
+ LOG.info("Started JMX server on port " + port + ".");
+ successfullyStartedServer = true;
+ break;
+ }
+ if (!successfullyStartedServer) {
+ throw new HoodieException(
+ "Could not start JMX server on any configured port. Ports: " + portsConfig);
+ }
+ LOG.info("Configured JMXReporter with {port:" + portsConfig + "}");
} catch (Exception e) {
String msg = "Jmx initialize failed: ";
LOG.error(msg, e);
@@ -66,11 +74,10 @@ public class JmxMetricsReporter extends MetricsReporter {
@Override
public void start() {
- try {
- Objects.requireNonNull(connector, "Cannot start as the jmxReporter is null.");
- connector.start();
- } catch (Exception e) {
- throw new HoodieException(e);
+ if (jmxReporterServer != null) {
+ jmxReporterServer.start();
+ } else {
+ LOG.error("Cannot start as the jmxReporter is null.");
}
}
@@ -80,6 +87,42 @@ public class JmxMetricsReporter extends MetricsReporter {
@Override
public Closeable getReporter() {
- return null;
+ return jmxReporterServer.getReporter();
+ }
+
+ @Override
+ public void stop() {
+ Preconditions.checkNotNull(jmxReporterServer, "jmxReporterServer is not running.");
+ try {
+ jmxReporterServer.stop();
+ } catch (Exception e) {
+ throw new HoodieException("Stop jmxReporterServer fail", e);
+ }
+ }
+
+ private JmxReporterServer createJmxReport(String host, int port) {
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ return JmxReporterServer.forRegistry(registry)
+ .host(host)
+ .port(port)
+ .registerWith(mBeanServer)
+ .build();
+ }
+
+ private int[] getPortRangeFromString(String portsConfig) {
+ String range = portsConfig.trim();
+ int dashIdx = range.indexOf('-');
+ final int start;
+ final int end;
+ if (dashIdx == -1) {
+ start = Integer.parseInt(range);
+ end = Integer.parseInt(range);
+ } else {
+ start = Integer.parseInt(range.substring(0, dashIdx));
+ end = Integer.parseInt(range.substring(dashIdx + 1));
+ }
+ return IntStream.rangeClosed(start, end)
+ .filter(port -> (0 <= port && port <= 65535))
+ .toArray();
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java
new file mode 100644
index 000000000..e055af653
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jmx.JmxReporter;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import javax.management.MBeanServer;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+/**
+ * A reporter which publishes metric values to a JMX server.
+ */
+public class JmxReporterServer {
+
+ /**
+ * Returns a new {@link JmxReporterServer.Builder} for {@link JmxReporterServer}.
+ *
+ * @param registry the registry to report
+ * @return a {@link JmxReporterServer.Builder} instance for a {@link JmxReporterServer}
+ */
+ public static JmxReporterServer.Builder forRegistry(MetricRegistry registry) {
+ return new JmxReporterServer.Builder(registry);
+ }
+
+ /**
+ * A builder for {@link JmxReporterServer} instances.
+ */
+ public static class Builder {
+
+ private final MetricRegistry registry;
+ private MBeanServer mBeanServer;
+ private String host;
+ private int port;
+
+ private Builder(MetricRegistry registry) {
+ this.registry = registry;
+ }
+
+ public JmxReporterServer.Builder host(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public JmxReporterServer.Builder port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public JmxReporterServer.Builder registerWith(MBeanServer mBeanServer) {
+ this.mBeanServer = mBeanServer;
+ return this;
+ }
+
+ public JmxReporterServer build() {
+ Preconditions.checkNotNull(registry, "registry cannot be null!");
+ Preconditions.checkNotNull(mBeanServer, "mBeanServer cannot be null!");
+ Preconditions
+ .checkArgument(!StringUtils.isNullOrEmpty(host), "host cannot be null or empty!");
+ return new JmxReporterServer(registry, host, port, mBeanServer);
+ }
+ }
+
+ private JMXConnectorServer connector;
+ private Registry rmiRegistry;
+ private JmxReporter reporter;
+
+ protected JmxReporterServer(MetricRegistry registry, String host, int port,
+ MBeanServer mBeanServer) {
+ String serviceUrl =
+ "service:jmx:rmi://localhost:" + port + "/jndi/rmi://" + host + ":" + port + "/jmxrmi";
+ try {
+ JMXServiceURL url = new JMXServiceURL(serviceUrl);
+ connector = JMXConnectorServerFactory
+ .newJMXConnectorServer(url, null, mBeanServer);
+ rmiRegistry = LocateRegistry.createRegistry(port);
+ reporter = JmxReporter.forRegistry(registry).registerWith(mBeanServer).build();
+ } catch (Exception e) {
+ throw new HoodieException("Jmx service url created " + serviceUrl, e);
+ }
+ }
+
+ public JmxReporter getReporter() {
+ return reporter;
+ }
+
+ public void start() {
+ Preconditions.checkArgument(reporter != null && connector != null,
+ "reporter or connector cannot be null!");
+ try {
+ connector.start();
+ reporter.start();
+ } catch (Exception e) {
+ throw new HoodieException("connector or reporter start failed", e);
+ }
+ }
+
+ public void stop() throws IOException {
+ stopConnector();
+ stopReport();
+ stopRmiRegistry();
+ }
+
+ private void stopRmiRegistry() {
+ if (rmiRegistry != null) {
+ try {
+ UnicastRemoteObject.unexportObject(rmiRegistry, true);
+ } catch (NoSuchObjectException e) {
+ throw new HoodieException("Could not un-export our RMI registry", e);
+ } finally {
+ rmiRegistry = null;
+ }
+ }
+ }
+
+ private void stopConnector() throws IOException {
+ if (connector != null) {
+ try {
+ connector.stop();
+ } finally {
+ connector = null;
+ }
+ }
+ }
+
+ private void stopReport() {
+ if (reporter != null) {
+ try {
+ reporter.stop();
+ } finally {
+ reporter = null;
+ }
+ }
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
index 533208f7f..253813394 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -33,6 +33,7 @@ import java.io.Closeable;
* This is the main class of the metrics system.
*/
public class Metrics {
+
private static final Logger LOG = LogManager.getLogger(Metrics.class);
private static volatile boolean initialized = false;
@@ -47,11 +48,12 @@ public class Metrics {
if (reporter == null) {
throw new RuntimeException("Cannot initialize Reporter.");
}
- // reporter.start();
+ reporter.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
reporter.report();
+ reporter.stop();
Closeables.close(reporter.getReporter(), true);
} catch (Exception e) {
e.printStackTrace();
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
index aac6c708f..9855ac0b0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
@@ -87,4 +87,11 @@ public class MetricsGraphiteReporter extends MetricsReporter {
return GraphiteReporter.forRegistry(registry).prefixedWith(reporterPrefix).convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS).filter(MetricFilter.ALL).build(graphite);
}
+
+ @Override
+ public void stop() {
+ if (graphiteReporter != null) {
+ graphiteReporter.stop();
+ }
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporter.java
index de52f35b1..773bb3bfd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporter.java
@@ -36,4 +36,9 @@ public abstract class MetricsReporter {
public abstract void report();
public abstract Closeable getReporter();
+
+ /**
+ * Stop this reporter. Should be used to stop channels, streams and release resources.
+ */
+ public abstract void stop();
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index b9d433d94..8a3a59279 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -42,7 +42,7 @@ public class MetricsReporterFactory {
reporter = new InMemoryMetricsReporter();
break;
case JMX:
- reporter = new JmxMetricsReporter(config);
+ reporter = new JmxMetricsReporter(config, registry);
break;
default:
LOG.error("Reporter type[" + type + "] is not supported.");
diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
index 72b218b14..fa0f1da45 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
@@ -18,10 +18,8 @@
package org.apache.hudi.metrics;
-import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.junit.Before;
import org.junit.Test;
import static org.apache.hudi.metrics.Metrics.registerGauge;
@@ -34,20 +32,29 @@ import static org.mockito.Mockito.when;
*/
public class TestHoodieJmxMetrics {
- @Before
- public void start() {
- HoodieWriteConfig config = mock(HoodieWriteConfig.class);
- when(config.isMetricsOn()).thenReturn(true);
- when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
- when(config.getJmxHost()).thenReturn(HoodieMetricsConfig.DEFAULT_JMX_HOST);
- when(config.getJmxPort()).thenReturn(HoodieMetricsConfig.DEFAULT_JMX_PORT);
- new HoodieMetrics(config, "raw_table");
- }
+ HoodieWriteConfig config = mock(HoodieWriteConfig.class);
@Test
public void testRegisterGauge() {
- registerGauge("jmx_metric", 123L);
+ when(config.isMetricsOn()).thenReturn(true);
+ when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+ when(config.getJmxHost()).thenReturn("localhost");
+ when(config.getJmxPort()).thenReturn("9889");
+ new HoodieMetrics(config, "raw_table");
+ registerGauge("jmx_metric1", 123L);
assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
- .get("jmx_metric").getValue().toString());
+ .get("jmx_metric1").getValue().toString());
+ }
+
+ @Test
+ public void testRegisterGaugeByRangerPort() {
+ when(config.isMetricsOn()).thenReturn(true);
+ when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+ when(config.getJmxHost()).thenReturn("localhost");
+ when(config.getJmxPort()).thenReturn("1000-5000");
+ new HoodieMetrics(config, "raw_table");
+ registerGauge("jmx_metric2", 123L);
+ assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+ .get("jmx_metric2").getValue().toString());
}
}
diff --git a/pom.xml b/pom.xml
index 62427e560..758a0495b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -516,6 +516,11 @@
metrics-core
${metrics.version}
+
+ io.dropwizard.metrics
+ metrics-jmx
+ ${metrics.version}
+
com.beust