From f5f34bb1c16e6d070668486eba2a29f554c0bbc7 Mon Sep 17 00:00:00 2001 From: Ramachandran Madtas Subramaniam Date: Wed, 11 Mar 2020 12:58:31 -0700 Subject: [PATCH] [HUDI-568] Improve unit test coverage Classes improved: * HoodieTableMetaClient * RocksDBDAO * HoodieRealtimeFileSplit --- .../common/util/collection/RocksDBDAO.java | 3 - .../table/TestHoodieTableMetaClient.java | 19 ++ .../util/collection/TestRocksDBManager.java | 168 +++++++++++++++--- hudi-hadoop-mr/pom.xml | 6 + .../realtime/TestHoodieRealtimeFileSplit.java | 162 +++++++++++++++++ 5 files changed, 335 insertions(+), 23 deletions(-) create mode 100644 hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java index 84b4953ab..3c08460f2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java @@ -75,9 +75,6 @@ public class RocksDBDAO { * Create RocksDB if not initialized. 
*/ private RocksDB getRocksDB() { - if (null == rocksDB) { - init(); - } return rocksDB; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index ae376b4fd..e1279d1f9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -33,6 +33,7 @@ import java.io.IOException; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -94,4 +95,22 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness { assertArrayEquals("Commit value should be \"test-detail\"", "test-detail".getBytes(), activeCommitTimeline.getInstantDetails(completedInstant).get()); } + + @Test + public void testEquals() throws IOException { + HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType()); + HoodieTableMetaClient metaClient2 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType()); + assertEquals(metaClient1, metaClient1); + assertEquals(metaClient1, metaClient2); + assertNotEquals(metaClient1, null); + assertNotEquals(metaClient1, new Object()); + } + + @Test + public void testToString() throws IOException { + HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType()); + HoodieTableMetaClient metaClient2 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType()); + assertEquals(metaClient1.toString(), metaClient2.toString()); + assertNotEquals(metaClient1.toString(), new Object().toString()); + } } diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBManager.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBManager.java index f024a0b00..5f30fa15b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBManager.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBManager.java @@ -20,15 +20,17 @@ package org.apache.hudi.common.util.collection; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -41,16 +43,16 @@ import java.util.stream.IntStream; */ public class TestRocksDBManager { - private static RocksDBDAO dbManager; + private RocksDBDAO dbManager; - @BeforeClass - public static void setUpClass() { - dbManager = new RocksDBDAO("/dummy/path", + @Before + public void setUpClass() { + dbManager = new RocksDBDAO("/dummy/path/" + UUID.randomUUID().toString(), FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath()); } - @AfterClass - public static void tearDownClass() { + @After + public void tearDownClass() { if (dbManager != null) { dbManager.close(); dbManager = null; @@ -68,14 +70,17 @@ public class TestRocksDBManager { String family2 = "family2"; List colFamilies = Arrays.asList(family1, family2); - List payloads = IntStream.range(0, 100).mapToObj(index -> { + final List> payloads = new ArrayList<>(); + IntStream.range(0, 100).forEach(index -> { String prefix = prefixes.get(index % 4); String key = prefix + UUID.randomUUID().toString(); String family = colFamilies.get(index % 2); String val = 
"VALUE_" + UUID.randomUUID().toString(); - return new Payload(prefix, key, val, family); - }).collect(Collectors.toList()); + payloads.add(new Payload(prefix, key, val, family)); + }); + colFamilies.forEach(family -> dbManager.dropColumnFamily(family)); + colFamilies.forEach(family -> dbManager.addColumnFamily(family)); colFamilies.forEach(family -> dbManager.dropColumnFamily(family)); colFamilies.forEach(family -> dbManager.addColumnFamily(family)); @@ -103,21 +108,114 @@ public class TestRocksDBManager { expCount == null ? 0L : expCount.longValue(), gotPayloads.size()); gotPayloads.forEach(p -> { Assert.assertEquals(p.getRight().getFamily(), family); - Assert.assertTrue(p.getRight().getKey().startsWith(prefix)); + Assert.assertTrue(p.getRight().getKey().toString().startsWith(prefix)); }); }); }); - payloads.forEach(payload -> { + payloads.stream().filter(p -> !p.getPrefix().equalsIgnoreCase(prefix1)).forEach(payload -> { Payload p = dbManager.get(payload.getFamily(), payload.getKey()); Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p); - // Now, delete the key dbManager.delete(payload.getFamily(), payload.getKey()); - // Now retrieve Payload p2 = dbManager.get(payload.getFamily(), payload.getKey()); - Assert.assertNull("Retrieved correct payload for key :" + p.getKey(), p2); + Assert.assertNull("Retrieved correct payload for key :" + payload.getKey(), p2); + }); + + colFamilies.forEach(family -> { + dbManager.prefixDelete(family, prefix1); + + int got = dbManager.prefixSearch(family, prefix1).collect(Collectors.toList()).size(); + Assert.assertEquals("Expected prefix delete to leave at least one item for family: " + family, countsMap.get(family).get(prefix1) == null ? 
0 : 1, got); + }); + + payloads.stream().filter(p -> !p.getPrefix().equalsIgnoreCase(prefix1)).forEach(payload -> { + Payload p2 = dbManager.get(payload.getFamily(), payload.getKey()); + Assert.assertNull("Retrieved correct payload for key :" + payload.getKey(), p2); + }); + + // Now do a prefix search + colFamilies.forEach(family -> { + prefixes.stream().filter(p -> !p.equalsIgnoreCase(prefix1)).forEach(prefix -> { + List> gotPayloads = + dbManager.prefixSearch(family, prefix).collect(Collectors.toList()); + Assert.assertEquals("Size check for prefix (" + prefix + ") and family (" + family + ")", 0, + gotPayloads.size()); + }); + }); + + String rocksDBBasePath = dbManager.getRocksDBBasePath(); + dbManager.close(); + Assert.assertFalse(new File(rocksDBBasePath).exists()); + } + + @Test + public void testWithSerializableKey() { + String prefix1 = "prefix1_"; + String prefix2 = "prefix2_"; + String prefix3 = "prefix3_"; + String prefix4 = "prefix4_"; + List prefixes = Arrays.asList(prefix1, prefix2, prefix3, prefix4); + String family1 = "family1"; + String family2 = "family2"; + List colFamilies = Arrays.asList(family1, family2); + + final List> payloads = new ArrayList<>(); + IntStream.range(0, 100).forEach(index -> { + String prefix = prefixes.get(index % 4); + String key = prefix + UUID.randomUUID().toString(); + String family = colFamilies.get(index % 2); + String val = "VALUE_" + UUID.randomUUID().toString(); + payloads.add(new Payload(prefix, new PayloadKey((key)), val, family)); + }); + + colFamilies.forEach(family -> dbManager.dropColumnFamily(family)); + colFamilies.forEach(family -> dbManager.addColumnFamily(family)); + + Map> countsMap = new HashMap<>(); + dbManager.writeBatch(batch -> { + payloads.forEach(payload -> { + dbManager.putInBatch(batch, payload.getFamily(), payload.getKey(), payload); + + if (!countsMap.containsKey(payload.family)) { + countsMap.put(payload.family, new HashMap<>()); + } + Map c = countsMap.get(payload.family); + if 
(!c.containsKey(payload.prefix)) { + c.put(payload.prefix, 0); + } + int currCount = c.get(payload.prefix); + c.put(payload.prefix, currCount + 1); + }); + }); + + Iterator>> payloadSplits = payloads.stream() + .collect(Collectors.partitioningBy(s -> payloads.indexOf(s) > payloads.size() / 2)).values() + .iterator(); + + payloads.forEach(payload -> { + Payload p = dbManager.get(payload.getFamily(), payload.getKey()); + Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p); + }); + + payloadSplits.next().forEach(payload -> { + dbManager.delete(payload.getFamily(), payload.getKey()); + Payload want = dbManager.get(payload.getFamily(), payload.getKey()); + Assert.assertNull("Verify deleted during single delete for key :" + payload.getKey(), want); + }); + + dbManager.writeBatch(batch -> { + payloadSplits.next().forEach(payload -> { + dbManager.deleteInBatch(batch, payload.getFamily(), payload.getKey()); + Payload want = dbManager.get(payload.getFamily(), payload.getKey()); + Assert.assertEquals("Verify not deleted during batch delete in progress for key :" + payload.getKey(), payload, want); + }); + }); + + payloads.forEach(payload -> { + Payload want = dbManager.get(payload.getFamily(), payload.getKey()); + Assert.assertNull("Verify delete for key :" + payload.getKey(), want); }); // Now do a prefix search @@ -135,17 +233,47 @@ public class TestRocksDBManager { Assert.assertFalse(new File(rocksDBBasePath).exists()); } + public static class PayloadKey implements Serializable { + private String key; + + public PayloadKey(String key) { + this.key = key; + } + + @Override + public String toString() { + return key; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PayloadKey that = (PayloadKey) o; + return Objects.equals(key, that.key); + } + + @Override + public int hashCode() { + return Objects.hash(key); + } + } + /** * A 
payload definition for {@link TestRocksDBManager}. */ - public static class Payload implements Serializable { + public static class Payload implements Serializable { private final String prefix; - private final String key; + private final T key; private final String val; private final String family; - public Payload(String prefix, String key, String val, String family) { + public Payload(String prefix, T key, String val, String family) { this.prefix = prefix; this.key = key; this.val = val; @@ -156,7 +284,7 @@ public class TestRocksDBManager { return prefix; } - public String getKey() { + public T getKey() { return key; } diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml index 0b35256d4..578b9a028 100644 --- a/hudi-hadoop-mr/pom.xml +++ b/hudi-hadoop-mr/pom.xml @@ -101,6 +101,12 @@ junit test + + + org.mockito + mockito-all + test + diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java new file mode 100644 index 000000000..cab1e665c --- /dev/null +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.AdditionalMatchers.aryEq; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyByte; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +public class TestHoodieRealtimeFileSplit { + + private HoodieRealtimeFileSplit split; + private String basePath; + private List deltaLogPaths; + private String fileSplitName; + private FileSplit baseFileSplit; + private String maxCommitTime; + private TemporaryFolder tmp; + + @Before + public void setUp() throws Exception { + tmp = new TemporaryFolder(); + tmp.create(); + + basePath = tmp.getRoot().toString(); + deltaLogPaths = Collections.singletonList(basePath + "/1.log"); + fileSplitName = basePath + "/test.file"; + baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[]{}); + maxCommitTime = "10001"; + + split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime); + } + + @After + public void tearDown() throws 
Exception { + tmp.delete(); + } + + @Test + public void testWrite() throws IOException { + // create a mock for DataOutput that will be used in the write method + // this way we can capture and verify if correct arguments were passed + DataOutput out = mock(DataOutput.class); + + // register expected method calls for void functions + // so that we can verify what was called after the method call finishes + doNothing().when(out).writeByte(anyByte()); + doNothing().when(out).writeInt(anyInt()); + doNothing().when(out).write(any(byte[].class), anyInt(), anyInt()); + doNothing().when(out).write(any(byte[].class)); + + // call the method we want to test with the mocked input + split.write(out); + + // verify the method calls on the mocked object in the order of the calls + InOrder inorder = inOrder(out); + inorder.verify(out, times(1)).writeByte(eq(fileSplitName.length())); + inorder.verify(out, times(1)).write(aryEq(Text.encode(fileSplitName).array()), eq(0), eq(fileSplitName.length())); + inorder.verify(out, times(1)).writeInt(eq(basePath.length())); + inorder.verify(out, times(1)).write(aryEq(basePath.getBytes(StandardCharsets.UTF_8))); + inorder.verify(out, times(1)).writeInt(eq(maxCommitTime.length())); + inorder.verify(out, times(1)).write(aryEq(maxCommitTime.getBytes(StandardCharsets.UTF_8))); + inorder.verify(out, times(1)).writeInt(eq(deltaLogPaths.size())); + inorder.verify(out, times(1)).writeInt(eq(deltaLogPaths.get(0).length())); + inorder.verify(out, times(1)).write(aryEq(deltaLogPaths.get(0).getBytes(StandardCharsets.UTF_8))); + // verify that no more interactions happened on the mocked object + inorder.verifyNoMoreInteractions(); + } + + @Test + public void testReadFields() throws IOException { + // create a mock for DataInput that will be used in the readFields method + // this way we can capture and verify if correct arguments were passed + DataInput in = mock(DataInput.class); + + // register the mock responses to be returned when a particular
method call happens + // on the mocked object + when(in.readByte()).thenReturn((byte) fileSplitName.length()); + // Answer implementation is used to guarantee the response in sequence of the mock method calls + // since the same method is called many times, we need to return the responses in proper sequence + when(in.readInt()).thenAnswer(new Answer() { + private int count = 0; + private int[] answers = new int[]{basePath.length(), maxCommitTime.length(), deltaLogPaths.size(), deltaLogPaths.get(0).length()}; + + @Override + public Integer answer(InvocationOnMock invocationOnMock) throws Throwable { + return answers[count++]; + } + }); + Answer readFullyAnswer = new Answer() { + private int count = 0; + private byte[][] answers = new byte[][]{ + fileSplitName.getBytes(StandardCharsets.UTF_8), + basePath.getBytes(StandardCharsets.UTF_8), + maxCommitTime.getBytes(StandardCharsets.UTF_8), + deltaLogPaths.get(0).getBytes(StandardCharsets.UTF_8), + }; + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + byte[] bytes = invocation.getArgumentAt(0, byte[].class); + byte[] answer = answers[count++]; + System.arraycopy(answer, 0, bytes, 0, answer.length); + return null; + } + }; + doAnswer(readFullyAnswer).when(in).readFully(any()); + doAnswer(readFullyAnswer).when(in).readFully(any(), anyInt(), anyInt()); + + // call readFields with mocked object + HoodieRealtimeFileSplit read = new HoodieRealtimeFileSplit(); + read.readFields(in); + + // assert proper returns after reading from the mocked object + assertEquals(basePath, read.getBasePath()); + assertEquals(maxCommitTime, read.getMaxCommitTime()); + assertEquals(deltaLogPaths, read.getDeltaLogPaths()); + assertEquals(split.toString(), read.toString()); + } +} \ No newline at end of file