[HUDI-568] Improve unit test coverage
Classes improved: * HoodieTableMetaClient * RocksDBDAO * HoodieRealtimeFileSplit
This commit is contained in:
committed by
n3nash
parent
996f761232
commit
f5f34bb1c1
@@ -75,9 +75,6 @@ public class RocksDBDAO {
|
|||||||
* Create RocksDB if not initialized.
|
* Create RocksDB if not initialized.
|
||||||
*/
|
*/
|
||||||
private RocksDB getRocksDB() {
|
private RocksDB getRocksDB() {
|
||||||
if (null == rocksDB) {
|
|
||||||
init();
|
|
||||||
}
|
|
||||||
return rocksDB;
|
return rocksDB;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ import java.io.IOException;
|
|||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@@ -94,4 +95,22 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
|
|||||||
assertArrayEquals("Commit value should be \"test-detail\"", "test-detail".getBytes(),
|
assertArrayEquals("Commit value should be \"test-detail\"", "test-detail".getBytes(),
|
||||||
activeCommitTimeline.getInstantDetails(completedInstant).get());
|
activeCommitTimeline.getInstantDetails(completedInstant).get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEquals() throws IOException {
|
||||||
|
HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
|
||||||
|
HoodieTableMetaClient metaClient2 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
|
||||||
|
assertEquals(metaClient1, metaClient1);
|
||||||
|
assertEquals(metaClient1, metaClient2);
|
||||||
|
assertNotEquals(metaClient1, null);
|
||||||
|
assertNotEquals(metaClient1, new Object());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testToString() throws IOException {
|
||||||
|
HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
|
||||||
|
HoodieTableMetaClient metaClient2 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
|
||||||
|
assertEquals(metaClient1.toString(), metaClient2.toString());
|
||||||
|
assertNotEquals(metaClient1.toString(), new Object().toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,15 +20,17 @@ package org.apache.hudi.common.util.collection;
|
|||||||
|
|
||||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@@ -41,16 +43,16 @@ import java.util.stream.IntStream;
|
|||||||
*/
|
*/
|
||||||
public class TestRocksDBManager {
|
public class TestRocksDBManager {
|
||||||
|
|
||||||
private static RocksDBDAO dbManager;
|
private RocksDBDAO dbManager;
|
||||||
|
|
||||||
@BeforeClass
|
@Before
|
||||||
public static void setUpClass() {
|
public void setUpClass() {
|
||||||
dbManager = new RocksDBDAO("/dummy/path",
|
dbManager = new RocksDBDAO("/dummy/path/" + UUID.randomUUID().toString(),
|
||||||
FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath());
|
FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@After
|
||||||
public static void tearDownClass() {
|
public void tearDownClass() {
|
||||||
if (dbManager != null) {
|
if (dbManager != null) {
|
||||||
dbManager.close();
|
dbManager.close();
|
||||||
dbManager = null;
|
dbManager = null;
|
||||||
@@ -68,14 +70,17 @@ public class TestRocksDBManager {
|
|||||||
String family2 = "family2";
|
String family2 = "family2";
|
||||||
List<String> colFamilies = Arrays.asList(family1, family2);
|
List<String> colFamilies = Arrays.asList(family1, family2);
|
||||||
|
|
||||||
List<Payload> payloads = IntStream.range(0, 100).mapToObj(index -> {
|
final List<Payload<String>> payloads = new ArrayList<>();
|
||||||
|
IntStream.range(0, 100).forEach(index -> {
|
||||||
String prefix = prefixes.get(index % 4);
|
String prefix = prefixes.get(index % 4);
|
||||||
String key = prefix + UUID.randomUUID().toString();
|
String key = prefix + UUID.randomUUID().toString();
|
||||||
String family = colFamilies.get(index % 2);
|
String family = colFamilies.get(index % 2);
|
||||||
String val = "VALUE_" + UUID.randomUUID().toString();
|
String val = "VALUE_" + UUID.randomUUID().toString();
|
||||||
return new Payload(prefix, key, val, family);
|
payloads.add(new Payload(prefix, key, val, family));
|
||||||
}).collect(Collectors.toList());
|
});
|
||||||
|
|
||||||
|
colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
|
||||||
|
colFamilies.forEach(family -> dbManager.addColumnFamily(family));
|
||||||
colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
|
colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
|
||||||
colFamilies.forEach(family -> dbManager.addColumnFamily(family));
|
colFamilies.forEach(family -> dbManager.addColumnFamily(family));
|
||||||
|
|
||||||
@@ -103,21 +108,114 @@ public class TestRocksDBManager {
|
|||||||
expCount == null ? 0L : expCount.longValue(), gotPayloads.size());
|
expCount == null ? 0L : expCount.longValue(), gotPayloads.size());
|
||||||
gotPayloads.forEach(p -> {
|
gotPayloads.forEach(p -> {
|
||||||
Assert.assertEquals(p.getRight().getFamily(), family);
|
Assert.assertEquals(p.getRight().getFamily(), family);
|
||||||
Assert.assertTrue(p.getRight().getKey().startsWith(prefix));
|
Assert.assertTrue(p.getRight().getKey().toString().startsWith(prefix));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
payloads.forEach(payload -> {
|
payloads.stream().filter(p -> !p.getPrefix().equalsIgnoreCase(prefix1)).forEach(payload -> {
|
||||||
Payload p = dbManager.get(payload.getFamily(), payload.getKey());
|
Payload p = dbManager.get(payload.getFamily(), payload.getKey());
|
||||||
Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p);
|
Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p);
|
||||||
|
|
||||||
// Now, delete the key
|
|
||||||
dbManager.delete(payload.getFamily(), payload.getKey());
|
dbManager.delete(payload.getFamily(), payload.getKey());
|
||||||
|
|
||||||
// Now retrieve
|
|
||||||
Payload p2 = dbManager.get(payload.getFamily(), payload.getKey());
|
Payload p2 = dbManager.get(payload.getFamily(), payload.getKey());
|
||||||
Assert.assertNull("Retrieved correct payload for key :" + p.getKey(), p2);
|
Assert.assertNull("Retrieved correct payload for key :" + payload.getKey(), p2);
|
||||||
|
});
|
||||||
|
|
||||||
|
colFamilies.forEach(family -> {
|
||||||
|
dbManager.prefixDelete(family, prefix1);
|
||||||
|
|
||||||
|
int got = dbManager.prefixSearch(family, prefix1).collect(Collectors.toList()).size();
|
||||||
|
Assert.assertEquals("Expected prefix delete to leave at least one item for family: " + family, countsMap.get(family).get(prefix1) == null ? 0 : 1, got);
|
||||||
|
});
|
||||||
|
|
||||||
|
payloads.stream().filter(p -> !p.getPrefix().equalsIgnoreCase(prefix1)).forEach(payload -> {
|
||||||
|
Payload p2 = dbManager.get(payload.getFamily(), payload.getKey());
|
||||||
|
Assert.assertNull("Retrieved correct payload for key :" + payload.getKey(), p2);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Now do a prefix search
|
||||||
|
colFamilies.forEach(family -> {
|
||||||
|
prefixes.stream().filter(p -> !p.equalsIgnoreCase(prefix1)).forEach(prefix -> {
|
||||||
|
List<Pair<String, Payload>> gotPayloads =
|
||||||
|
dbManager.<Payload>prefixSearch(family, prefix).collect(Collectors.toList());
|
||||||
|
Assert.assertEquals("Size check for prefix (" + prefix + ") and family (" + family + ")", 0,
|
||||||
|
gotPayloads.size());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
String rocksDBBasePath = dbManager.getRocksDBBasePath();
|
||||||
|
dbManager.close();
|
||||||
|
Assert.assertFalse(new File(rocksDBBasePath).exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithSerializableKey() {
|
||||||
|
String prefix1 = "prefix1_";
|
||||||
|
String prefix2 = "prefix2_";
|
||||||
|
String prefix3 = "prefix3_";
|
||||||
|
String prefix4 = "prefix4_";
|
||||||
|
List<String> prefixes = Arrays.asList(prefix1, prefix2, prefix3, prefix4);
|
||||||
|
String family1 = "family1";
|
||||||
|
String family2 = "family2";
|
||||||
|
List<String> colFamilies = Arrays.asList(family1, family2);
|
||||||
|
|
||||||
|
final List<Payload<PayloadKey>> payloads = new ArrayList<>();
|
||||||
|
IntStream.range(0, 100).forEach(index -> {
|
||||||
|
String prefix = prefixes.get(index % 4);
|
||||||
|
String key = prefix + UUID.randomUUID().toString();
|
||||||
|
String family = colFamilies.get(index % 2);
|
||||||
|
String val = "VALUE_" + UUID.randomUUID().toString();
|
||||||
|
payloads.add(new Payload(prefix, new PayloadKey((key)), val, family));
|
||||||
|
});
|
||||||
|
|
||||||
|
colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
|
||||||
|
colFamilies.forEach(family -> dbManager.addColumnFamily(family));
|
||||||
|
|
||||||
|
Map<String, Map<String, Integer>> countsMap = new HashMap<>();
|
||||||
|
dbManager.writeBatch(batch -> {
|
||||||
|
payloads.forEach(payload -> {
|
||||||
|
dbManager.putInBatch(batch, payload.getFamily(), payload.getKey(), payload);
|
||||||
|
|
||||||
|
if (!countsMap.containsKey(payload.family)) {
|
||||||
|
countsMap.put(payload.family, new HashMap<>());
|
||||||
|
}
|
||||||
|
Map<String, Integer> c = countsMap.get(payload.family);
|
||||||
|
if (!c.containsKey(payload.prefix)) {
|
||||||
|
c.put(payload.prefix, 0);
|
||||||
|
}
|
||||||
|
int currCount = c.get(payload.prefix);
|
||||||
|
c.put(payload.prefix, currCount + 1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
Iterator<List<Payload<PayloadKey>>> payloadSplits = payloads.stream()
|
||||||
|
.collect(Collectors.partitioningBy(s -> payloads.indexOf(s) > payloads.size() / 2)).values()
|
||||||
|
.iterator();
|
||||||
|
|
||||||
|
payloads.forEach(payload -> {
|
||||||
|
Payload p = dbManager.get(payload.getFamily(), payload.getKey());
|
||||||
|
Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p);
|
||||||
|
});
|
||||||
|
|
||||||
|
payloadSplits.next().forEach(payload -> {
|
||||||
|
dbManager.delete(payload.getFamily(), payload.getKey());
|
||||||
|
Payload want = dbManager.get(payload.getFamily(), payload.getKey());
|
||||||
|
Assert.assertNull("Verify deleted during single delete for key :" + payload.getKey(), want);
|
||||||
|
});
|
||||||
|
|
||||||
|
dbManager.writeBatch(batch -> {
|
||||||
|
payloadSplits.next().forEach(payload -> {
|
||||||
|
dbManager.deleteInBatch(batch, payload.getFamily(), payload.getKey());
|
||||||
|
Payload want = dbManager.get(payload.getFamily(), payload.getKey());
|
||||||
|
Assert.assertEquals("Verify not deleted during batch delete in progress for key :" + payload.getKey(), payload, want);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
payloads.forEach(payload -> {
|
||||||
|
Payload want = dbManager.get(payload.getFamily(), payload.getKey());
|
||||||
|
Assert.assertNull("Verify delete for key :" + payload.getKey(), want);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Now do a prefix search
|
// Now do a prefix search
|
||||||
@@ -135,17 +233,47 @@ public class TestRocksDBManager {
|
|||||||
Assert.assertFalse(new File(rocksDBBasePath).exists());
|
Assert.assertFalse(new File(rocksDBBasePath).exists());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class PayloadKey implements Serializable {
|
||||||
|
private String key;
|
||||||
|
|
||||||
|
public PayloadKey(String key) {
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
PayloadKey that = (PayloadKey) o;
|
||||||
|
return Objects.equals(key, that.key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A payload definition for {@link TestRocksDBManager}.
|
* A payload definition for {@link TestRocksDBManager}.
|
||||||
*/
|
*/
|
||||||
public static class Payload implements Serializable {
|
public static class Payload<T> implements Serializable {
|
||||||
|
|
||||||
private final String prefix;
|
private final String prefix;
|
||||||
private final String key;
|
private final T key;
|
||||||
private final String val;
|
private final String val;
|
||||||
private final String family;
|
private final String family;
|
||||||
|
|
||||||
public Payload(String prefix, String key, String val, String family) {
|
public Payload(String prefix, T key, String val, String family) {
|
||||||
this.prefix = prefix;
|
this.prefix = prefix;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.val = val;
|
this.val = val;
|
||||||
@@ -156,7 +284,7 @@ public class TestRocksDBManager {
|
|||||||
return prefix;
|
return prefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getKey() {
|
public T getKey() {
|
||||||
return key;
|
return key;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -101,6 +101,12 @@
|
|||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-all</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -0,0 +1,162 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.hadoop.realtime;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.FileSplit;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
import org.mockito.InOrder;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.AdditionalMatchers.aryEq;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyByte;
|
||||||
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
import static org.mockito.Mockito.eq;
|
||||||
|
import static org.mockito.Mockito.inOrder;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class TestHoodieRealtimeFileSplit {
|
||||||
|
|
||||||
|
private HoodieRealtimeFileSplit split;
|
||||||
|
private String basePath;
|
||||||
|
private List<String> deltaLogPaths;
|
||||||
|
private String fileSplitName;
|
||||||
|
private FileSplit baseFileSplit;
|
||||||
|
private String maxCommitTime;
|
||||||
|
private TemporaryFolder tmp;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
tmp = new TemporaryFolder();
|
||||||
|
tmp.create();
|
||||||
|
|
||||||
|
basePath = tmp.getRoot().toString();
|
||||||
|
deltaLogPaths = Collections.singletonList(basePath + "/1.log");
|
||||||
|
fileSplitName = basePath + "/test.file";
|
||||||
|
baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[]{});
|
||||||
|
maxCommitTime = "10001";
|
||||||
|
|
||||||
|
split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
tmp.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWrite() throws IOException {
|
||||||
|
// create a mock for DataOutput that will be used in the write method
|
||||||
|
// this way we can capture and verify if correct arguments were passed
|
||||||
|
DataOutput out = mock(DataOutput.class);
|
||||||
|
|
||||||
|
// register expected method calls for void functions
|
||||||
|
// so that we can verify what was called after the method call finishes
|
||||||
|
doNothing().when(out).writeByte(anyByte());
|
||||||
|
doNothing().when(out).writeInt(anyInt());
|
||||||
|
doNothing().when(out).write(any(byte[].class), anyInt(), anyInt());
|
||||||
|
doNothing().when(out).write(any(byte[].class));
|
||||||
|
|
||||||
|
// call the method we want to test with the mocked input
|
||||||
|
split.write(out);
|
||||||
|
|
||||||
|
// verify the method calls on the mocked object in the order of the calls
|
||||||
|
InOrder inorder = inOrder(out);
|
||||||
|
inorder.verify(out, times(1)).writeByte(eq(fileSplitName.length()));
|
||||||
|
inorder.verify(out, times(1)).write(aryEq(Text.encode(fileSplitName).array()), eq(0), eq(fileSplitName.length()));
|
||||||
|
inorder.verify(out, times(1)).writeInt(eq(basePath.length()));
|
||||||
|
inorder.verify(out, times(1)).write(aryEq(basePath.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
inorder.verify(out, times(1)).writeInt(eq(maxCommitTime.length()));
|
||||||
|
inorder.verify(out, times(1)).write(aryEq(maxCommitTime.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
inorder.verify(out, times(1)).writeInt(eq(deltaLogPaths.size()));
|
||||||
|
inorder.verify(out, times(1)).writeInt(eq(deltaLogPaths.get(0).length()));
|
||||||
|
inorder.verify(out, times(1)).write(aryEq(deltaLogPaths.get(0).getBytes(StandardCharsets.UTF_8)));
|
||||||
|
// verify there are no more interactions happened on the mocked object
|
||||||
|
inorder.verifyNoMoreInteractions();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadFields() throws IOException {
|
||||||
|
// create a mock for DataOutput that will be used in the readFields method
|
||||||
|
// this way we can capture and verify if correct arguments were passed
|
||||||
|
DataInput in = mock(DataInput.class);
|
||||||
|
|
||||||
|
// register the mock responses to be returned when particular method call happens
|
||||||
|
// on the mocked object
|
||||||
|
when(in.readByte()).thenReturn((byte) fileSplitName.length());
|
||||||
|
// Answer implementation is used to guarantee the response in sequence of the mock method calls
|
||||||
|
// since the same method is called many times, we need to return the responses in proper sequence
|
||||||
|
when(in.readInt()).thenAnswer(new Answer<Integer>() {
|
||||||
|
private int count = 0;
|
||||||
|
private int[] answers = new int[]{basePath.length(), maxCommitTime.length(), deltaLogPaths.size(), deltaLogPaths.get(0).length()};
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
return answers[count++];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Answer<Void> readFullyAnswer = new Answer<Void>() {
|
||||||
|
private int count = 0;
|
||||||
|
private byte[][] answers = new byte[][]{
|
||||||
|
fileSplitName.getBytes(StandardCharsets.UTF_8),
|
||||||
|
basePath.getBytes(StandardCharsets.UTF_8),
|
||||||
|
maxCommitTime.getBytes(StandardCharsets.UTF_8),
|
||||||
|
deltaLogPaths.get(0).getBytes(StandardCharsets.UTF_8),
|
||||||
|
};
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
byte[] bytes = invocation.getArgumentAt(0, byte[].class);
|
||||||
|
byte[] answer = answers[count++];
|
||||||
|
System.arraycopy(answer, 0, bytes, 0, answer.length);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
doAnswer(readFullyAnswer).when(in).readFully(any());
|
||||||
|
doAnswer(readFullyAnswer).when(in).readFully(any(), anyInt(), anyInt());
|
||||||
|
|
||||||
|
// call readFields with mocked object
|
||||||
|
HoodieRealtimeFileSplit read = new HoodieRealtimeFileSplit();
|
||||||
|
read.readFields(in);
|
||||||
|
|
||||||
|
// assert proper returns after reading from the mocked object
|
||||||
|
assertEquals(basePath, read.getBasePath());
|
||||||
|
assertEquals(maxCommitTime, read.getMaxCommitTime());
|
||||||
|
assertEquals(deltaLogPaths, read.getDeltaLogPaths());
|
||||||
|
assertEquals(split.toString(), read.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user