From 77d5b92d88d6583bdfc09e4c10ecfe7ddbb04806 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Mon, 9 Mar 2020 20:15:42 -0700 Subject: [PATCH] [HUDI-668] Added additional unit-tests for HUDI metrics. (#1380) --- .../hudi/metrics/TestHoodieJmxMetrics.java | 5 +- .../hudi/metrics/TestHoodieMetrics.java | 117 +++++++++++++++++- 2 files changed, 119 insertions(+), 3 deletions(-) mode change 100644 => 100755 hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java index c1e3d6122..72b218b14 100644 --- a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java +++ b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java @@ -21,6 +21,7 @@ package org.apache.hudi.metrics; import org.apache.hudi.config.HoodieMetricsConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.junit.Before; import org.junit.Test; import static org.apache.hudi.metrics.Metrics.registerGauge; @@ -31,9 +32,9 @@ import static org.mockito.Mockito.when; /** * Test for the Jmx metrics report. */ -public class TestHoodieJmxMetrics extends TestHoodieMetrics { +public class TestHoodieJmxMetrics { - @Override + @Before public void start() { HoodieWriteConfig config = mock(HoodieWriteConfig.class); when(config.isMetricsOn()).thenReturn(true); diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java old mode 100644 new mode 100755 index c71092d21..d52bf8d8b --- a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java +++ b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java @@ -18,24 +18,32 @@ package org.apache.hudi.metrics; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.config.HoodieWriteConfig; import org.junit.Before; import org.junit.Test; +import com.codahale.metrics.Timer; + +import java.util.Arrays; +import java.util.Random; + import static org.apache.hudi.metrics.Metrics.registerGauge; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestHoodieMetrics { + private HoodieMetrics metrics; @Before public void start() { HoodieWriteConfig config = mock(HoodieWriteConfig.class); when(config.isMetricsOn()).thenReturn(true); when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY); - new HoodieMetrics(config, "raw_table"); + metrics = new HoodieMetrics(config, "raw_table"); } @Test @@ -43,4 +51,111 @@ public class TestHoodieMetrics { registerGauge("metric1", 123L); assertEquals("123", Metrics.getInstance().getRegistry().getGauges().get("metric1").getValue().toString()); } + + @Test + public void testTimerCtx() throws InterruptedException { + Random rand = new Random(); + + // Index metrics + Timer.Context timer = metrics.getIndexCtx(); + Thread.sleep(5); // Ensure timer duration is > 0 + metrics.updateIndexMetrics("some_action", metrics.getDurationInMs(timer.stop())); + String metricName = metrics.getMetricsName("index", "some_action.duration"); + long msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(); + assertTrue(msec > 0); + + // Rollback metrics + timer = metrics.getRollbackCtx(); + Thread.sleep(5); // Ensure timer duration is > 0 + long numFilesDeleted = 1 + rand.nextInt(); + metrics.updateRollbackMetrics(metrics.getDurationInMs(timer.stop()), numFilesDeleted); + metricName = metrics.getMetricsName("rollback", "duration"); + msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(); + assertTrue(msec > 0); + metricName = metrics.getMetricsName("rollback", "numFilesDeleted"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted); + + // Clean metrics + timer = metrics.getRollbackCtx(); + Thread.sleep(5); // Ensure timer duration is > 0 + numFilesDeleted = 1 + rand.nextInt(); + metrics.updateCleanMetrics(metrics.getDurationInMs(timer.stop()), (int)numFilesDeleted); + metricName = metrics.getMetricsName("clean", "duration"); + msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(); + assertTrue(msec > 0); + metricName = metrics.getMetricsName("clean", "numFilesDeleted"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted); + + // Finalize metrics + timer = metrics.getFinalizeCtx(); + Thread.sleep(5); // Ensure timer duration is > 0 + long numFilesFinalized = 1 + rand.nextInt(); + metrics.updateFinalizeWriteMetrics(metrics.getDurationInMs(timer.stop()), (int)numFilesFinalized); + metricName = metrics.getMetricsName("finalize", "duration"); + msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(); + assertTrue(msec > 0); + metricName = metrics.getMetricsName("finalize", "numFilesFinalized"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesFinalized); + + // Commit / deltacommit / compaction metrics + Arrays.asList("commit", "deltacommit", "compaction").stream().forEach(action -> { + Timer.Context commitTimer = action.equals("commit") ? metrics.getCommitCtx() : + action.equals("deltacommit") ? metrics.getDeltaCommitCtx() : metrics.getCompactionCtx(); + + try { + // Ensure timer duration is > 0 + Thread.sleep(5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + long randomValue = 1 + rand.nextInt(); + HoodieCommitMetadata metadata = mock(HoodieCommitMetadata.class); + when(metadata.fetchTotalPartitionsWritten()).thenReturn(randomValue + 1); + when(metadata.fetchTotalFilesInsert()).thenReturn(randomValue + 2); + when(metadata.fetchTotalFilesUpdated()).thenReturn(randomValue + 3); + when(metadata.fetchTotalRecordsWritten()).thenReturn(randomValue + 4); + when(metadata.fetchTotalUpdateRecordsWritten()).thenReturn(randomValue + 5); + when(metadata.fetchTotalInsertRecordsWritten()).thenReturn(randomValue + 6); + when(metadata.fetchTotalBytesWritten()).thenReturn(randomValue + 7); + when(metadata.getTotalScanTime()).thenReturn(randomValue + 8); + when(metadata.getTotalCreateTime()).thenReturn(randomValue + 9); + when(metadata.getTotalUpsertTime()).thenReturn(randomValue + 10); + when(metadata.getTotalCompactedRecordsUpdated()).thenReturn(randomValue + 11); + when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12); + when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13); + metrics.updateCommitMetrics(randomValue + 14, commitTimer.stop(), metadata, action); + + String metricname = metrics.getMetricsName(action, "duration"); + long duration = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(); + assertTrue(duration > 0); + metricname = metrics.getMetricsName(action, "totalPartitionsWritten"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalPartitionsWritten()); + metricname = metrics.getMetricsName(action, "totalFilesInsert"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesInsert()); + metricname = metrics.getMetricsName(action, "totalFilesUpdate"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesUpdated()); + metricname = metrics.getMetricsName(action, "totalRecordsWritten"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalRecordsWritten()); + metricname = metrics.getMetricsName(action, "totalUpdateRecordsWritten"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalUpdateRecordsWritten()); + metricname = metrics.getMetricsName(action, "totalInsertRecordsWritten"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalInsertRecordsWritten()); + metricname = metrics.getMetricsName(action, "totalBytesWritten"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalBytesWritten()); + metricname = metrics.getMetricsName(action, "commitTime"); + assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), randomValue + 14); + metricname = metrics.getMetricsName(action, "totalScanTime"); + assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalScanTime()); + metricname = metrics.getMetricsName(action, "totalCreateTime"); + assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCreateTime()); + metricname = metrics.getMetricsName(action, "totalUpsertTime"); + assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalUpsertTime()); + metricname = metrics.getMetricsName(action, "totalCompactedRecordsUpdated"); + assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCompactedRecordsUpdated()); + metricname = metrics.getMetricsName(action, "totalLogFilesCompacted"); + assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesCompacted()); + metricname = metrics.getMetricsName(action, "totalLogFilesSize"); + assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesSize()); + }); + } }