From bedbb825e06d479f90bb09d09be4ade37fdaa8b3 Mon Sep 17 00:00:00 2001 From: Abhishek Modi Date: Tue, 18 Aug 2020 07:42:05 -0700 Subject: [PATCH] [HUDI-1025] Meter RPC calls in HoodieWrapperFileSystem (#1916) --- .../java/org/apache/hudi/metrics/Metrics.java | 13 +++ .../common/fs/HoodieWrapperFileSystem.java | 31 ++++++ .../apache/hudi/common/metrics/Counter.java | 43 ++++++++ .../apache/hudi/common/metrics/Metric.java | 26 +++++ .../apache/hudi/common/metrics/Registry.java | 104 ++++++++++++++++++ .../org/apache/hudi/common/TestRegistry.java | 75 +++++++++++++ 6 files changed, 292 insertions(+) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/metrics/Counter.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/TestRegistry.java diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java index a68ac2602..2b524a74c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -18,6 +18,8 @@ package org.apache.hudi.metrics; +import org.apache.hudi.common.metrics.Registry; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -26,6 +28,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.Closeable; +import java.util.Map; /** * This is the main class of the metrics system. @@ -50,6 +53,7 @@ public class Metrics { Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { + registerHoodieCommonMetrics(); reporter.report(); if (getReporter() != null) { getReporter().close(); @@ -60,6 +64,10 @@ public class Metrics { })); } + private void registerHoodieCommonMetrics() { + registerGauges(Registry.getAllMetrics(true, true), Option.empty()); + } + public static Metrics getInstance() { assert initialized; return metrics; @@ -77,6 +85,11 @@ public class Metrics { initialized = true; } + public static void registerGauges(Map metricsMap, Option prefix) { + String metricPrefix = prefix.isPresent() ? prefix.get() + "." : ""; + metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v)); + } + public static void registerGauge(String metricName, final long value) { try { MetricRegistry registry = Metrics.getInstance().getRegistry(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java index 7f8874f41..c3f6189e8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.fs; +import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -64,10 +65,15 @@ public class HoodieWrapperFileSystem extends FileSystem { public static final String HOODIE_SCHEME_PREFIX = "hoodie-"; + private enum MetricName { + create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles + } + private ConcurrentMap openStreams = new ConcurrentHashMap<>(); private FileSystem fileSystem; private URI uri; private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard(); + private Registry metricsRegistry = Registry.getRegistry(this.getClass().getSimpleName()); public HoodieWrapperFileSystem() {} @@ -140,6 +146,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); final Path translatedPath = convertToDefaultPath(f); return wrapOutputStream(f, fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress)); @@ -159,43 +166,51 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite)); } @Override public FSDataOutputStream create(Path f) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f))); } @Override public FSDataOutputStream create(Path f, Progressable progress) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), progress)); } @Override public FSDataOutputStream create(Path f, short replication) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication)); } @Override public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication, progress)); } @Override public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize)); } @Override public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress)); } @Override public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress)); } @@ -203,6 +218,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FSDataOutputStream create(Path f, FsPermission permission, EnumSet flags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize, progress)); } @@ -210,6 +226,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FSDataOutputStream create(Path f, FsPermission permission, EnumSet flags, int bufferSize, short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize, progress, checksumOpt)); } @@ -217,6 +234,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException { + this.metricsRegistry.increment(MetricName.create.name()); return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize)); } @@ -228,6 +246,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public boolean rename(Path src, Path dst) throws IOException { + this.metricsRegistry.increment(MetricName.rename.name()); try { consistencyGuard.waitTillFileAppears(convertToDefaultPath(src)); } catch (TimeoutException e) { @@ -254,6 +273,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public boolean delete(Path f, boolean recursive) throws IOException { + this.metricsRegistry.increment(MetricName.delete.name()); boolean success = fileSystem.delete(convertToDefaultPath(f), recursive); if (success) { @@ -268,6 +288,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FileStatus[] listStatus(Path f) throws IOException { + this.metricsRegistry.increment(MetricName.listStatus.name()); return fileSystem.listStatus(convertToDefaultPath(f)); } @@ -283,6 +304,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { + this.metricsRegistry.increment(MetricName.mkdirs.name()); boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission); if (success) { try { @@ -296,6 +318,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FileStatus getFileStatus(Path f) throws IOException { + this.metricsRegistry.increment(MetricName.getFileStatus.name()); try { consistencyGuard.waitTillFileAppears(convertToDefaultPath(f)); } catch (TimeoutException e) { @@ -439,6 +462,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public boolean delete(Path f) throws IOException { + this.metricsRegistry.increment(MetricName.delete.name()); return delete(f, true); } @@ -484,26 +508,31 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException { + this.metricsRegistry.increment(MetricName.listStatus.name()); return fileSystem.listStatus(convertToDefaultPath(f), filter); } @Override public FileStatus[] listStatus(Path[] files) throws IOException { + this.metricsRegistry.increment(MetricName.listStatus.name()); return fileSystem.listStatus(convertDefaults(files)); } @Override public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException { + this.metricsRegistry.increment(MetricName.listStatus.name()); return fileSystem.listStatus(convertDefaults(files), filter); } @Override public FileStatus[] globStatus(Path pathPattern) throws IOException { + this.metricsRegistry.increment(MetricName.globStatus.name()); return fileSystem.globStatus(convertToDefaultPath(pathPattern)); } @Override public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException { + this.metricsRegistry.increment(MetricName.globStatus.name()); return fileSystem.globStatus(convertToDefaultPath(pathPattern), filter); } @@ -514,6 +543,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public RemoteIterator listFiles(Path f, boolean recursive) throws IOException { + this.metricsRegistry.increment(MetricName.listFiles.name()); return fileSystem.listFiles(convertToDefaultPath(f), recursive); } @@ -524,6 +554,7 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public boolean mkdirs(Path f) throws IOException { + this.metricsRegistry.increment(MetricName.mkdirs.name()); boolean success = fileSystem.mkdirs(convertToDefaultPath(f)); if (success) { try { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Counter.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Counter.java new file mode 100644 index 000000000..546956d00 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Counter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.metrics; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Lightweight Counter for Hudi Metrics. + */ +public class Counter implements Metric { + + private final AtomicLong count = new AtomicLong(); + + public void increment() { + this.add(1); + } + + public void add(long n) { + this.count.addAndGet(n); + } + + @Override + public Long getValue() { + return count.get(); + } + +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java new file mode 100644 index 000000000..12b42de23 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.metrics; + +/** + * Interface for Hudi Metric Types. + */ +public interface Metric { + Long getValue(); +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java new file mode 100644 index 000000000..169e8bc90 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Lightweight Metrics Registry to track Hudi events. + */ +public class Registry { + ConcurrentHashMap counters = new ConcurrentHashMap<>(); + final String name; + + private static ConcurrentHashMap registryMap = new ConcurrentHashMap<>(); + + private Registry(String name) { + this.name = name; + } + + /** + * Get (or create) the registry for a provided name. + */ + public static synchronized Registry getRegistry(String registryName) { + if (!registryMap.containsKey(registryName)) { + registryMap.put(registryName, new Registry(registryName)); + } + return registryMap.get(registryName); + } + + /** + * Get all registered metrics. + * @param flush clean all metrics as part of this operation. + * @param prefixWithRegistryName prefix each metric name with the registry name. + * @return + */ + public static synchronized Map getAllMetrics(boolean flush, boolean prefixWithRegistryName) { + HashMap allMetrics = new HashMap<>(); + registryMap.forEach((registryName, registry) -> { + allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName)); + if (flush) { + registry.clear(); + } + }); + return allMetrics; + } + + public void clear() { + counters.clear(); + } + + public void increment(String name) { + getCounter(name).increment(); + } + + public void add(String name, long value) { + getCounter(name).add(value); + } + + private synchronized Counter getCounter(String name) { + if (!counters.containsKey(name)) { + counters.put(name, new Counter()); + } + return counters.get(name); + } + + /** + * Get all Counter type metrics. + */ + public Map getAllCounts() { + return getAllCounts(false); + } + + /** + * Get all Counter type metrics. + */ + public Map getAllCounts(boolean prefixWithRegistryName) { + HashMap countersMap = new HashMap<>(); + counters.forEach((k, v) -> { + String key = prefixWithRegistryName ? name + "." + k : k; + countersMap.put(key, v.getValue()); + }); + return countersMap; + } + +} \ No newline at end of file diff --git a/hudi-common/src/test/java/org/apache/hudi/common/TestRegistry.java b/hudi-common/src/test/java/org/apache/hudi/common/TestRegistry.java new file mode 100644 index 000000000..dec708f0a --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/TestRegistry.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common; + +import org.apache.hudi.common.metrics.Registry; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestRegistry { + + @Test + public void testGetRegistry() throws Exception { + Registry r = Registry.getRegistry("testGetRegistry_1"); + assertEquals(r, Registry.getRegistry("testGetRegistry_1")); + } + + private void registerMetrics(Map map, Registry registry) { + map.forEach((k, v) -> registry.add(k, v)); + } + + @Test + public void testGetAllMetrics() throws Exception { + String registryName = "testGetAllMetrics"; + Registry r = Registry.getRegistry(registryName); + Map countsMap = new HashMap<>(); + + countsMap.put("one", 1L); + registerMetrics(countsMap, r); + Map allMetrics1 = Registry.getAllMetrics(false, true); + assertTrue(allMetrics1.containsKey(registryName + ".one")); + + countsMap.remove("one"); + countsMap.put("two", 2L); + registerMetrics(countsMap, r); + Map allMetrics2 = Registry.getAllMetrics(true, true); + assertTrue(allMetrics2.containsKey(registryName + ".one")); + assertTrue(allMetrics2.containsKey(registryName + ".two")); + + Map allMetrics3 = Registry.getAllMetrics(false, true); + assertTrue(allMetrics3.isEmpty()); + } + + @Test + public void testCounts() throws Exception { + Registry r = Registry.getRegistry("testCounts"); + Map countsMap = new HashMap<>(); + countsMap.put("one", 1L); + countsMap.put("two", 2L); + + registerMetrics(countsMap, r); + assertEquals(countsMap, r.getAllCounts()); + } + +} \ No newline at end of file