[HUDI-668] Added additional unit-tests for HUDI metrics. (#1380)
This commit is contained in:
@@ -21,6 +21,7 @@ package org.apache.hudi.metrics;
|
||||
import org.apache.hudi.config.HoodieMetricsConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hudi.metrics.Metrics.registerGauge;
|
||||
@@ -31,9 +32,9 @@ import static org.mockito.Mockito.when;
|
||||
/**
|
||||
* Test for the Jmx metrics report.
|
||||
*/
|
||||
public class TestHoodieJmxMetrics extends TestHoodieMetrics {
|
||||
public class TestHoodieJmxMetrics {
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void start() {
|
||||
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
|
||||
when(config.isMetricsOn()).thenReturn(true);
|
||||
|
||||
117
hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
Normal file → Executable file
117
hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
Normal file → Executable file
@@ -18,24 +18,32 @@
|
||||
|
||||
package org.apache.hudi.metrics;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.apache.hudi.metrics.Metrics.registerGauge;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestHoodieMetrics {
|
||||
private HoodieMetrics metrics;
|
||||
|
||||
@Before
|
||||
public void start() {
|
||||
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
|
||||
when(config.isMetricsOn()).thenReturn(true);
|
||||
when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY);
|
||||
new HoodieMetrics(config, "raw_table");
|
||||
metrics = new HoodieMetrics(config, "raw_table");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -43,4 +51,111 @@ public class TestHoodieMetrics {
|
||||
registerGauge("metric1", 123L);
|
||||
assertEquals("123", Metrics.getInstance().getRegistry().getGauges().get("metric1").getValue().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimerCtx() throws InterruptedException {
|
||||
Random rand = new Random();
|
||||
|
||||
// Index metrics
|
||||
Timer.Context timer = metrics.getIndexCtx();
|
||||
Thread.sleep(5); // Ensure timer duration is > 0
|
||||
metrics.updateIndexMetrics("some_action", metrics.getDurationInMs(timer.stop()));
|
||||
String metricName = metrics.getMetricsName("index", "some_action.duration");
|
||||
long msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
|
||||
assertTrue(msec > 0);
|
||||
|
||||
// Rollback metrics
|
||||
timer = metrics.getRollbackCtx();
|
||||
Thread.sleep(5); // Ensure timer duration is > 0
|
||||
long numFilesDeleted = 1 + rand.nextInt();
|
||||
metrics.updateRollbackMetrics(metrics.getDurationInMs(timer.stop()), numFilesDeleted);
|
||||
metricName = metrics.getMetricsName("rollback", "duration");
|
||||
msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
|
||||
assertTrue(msec > 0);
|
||||
metricName = metrics.getMetricsName("rollback", "numFilesDeleted");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
|
||||
|
||||
// Clean metrics
|
||||
timer = metrics.getRollbackCtx();
|
||||
Thread.sleep(5); // Ensure timer duration is > 0
|
||||
numFilesDeleted = 1 + rand.nextInt();
|
||||
metrics.updateCleanMetrics(metrics.getDurationInMs(timer.stop()), (int)numFilesDeleted);
|
||||
metricName = metrics.getMetricsName("clean", "duration");
|
||||
msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
|
||||
assertTrue(msec > 0);
|
||||
metricName = metrics.getMetricsName("clean", "numFilesDeleted");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesDeleted);
|
||||
|
||||
// Finalize metrics
|
||||
timer = metrics.getFinalizeCtx();
|
||||
Thread.sleep(5); // Ensure timer duration is > 0
|
||||
long numFilesFinalized = 1 + rand.nextInt();
|
||||
metrics.updateFinalizeWriteMetrics(metrics.getDurationInMs(timer.stop()), (int)numFilesFinalized);
|
||||
metricName = metrics.getMetricsName("finalize", "duration");
|
||||
msec = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue();
|
||||
assertTrue(msec > 0);
|
||||
metricName = metrics.getMetricsName("finalize", "numFilesFinalized");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesFinalized);
|
||||
|
||||
// Commit / deltacommit / compaction metrics
|
||||
Arrays.asList("commit", "deltacommit", "compaction").stream().forEach(action -> {
|
||||
Timer.Context commitTimer = action.equals("commit") ? metrics.getCommitCtx() :
|
||||
action.equals("deltacommit") ? metrics.getDeltaCommitCtx() : metrics.getCompactionCtx();
|
||||
|
||||
try {
|
||||
// Ensure timer duration is > 0
|
||||
Thread.sleep(5);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
long randomValue = 1 + rand.nextInt();
|
||||
HoodieCommitMetadata metadata = mock(HoodieCommitMetadata.class);
|
||||
when(metadata.fetchTotalPartitionsWritten()).thenReturn(randomValue + 1);
|
||||
when(metadata.fetchTotalFilesInsert()).thenReturn(randomValue + 2);
|
||||
when(metadata.fetchTotalFilesUpdated()).thenReturn(randomValue + 3);
|
||||
when(metadata.fetchTotalRecordsWritten()).thenReturn(randomValue + 4);
|
||||
when(metadata.fetchTotalUpdateRecordsWritten()).thenReturn(randomValue + 5);
|
||||
when(metadata.fetchTotalInsertRecordsWritten()).thenReturn(randomValue + 6);
|
||||
when(metadata.fetchTotalBytesWritten()).thenReturn(randomValue + 7);
|
||||
when(metadata.getTotalScanTime()).thenReturn(randomValue + 8);
|
||||
when(metadata.getTotalCreateTime()).thenReturn(randomValue + 9);
|
||||
when(metadata.getTotalUpsertTime()).thenReturn(randomValue + 10);
|
||||
when(metadata.getTotalCompactedRecordsUpdated()).thenReturn(randomValue + 11);
|
||||
when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12);
|
||||
when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13);
|
||||
metrics.updateCommitMetrics(randomValue + 14, commitTimer.stop(), metadata, action);
|
||||
|
||||
String metricname = metrics.getMetricsName(action, "duration");
|
||||
long duration = (Long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue();
|
||||
assertTrue(duration > 0);
|
||||
metricname = metrics.getMetricsName(action, "totalPartitionsWritten");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalPartitionsWritten());
|
||||
metricname = metrics.getMetricsName(action, "totalFilesInsert");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesInsert());
|
||||
metricname = metrics.getMetricsName(action, "totalFilesUpdate");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesUpdated());
|
||||
metricname = metrics.getMetricsName(action, "totalRecordsWritten");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalRecordsWritten());
|
||||
metricname = metrics.getMetricsName(action, "totalUpdateRecordsWritten");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalUpdateRecordsWritten());
|
||||
metricname = metrics.getMetricsName(action, "totalInsertRecordsWritten");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalInsertRecordsWritten());
|
||||
metricname = metrics.getMetricsName(action, "totalBytesWritten");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalBytesWritten());
|
||||
metricname = metrics.getMetricsName(action, "commitTime");
|
||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), randomValue + 14);
|
||||
metricname = metrics.getMetricsName(action, "totalScanTime");
|
||||
assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalScanTime());
|
||||
metricname = metrics.getMetricsName(action, "totalCreateTime");
|
||||
assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCreateTime());
|
||||
metricname = metrics.getMetricsName(action, "totalUpsertTime");
|
||||
assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalUpsertTime());
|
||||
metricname = metrics.getMetricsName(action, "totalCompactedRecordsUpdated");
|
||||
assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCompactedRecordsUpdated());
|
||||
metricname = metrics.getMetricsName(action, "totalLogFilesCompacted");
|
||||
assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesCompacted());
|
||||
metricname = metrics.getMetricsName(action, "totalLogFilesSize");
|
||||
assertEquals(Metrics.getInstance().getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesSize());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user