[HUDI-1326] Added an API to force publish metrics and flush them. (#2152)
* [HUDI-1326] Added an API to force publish metrics and flush them. Using the added API, publish metrics after each level of the DAG completed in hudi-test-suite. * Code cleanups Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
@@ -38,13 +38,15 @@ public class Metrics {
|
|||||||
private static final Logger LOG = LogManager.getLogger(Metrics.class);
|
private static final Logger LOG = LogManager.getLogger(Metrics.class);
|
||||||
|
|
||||||
private static volatile boolean initialized = false;
|
private static volatile boolean initialized = false;
|
||||||
private static Metrics metrics = null;
|
private static Metrics instance = null;
|
||||||
|
|
||||||
private final MetricRegistry registry;
|
private final MetricRegistry registry;
|
||||||
private MetricsReporter reporter;
|
private MetricsReporter reporter;
|
||||||
|
private final String commonMetricPrefix;
|
||||||
|
|
||||||
private Metrics(HoodieWriteConfig metricConfig) {
|
private Metrics(HoodieWriteConfig metricConfig) {
|
||||||
registry = new MetricRegistry();
|
registry = new MetricRegistry();
|
||||||
|
commonMetricPrefix = metricConfig.getTableName();
|
||||||
reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
|
reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
|
||||||
if (reporter == null) {
|
if (reporter == null) {
|
||||||
throw new RuntimeException("Cannot initialize Reporter.");
|
throw new RuntimeException("Cannot initialize Reporter.");
|
||||||
@@ -68,13 +70,24 @@ public class Metrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void reportAndFlushMetrics() {
|
||||||
|
try {
|
||||||
|
LOG.info("Reporting and flushing all metrics");
|
||||||
|
this.registerHoodieCommonMetrics();
|
||||||
|
this.reporter.report();
|
||||||
|
this.registry.getNames().forEach(this.registry::remove);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error while reporting and flushing metrics", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void registerHoodieCommonMetrics() {
|
private void registerHoodieCommonMetrics() {
|
||||||
registerGauges(Registry.getAllMetrics(true, true), Option.empty());
|
registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Metrics getInstance() {
|
public static Metrics getInstance() {
|
||||||
assert initialized;
|
assert initialized;
|
||||||
return metrics;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static synchronized void init(HoodieWriteConfig metricConfig) {
|
public static synchronized void init(HoodieWriteConfig metricConfig) {
|
||||||
@@ -82,7 +95,7 @@ public class Metrics {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
metrics = new Metrics(metricConfig);
|
instance = new Metrics(metricConfig);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieException(e);
|
throw new HoodieException(e);
|
||||||
}
|
}
|
||||||
@@ -93,10 +106,17 @@ public class Metrics {
|
|||||||
if (!initialized) {
|
if (!initialized) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
metrics.reportAndCloseReporter();
|
instance.reportAndCloseReporter();
|
||||||
initialized = false;
|
initialized = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static synchronized void flush() {
|
||||||
|
if (!Metrics.initialized) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
instance.reportAndFlushMetrics();
|
||||||
|
}
|
||||||
|
|
||||||
public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
|
public static void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
|
||||||
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
|
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
|
||||||
metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
|
metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ import java.util.concurrent.Future;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||||
|
import org.apache.hudi.metrics.Metrics;
|
||||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||||
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
|
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
|
||||||
import org.apache.hudi.integ.testsuite.dag.WriterContext;
|
import org.apache.hudi.integ.testsuite.dag.WriterContext;
|
||||||
@@ -95,6 +96,9 @@ public class DagScheduler {
|
|||||||
for (Future future : futures) {
|
for (Future future : futures) {
|
||||||
future.get(1, TimeUnit.HOURS);
|
future.get(1, TimeUnit.HOURS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After each level, report and flush the metrics
|
||||||
|
Metrics.flush();
|
||||||
} while (queue.size() > 0);
|
} while (queue.size() > 0);
|
||||||
log.info("Finished workloads");
|
log.info("Finished workloads");
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user