[HUDI-402]: code clean up in test cases
This commit is contained in:
committed by
vinoth chandar
parent
98c0d8cf60
commit
dde21e7315
@@ -24,7 +24,6 @@ import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
@@ -55,7 +54,7 @@ public class TestBloomFilter {
|
||||
|
||||
@Test
|
||||
public void testAddKey() {
|
||||
List<String> inputs = new ArrayList<>();
|
||||
List<String> inputs;
|
||||
int[] sizes = {100, 1000, 10000};
|
||||
for (int size : sizes) {
|
||||
inputs = new ArrayList<>();
|
||||
@@ -78,9 +77,9 @@ public class TestBloomFilter {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerialize() throws IOException, ClassNotFoundException {
|
||||
public void testSerialize() {
|
||||
|
||||
List<String> inputs = new ArrayList<>();
|
||||
List<String> inputs;
|
||||
int[] sizes = {100, 1000, 10000};
|
||||
for (int size : sizes) {
|
||||
inputs = new ArrayList<>();
|
||||
|
||||
@@ -79,7 +79,7 @@ public class HdfsTestService {
|
||||
|
||||
// Configure and start the HDFS cluster
|
||||
// boolean format = shouldFormatDFSCluster(localDFSLocation, clean);
|
||||
hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort, namenodeHttpPort,
|
||||
hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
|
||||
datanodePort, datanodeIpcPort, datanodeHttpPort);
|
||||
miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format).checkDataNodeAddrConfig(true)
|
||||
.checkDataNodeHostConfig(true).build();
|
||||
@@ -87,7 +87,7 @@ public class HdfsTestService {
|
||||
return miniDfsCluster;
|
||||
}
|
||||
|
||||
public void stop() throws IOException {
|
||||
public void stop() {
|
||||
LOG.info("HDFS Minicluster service being shut down.");
|
||||
miniDfsCluster.shutdown();
|
||||
miniDfsCluster = null;
|
||||
@@ -104,23 +104,6 @@ public class HdfsTestService {
|
||||
return baseFsLocation + Path.SEPARATOR + "dfs";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if we should format the DFS Cluster. We'll format if clean is true, or if the dfsFsLocation does not
|
||||
* exist.
|
||||
*
|
||||
* @param localDFSLocation The location on the local FS to hold the HDFS metadata and block data
|
||||
* @param clean Specifies if we want to start a clean cluster
|
||||
* @return Returns true if we should format a DFSCluster, otherwise false
|
||||
*/
|
||||
private static boolean shouldFormatDFSCluster(String localDFSLocation, boolean clean) {
|
||||
boolean format = true;
|
||||
File f = new File(localDFSLocation);
|
||||
if (f.exists() && f.isDirectory() && !clean) {
|
||||
format = false;
|
||||
}
|
||||
return format;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the DFS Cluster before launching it.
|
||||
*
|
||||
@@ -130,7 +113,7 @@ public class HdfsTestService {
|
||||
* @return The updated Configuration object.
|
||||
*/
|
||||
private static Configuration configureDFSCluster(Configuration config, String localDFSLocation, String bindIP,
|
||||
int namenodeRpcPort, int namenodeHttpPort, int datanodePort, int datanodeIpcPort, int datanodeHttpPort) {
|
||||
int namenodeRpcPort, int datanodePort, int datanodeIpcPort, int datanodeHttpPort) {
|
||||
|
||||
LOG.info("HDFS force binding to ip: " + bindIP);
|
||||
config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort);
|
||||
|
||||
@@ -139,15 +139,7 @@ public class HoodieTestUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static final void createDeltaCommitFiles(String basePath, String... commitTimes) throws IOException {
|
||||
for (String commitTime : commitTimes) {
|
||||
new File(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(commitTime))
|
||||
.createNewFile();
|
||||
}
|
||||
}
|
||||
|
||||
public static final void createMetadataFolder(String basePath) throws IOException {
|
||||
public static final void createMetadataFolder(String basePath) {
|
||||
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
|
||||
}
|
||||
|
||||
@@ -161,8 +153,7 @@ public class HoodieTestUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static final void createPendingCleanFiles(HoodieTableMetaClient metaClient, Configuration configuration,
|
||||
String... commitTimes) throws IOException {
|
||||
public static final void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... commitTimes) {
|
||||
for (String commitTime : commitTimes) {
|
||||
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime),
|
||||
HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> {
|
||||
@@ -303,7 +294,7 @@ public class HoodieTestUtils {
|
||||
public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath,
|
||||
String commitTime, Configuration configuration)
|
||||
throws IOException {
|
||||
createPendingCleanFiles(metaClient, configuration, commitTime);
|
||||
createPendingCleanFiles(metaClient, commitTime);
|
||||
Path commitFile = new Path(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime));
|
||||
FileSystem fs = FSUtils.getFs(basePath, configuration);
|
||||
@@ -323,19 +314,6 @@ public class HoodieTestUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static void createCleanFiles(HoodieTableMetaClient metaClient,
|
||||
String basePath, String commitTime) throws IOException {
|
||||
createCleanFiles(metaClient, basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf());
|
||||
}
|
||||
|
||||
public static String makeTestFileName(String instant) {
|
||||
return instant + TEST_EXTENSION;
|
||||
}
|
||||
|
||||
public static String makeCommitFileName(String instant) {
|
||||
return instant + ".commit";
|
||||
}
|
||||
|
||||
public static void assertStreamEquals(String message, Stream<?> expected, Stream<?> actual) {
|
||||
Iterator<?> iter1 = expected.iterator();
|
||||
Iterator<?> iter2 = actual.iterator();
|
||||
@@ -345,8 +323,7 @@ public class HoodieTestUtils {
|
||||
assert !iter1.hasNext() && !iter2.hasNext();
|
||||
}
|
||||
|
||||
public static <T extends Serializable> T serializeDeserialize(T object, Class<T> clazz)
|
||||
throws IOException, ClassNotFoundException {
|
||||
public static <T extends Serializable> T serializeDeserialize(T object, Class<T> clazz) {
|
||||
// Using Kyro as the default serializer in Spark Jobs
|
||||
Kryo kryo = new Kryo();
|
||||
kryo.register(HoodieTableMetaClient.class, new JavaSerializer());
|
||||
@@ -367,20 +344,19 @@ public class HoodieTestUtils {
|
||||
Map<HoodieRecordLocation, List<HoodieRecord>> groupedUpdated =
|
||||
updatedRecords.stream().collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation));
|
||||
|
||||
groupedUpdated.entrySet().forEach(s -> {
|
||||
HoodieRecordLocation location = s.getKey();
|
||||
String partitionPath = s.getValue().get(0).getPartitionPath();
|
||||
groupedUpdated.forEach((location, value) -> {
|
||||
String partitionPath = value.get(0).getPartitionPath();
|
||||
|
||||
Writer logWriter;
|
||||
try {
|
||||
logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
|
||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
|
||||
.overBaseCommit(location.getInstantTime()).withFs(fs).build();
|
||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
|
||||
.overBaseCommit(location.getInstantTime()).withFs(fs).build();
|
||||
|
||||
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
|
||||
logWriter.appendBlock(new HoodieAvroDataBlock(s.getValue().stream().map(r -> {
|
||||
logWriter.appendBlock(new HoodieAvroDataBlock(value.stream().map(r -> {
|
||||
try {
|
||||
GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
|
||||
HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
|
||||
|
||||
@@ -33,7 +33,7 @@ public class TestHoodieCommitMetadata {
|
||||
|
||||
List<HoodieWriteStat> fakeHoodieWriteStats = HoodieTestUtils.generateFakeHoodieWriteStat(100);
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
fakeHoodieWriteStats.stream().forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
|
||||
fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
|
||||
Assert.assertTrue(commitMetadata.getTotalCreateTime() > 0);
|
||||
Assert.assertTrue(commitMetadata.getTotalUpsertTime() > 0);
|
||||
Assert.assertTrue(commitMetadata.getTotalScanTime() > 0);
|
||||
@@ -43,7 +43,7 @@ public class TestHoodieCommitMetadata {
|
||||
HoodieCommitMetadata metadata =
|
||||
HoodieCommitMetadata.fromJsonString(serializedCommitMetadata, HoodieCommitMetadata.class);
|
||||
// Make sure timing metrics are not written to instant file
|
||||
Assert.assertTrue(metadata.getTotalScanTime() == 0);
|
||||
Assert.assertEquals(0, (long) metadata.getTotalScanTime());
|
||||
Assert.assertTrue(metadata.getTotalLogFilesCompacted() > 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.common.model;
|
||||
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@@ -46,7 +45,6 @@ public class TestHoodieWriteStat {
|
||||
|
||||
Path basePath = new Path(basePathString);
|
||||
Path partitionPath = new Path(basePath, partitionPathString);
|
||||
Path tempPath = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
|
||||
|
||||
Path finalizeFilePath = new Path(partitionPath, FSUtils.makeDataFileName(commitTime, writeToken, fileName));
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
|
||||
@@ -61,7 +61,7 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkSerDe() throws IOException, ClassNotFoundException {
|
||||
public void checkSerDe() {
|
||||
// check if this object is serialized and de-serialized, we are able to read from the file system
|
||||
HoodieTableMetaClient deseralizedMetaClient =
|
||||
HoodieTestUtils.serializeDeserialize(metaClient, HoodieTableMetaClient.class);
|
||||
@@ -78,7 +78,7 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkCommitTimeline() throws IOException {
|
||||
public void checkCommitTimeline() {
|
||||
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
|
||||
HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
|
||||
assertTrue("Should be empty commit timeline", activeCommitTimeline.empty());
|
||||
|
||||
@@ -30,7 +30,6 @@ import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
|
||||
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
|
||||
@@ -471,7 +470,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
}
|
||||
|
||||
assertEquals("Scanner records count should be the same as appended records", scannedRecords.size(),
|
||||
allRecords.stream().flatMap(records -> records.stream()).collect(Collectors.toList()).size());
|
||||
allRecords.stream().mapToLong(Collection::size).sum());
|
||||
|
||||
}
|
||||
|
||||
@@ -511,8 +510,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
assertTrue("We should have corrupted block next", reader.hasNext());
|
||||
HoodieLogBlock block = reader.next();
|
||||
assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
|
||||
HoodieCorruptBlock corruptBlock = (HoodieCorruptBlock) block;
|
||||
// assertEquals("", "something-random", new String(corruptBlock.getCorruptedBytes()));
|
||||
assertFalse("There should be no more block left", reader.hasNext());
|
||||
|
||||
reader.close();
|
||||
@@ -551,8 +548,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
assertTrue("We should get the 2nd corrupted block next", reader.hasNext());
|
||||
block = reader.next();
|
||||
assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
|
||||
corruptBlock = (HoodieCorruptBlock) block;
|
||||
// assertEquals("", "something-else-random", new String(corruptBlock.getCorruptedBytes()));
|
||||
assertTrue("We should get the last block next", reader.hasNext());
|
||||
reader.next();
|
||||
assertFalse("We should have no more blocks left", reader.hasNext());
|
||||
@@ -817,7 +812,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
|
||||
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
|
||||
HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
|
||||
writer = writer.appendBlock(commandBlock);
|
||||
writer.appendBlock(commandBlock);
|
||||
|
||||
readKeys.clear();
|
||||
scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101", 10240L, readBlocksLazily,
|
||||
@@ -855,10 +850,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
dataBlock = new HoodieAvroDataBlock(records2, header);
|
||||
writer = writer.appendBlock(dataBlock);
|
||||
|
||||
List<String> originalKeys =
|
||||
copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Delete 50 keys
|
||||
// Delete 50 keys
|
||||
List<HoodieKey> deletedKeys = copyOfRecords1.stream()
|
||||
@@ -881,7 +872,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
// it's okay
|
||||
}
|
||||
// Attempt 2 : Write another rollback blocks for a failed write
|
||||
writer = writer.appendBlock(commandBlock);
|
||||
writer.appendBlock(commandBlock);
|
||||
|
||||
List<String> allLogFiles =
|
||||
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
|
||||
@@ -937,7 +928,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
|
||||
HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
|
||||
writer = writer.appendBlock(commandBlock);
|
||||
writer = writer.appendBlock(commandBlock);
|
||||
writer.appendBlock(commandBlock);
|
||||
|
||||
List<String> allLogFiles =
|
||||
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
|
||||
@@ -970,7 +961,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
|
||||
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
|
||||
HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
|
||||
writer = writer.appendBlock(commandBlock);
|
||||
writer.appendBlock(commandBlock);
|
||||
|
||||
List<String> allLogFiles =
|
||||
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
|
||||
@@ -1008,10 +999,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
writer = writer.appendBlock(dataBlock);
|
||||
writer = writer.appendBlock(dataBlock);
|
||||
|
||||
List<String> originalKeys =
|
||||
copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Delete 50 keys
|
||||
// Delete 50 keys
|
||||
List<HoodieKey> deletedKeys = copyOfRecords1.stream()
|
||||
@@ -1026,7 +1013,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
|
||||
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
|
||||
HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
|
||||
writer = writer.appendBlock(commandBlock);
|
||||
writer.appendBlock(commandBlock);
|
||||
|
||||
List<String> allLogFiles =
|
||||
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
|
||||
@@ -1275,8 +1262,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
|
||||
List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
|
||||
List<IndexedRecord> copyOfRecords2 = records2.stream()
|
||||
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
|
||||
dataBlock = new HoodieAvroDataBlock(records2, header);
|
||||
writer = writer.appendBlock(dataBlock);
|
||||
writer.close();
|
||||
@@ -1286,8 +1271,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
|
||||
List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
|
||||
List<IndexedRecord> copyOfRecords3 = records3.stream()
|
||||
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
|
||||
dataBlock = new HoodieAvroDataBlock(records3, header);
|
||||
writer = writer.appendBlock(dataBlock);
|
||||
writer.close();
|
||||
|
||||
@@ -137,7 +137,7 @@ public class TestHoodieLogFormatAppendFailure {
|
||||
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
|
||||
.overBaseCommit("").withFs(fs).build();
|
||||
// The log version should be different for this new writer
|
||||
Assert.assertFalse(writer.getLogFile().getLogVersion() == logFileVersion);
|
||||
Assert.assertNotEquals(writer.getLogFile().getLogVersion(), logFileVersion);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -22,9 +22,7 @@ import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Comparator;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -33,16 +31,11 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
public class MockHoodieTimeline extends HoodieActiveTimeline {
|
||||
|
||||
public MockHoodieTimeline(Stream<String> completed, Stream<String> inflights) throws IOException {
|
||||
public MockHoodieTimeline(Stream<String> completed, Stream<String> inflights) {
|
||||
super();
|
||||
this.setInstants(Stream
|
||||
.concat(completed.map(s -> new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s)),
|
||||
inflights.map(s -> new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, s)))
|
||||
.sorted(Comparator.comparing(new Function<HoodieInstant, String>() {
|
||||
@Override
|
||||
public String apply(HoodieInstant hoodieInstant) {
|
||||
return hoodieInstant.getFileName();
|
||||
}
|
||||
})).collect(Collectors.toList()));
|
||||
.sorted(Comparator.comparing(HoodieInstant::getFileName)).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,6 @@ import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@@ -53,7 +52,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadingInstantsFromFiles() throws IOException {
|
||||
public void testLoadingInstantsFromFiles() {
|
||||
HoodieInstant instant1 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "1");
|
||||
HoodieInstant instant2 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "3");
|
||||
HoodieInstant instant3 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "5");
|
||||
@@ -100,7 +99,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimelineOperationsBasic() throws Exception {
|
||||
public void testTimelineOperationsBasic() {
|
||||
timeline = new HoodieActiveTimeline(metaClient);
|
||||
assertTrue(timeline.empty());
|
||||
assertEquals("", 0, timeline.countInstants());
|
||||
@@ -112,7 +111,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimelineOperations() throws Exception {
|
||||
public void testTimelineOperations() {
|
||||
timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "09", "11", "13", "15", "17", "19"),
|
||||
Stream.of("21", "23"));
|
||||
HoodieTestUtils.assertStreamEquals("", Stream.of("05", "07", "09", "11"), timeline.getCommitTimeline()
|
||||
|
||||
@@ -294,7 +294,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
assertEquals("Base-Instant must be compaction Instant", compactionRequestedTime,
|
||||
slices.get(0).getBaseInstantTime());
|
||||
assertFalse("Latest File Slice must not have data-file", slices.get(0).getDataFile().isPresent());
|
||||
assertTrue("Latest File Slice must not have any log-files", slices.get(0).getLogFiles().count() == 0);
|
||||
assertEquals("Latest File Slice must not have any log-files", 0, slices.get(0).getLogFiles().count());
|
||||
|
||||
// Fake delta-ingestion after compaction-requested
|
||||
String deltaInstantTime4 = "5";
|
||||
@@ -360,36 +360,28 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
|
||||
} else {
|
||||
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
|
||||
dataFiles.stream().forEach(df -> {
|
||||
assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1);
|
||||
});
|
||||
dataFiles.forEach(df -> assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1));
|
||||
}
|
||||
dataFiles = roView.getLatestDataFiles(partitionPath).collect(Collectors.toList());
|
||||
if (skipCreatingDataFile) {
|
||||
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
|
||||
} else {
|
||||
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
|
||||
dataFiles.stream().forEach(df -> {
|
||||
assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1);
|
||||
});
|
||||
dataFiles.forEach(df -> assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1));
|
||||
}
|
||||
dataFiles = roView.getLatestDataFilesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList());
|
||||
if (skipCreatingDataFile) {
|
||||
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
|
||||
} else {
|
||||
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
|
||||
dataFiles.stream().forEach(df -> {
|
||||
assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1);
|
||||
});
|
||||
dataFiles.forEach(df -> assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1));
|
||||
}
|
||||
dataFiles = roView.getLatestDataFilesInRange(allInstantTimes).collect(Collectors.toList());
|
||||
if (skipCreatingDataFile) {
|
||||
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
|
||||
} else {
|
||||
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
|
||||
dataFiles.stream().forEach(df -> {
|
||||
assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1);
|
||||
});
|
||||
dataFiles.forEach(df -> assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1));
|
||||
}
|
||||
|
||||
/** Inflight/Orphan File-groups needs to be in the view **/
|
||||
@@ -517,24 +509,16 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
/** Data Files API tests */
|
||||
dataFiles = roView.getLatestDataFiles().collect(Collectors.toList());
|
||||
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
|
||||
dataFiles.stream().forEach(df -> {
|
||||
assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime);
|
||||
});
|
||||
dataFiles.forEach(df -> assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime));
|
||||
dataFiles = roView.getLatestDataFiles(partitionPath).collect(Collectors.toList());
|
||||
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
|
||||
dataFiles.stream().forEach(df -> {
|
||||
assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime);
|
||||
});
|
||||
dataFiles.forEach(df -> assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime));
|
||||
dataFiles = roView.getLatestDataFilesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList());
|
||||
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
|
||||
dataFiles.stream().forEach(df -> {
|
||||
assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime);
|
||||
});
|
||||
dataFiles.forEach(df -> assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime));
|
||||
dataFiles = roView.getLatestDataFilesInRange(allInstantTimes).collect(Collectors.toList());
|
||||
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
|
||||
dataFiles.stream().forEach(df -> {
|
||||
assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime);
|
||||
});
|
||||
dataFiles.forEach(df -> assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime));
|
||||
|
||||
assertEquals("Total number of file-slices in partitions matches expected", expTotalFileSlices,
|
||||
rtView.getAllFileSlices(partitionPath).count());
|
||||
@@ -552,7 +536,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
String fileId = UUID.randomUUID().toString();
|
||||
|
||||
assertFalse("No commit, should not find any data file", roView.getLatestDataFiles(partitionPath)
|
||||
.filter(dfile -> dfile.getFileId().equals(fileId)).findFirst().isPresent());
|
||||
.anyMatch(dfile -> dfile.getFileId().equals(fileId)));
|
||||
|
||||
// Only one commit, but is not safe
|
||||
String commitTime1 = "1";
|
||||
@@ -560,7 +544,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
|
||||
refreshFsView();
|
||||
assertFalse("No commit, should not find any data file", roView.getLatestDataFiles(partitionPath)
|
||||
.filter(dfile -> dfile.getFileId().equals(fileId)).findFirst().isPresent());
|
||||
.anyMatch(dfile -> dfile.getFileId().equals(fileId)));
|
||||
|
||||
// Make this commit safe
|
||||
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
|
||||
@@ -658,7 +642,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
List<FileSlice> allSlices = rtView.getAllFileSlices("2016/05/01").collect(Collectors.toList());
|
||||
assertEquals(isLatestFileSliceOnly ? 4 : 8, allSlices.size());
|
||||
Map<String, Long> fileSliceMap =
|
||||
allSlices.stream().collect(Collectors.groupingBy(slice -> slice.getFileId(), Collectors.counting()));
|
||||
allSlices.stream().collect(Collectors.groupingBy(FileSlice::getFileId, Collectors.counting()));
|
||||
assertEquals(isLatestFileSliceOnly ? 1 : 2, fileSliceMap.get(fileId1).longValue());
|
||||
assertEquals(isLatestFileSliceOnly ? 1 : 3, fileSliceMap.get(fileId2).longValue());
|
||||
assertEquals(isLatestFileSliceOnly ? 1 : 2, fileSliceMap.get(fileId3).longValue());
|
||||
@@ -677,7 +661,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
|
||||
filenames = Sets.newHashSet();
|
||||
List<HoodieLogFile> logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4, true)
|
||||
.map(slice -> slice.getLogFiles()).flatMap(logFileList -> logFileList).collect(Collectors.toList());
|
||||
.map(FileSlice::getLogFiles).flatMap(logFileList -> logFileList).collect(Collectors.toList());
|
||||
assertEquals(logFilesList.size(), 4);
|
||||
for (HoodieLogFile logFile : logFilesList) {
|
||||
filenames.add(logFile.getFileName());
|
||||
@@ -709,10 +693,9 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
}
|
||||
|
||||
logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3, true)
|
||||
.map(slice -> slice.getLogFiles()).flatMap(logFileList -> logFileList).collect(Collectors.toList());
|
||||
.map(FileSlice::getLogFiles).flatMap(logFileList -> logFileList).collect(Collectors.toList());
|
||||
assertEquals(logFilesList.size(), 1);
|
||||
assertTrue(logFilesList.get(0).getFileName()
|
||||
.equals(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN)));
|
||||
assertEquals(logFilesList.get(0).getFileName(), FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -1091,8 +1074,6 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
// Fake delta-ingestion after compaction-requested
|
||||
String deltaInstantTime4 = "4";
|
||||
String deltaInstantTime5 = "6";
|
||||
List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
|
||||
compactionRequestedTime, deltaInstantTime4, deltaInstantTime5);
|
||||
String fileName3 =
|
||||
FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0, TEST_WRITE_TOKEN);
|
||||
String fileName4 =
|
||||
@@ -1119,7 +1100,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
assertEquals("Expect only valid commit", "1", dataFiles.get(0).getCommitTime());
|
||||
|
||||
/** Merge API Tests **/
|
||||
Arrays.asList(partitionPath1, partitionPath2, partitionPath3).stream().forEach(partitionPath -> {
|
||||
Arrays.asList(partitionPath1, partitionPath2, partitionPath3).forEach(partitionPath -> {
|
||||
List<FileSlice> fileSliceList =
|
||||
rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList());
|
||||
assertEquals("Expect file-slice to be merged", 1, fileSliceList.size());
|
||||
|
||||
@@ -173,9 +173,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||
for (String partition : partitions) {
|
||||
newView.getAllFileGroups(partition).count();
|
||||
}
|
||||
|
||||
areViewsConsistent(view, newView, 0L);
|
||||
|
||||
@@ -221,7 +218,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
view2.sync();
|
||||
SyncableFileSystemView view3 =
|
||||
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||
partitions.stream().forEach(p -> view3.getLatestFileSlices(p).count());
|
||||
view3.sync();
|
||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
|
||||
|
||||
@@ -234,7 +230,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
|
||||
SyncableFileSystemView view4 =
|
||||
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||
partitions.stream().forEach(p -> view4.getLatestFileSlices(p).count());
|
||||
view4.sync();
|
||||
|
||||
/**
|
||||
@@ -249,7 +244,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2);
|
||||
SyncableFileSystemView view5 =
|
||||
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||
partitions.stream().forEach(p -> view5.getLatestFileSlices(p).count());
|
||||
view5.sync();
|
||||
|
||||
/**
|
||||
@@ -269,7 +263,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2);
|
||||
SyncableFileSystemView view6 =
|
||||
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||
partitions.stream().forEach(p -> view5.getLatestFileSlices(p).count());
|
||||
view6.sync();
|
||||
|
||||
/**
|
||||
@@ -284,7 +277,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
testMultipleWriteSteps(view2, Arrays.asList("28"), false, "28", 3,
|
||||
Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "29")));
|
||||
|
||||
Arrays.asList(view1, view2, view3, view4, view5, view6).stream().forEach(v -> {
|
||||
Arrays.asList(view1, view2, view3, view4, view5, view6).forEach(v -> {
|
||||
v.sync();
|
||||
areViewsConsistent(v, view1, partitions.size() * fileIdsPerPartition.size() * 3);
|
||||
});
|
||||
@@ -318,16 +311,15 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
Map<String, List<String>> deltaInstantMap, Map<String, List<String>> instantsToFiles,
|
||||
List<String> cleanedInstants) {
|
||||
Assert.assertEquals(newCleanerInstants.size(), cleanedInstants.size());
|
||||
long initialFileSlices = partitions.stream().mapToLong(p -> view.getAllFileSlices(p).count()).findAny().getAsLong();
|
||||
long exp = initialFileSlices;
|
||||
long exp = partitions.stream().mapToLong(p1 -> view.getAllFileSlices(p1).count()).findAny().getAsLong();
|
||||
LOG.info("Initial File Slices :" + exp);
|
||||
for (int idx = 0; idx < newCleanerInstants.size(); idx++) {
|
||||
String instant = cleanedInstants.get(idx);
|
||||
try {
|
||||
List<String> filesToDelete = new ArrayList<>(instantsToFiles.get(instant));
|
||||
deltaInstantMap.get(instant).stream().forEach(n -> filesToDelete.addAll(instantsToFiles.get(n)));
|
||||
deltaInstantMap.get(instant).forEach(n -> filesToDelete.addAll(instantsToFiles.get(n)));
|
||||
|
||||
performClean(view, instant, filesToDelete, newCleanerInstants.get(idx));
|
||||
performClean(instant, filesToDelete, newCleanerInstants.get(idx));
|
||||
|
||||
exp -= fileIdsPerPartition.size();
|
||||
final long expTotalFileSlicesPerPartition = exp;
|
||||
@@ -346,9 +338,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||
for (String partition : partitions) {
|
||||
newView.getAllFileGroups(partition).count();
|
||||
}
|
||||
areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * partitions.size());
|
||||
} catch (IOException e) {
|
||||
throw new HoodieException(e);
|
||||
@@ -368,13 +357,13 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
*/
|
||||
private void testRestore(SyncableFileSystemView view, List<String> newRestoreInstants, boolean isDeltaCommit,
|
||||
Map<String, List<String>> instantsToFiles, List<String> rolledBackInstants, String emptyRestoreInstant,
|
||||
boolean isRestore) throws IOException {
|
||||
boolean isRestore) {
|
||||
Assert.assertEquals(newRestoreInstants.size(), rolledBackInstants.size());
|
||||
long initialFileSlices = partitions.stream().mapToLong(p -> view.getAllFileSlices(p).count()).findAny().getAsLong();
|
||||
IntStream.range(0, newRestoreInstants.size()).forEach(idx -> {
|
||||
String instant = rolledBackInstants.get(idx);
|
||||
try {
|
||||
performRestore(view, instant, instantsToFiles.get(instant), newRestoreInstants.get(idx), isRestore);
|
||||
performRestore(instant, instantsToFiles.get(instant), newRestoreInstants.get(idx), isRestore);
|
||||
final long expTotalFileSlicesPerPartition =
|
||||
isDeltaCommit ? initialFileSlices : initialFileSlices - ((idx + 1) * fileIdsPerPartition.size());
|
||||
view.sync();
|
||||
@@ -397,9 +386,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||
for (String partition : partitions) {
|
||||
newView.getAllFileGroups(partition).count();
|
||||
}
|
||||
areViewsConsistent(view, newView, expTotalFileSlicesPerPartition * partitions.size());
|
||||
} catch (IOException e) {
|
||||
throw new HoodieException(e);
|
||||
@@ -410,18 +396,16 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
/**
|
||||
* Simulate a Cleaner operation cleaning up an instant.
|
||||
*
|
||||
* @param view Hoodie View
|
||||
* @param instant Instant to be cleaner
|
||||
* @param files List of files to be deleted
|
||||
* @param cleanInstant Cleaner Instant
|
||||
*/
|
||||
private void performClean(SyncableFileSystemView view, String instant, List<String> files, String cleanInstant)
|
||||
private void performClean(String instant, List<String> files, String cleanInstant)
|
||||
throws IOException {
|
||||
Map<String, List<String>> partititonToFiles = deleteFiles(files);
|
||||
List<HoodieCleanStat> cleanStats = partititonToFiles.entrySet().stream().map(e -> {
|
||||
return new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, e.getKey(), e.getValue(), e.getValue(),
|
||||
new ArrayList<>(), Integer.toString(Integer.parseInt(instant) + 1));
|
||||
}).collect(Collectors.toList());
|
||||
List<HoodieCleanStat> cleanStats = partititonToFiles.entrySet().stream().map(e ->
|
||||
new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, e.getKey(), e.getValue(), e.getValue(),
|
||||
new ArrayList<>(), Integer.toString(Integer.parseInt(instant) + 1))).collect(Collectors.toList());
|
||||
|
||||
HoodieInstant cleanInflightInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant);
|
||||
metaClient.getActiveTimeline().createNewInstant(cleanInflightInstant);
|
||||
@@ -434,17 +418,16 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
/**
|
||||
* Simulate Restore of an instant in timeline and fsview.
|
||||
*
|
||||
* @param view Hoodie View
|
||||
* @param instant Instant to be rolled-back
|
||||
* @param files List of files to be deleted as part of rollback
|
||||
* @param rollbackInstant Restore Instant
|
||||
*/
|
||||
private void performRestore(SyncableFileSystemView view, String instant, List<String> files, String rollbackInstant,
|
||||
private void performRestore(String instant, List<String> files, String rollbackInstant,
|
||||
boolean isRestore) throws IOException {
|
||||
Map<String, List<String>> partititonToFiles = deleteFiles(files);
|
||||
List<HoodieRollbackStat> rollbackStats = partititonToFiles.entrySet().stream().map(e -> {
|
||||
return new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>());
|
||||
}).collect(Collectors.toList());
|
||||
List<HoodieRollbackStat> rollbackStats = partititonToFiles.entrySet().stream().map(e ->
|
||||
new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>())
|
||||
).collect(Collectors.toList());
|
||||
|
||||
List<String> rollbacks = new ArrayList<>();
|
||||
rollbacks.add(instant);
|
||||
@@ -491,7 +474,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
for (String f : files) {
|
||||
String fullPath = String.format("%s/%s", metaClient.getBasePath(), f);
|
||||
new File(fullPath).delete();
|
||||
String partition = partitions.stream().filter(p -> f.startsWith(p)).findAny().get();
|
||||
String partition = partitions.stream().filter(f::startsWith).findAny().get();
|
||||
partititonToFiles.get(partition).add(fullPath);
|
||||
}
|
||||
return partititonToFiles;
|
||||
@@ -515,7 +498,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
AvroUtils.serializeCompactionPlan(plan));
|
||||
|
||||
view.sync();
|
||||
partitions.stream().forEach(p -> {
|
||||
partitions.forEach(p -> {
|
||||
view.getLatestFileSlices(p).forEach(fs -> {
|
||||
Assert.assertEquals(instantTime, fs.getBaseInstantTime());
|
||||
Assert.assertEquals(p, fs.getPartitionPath());
|
||||
@@ -530,7 +513,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||
partitions.forEach(p -> newView.getLatestFileSlices(p).count());
|
||||
areViewsConsistent(view, newView, initialExpTotalFileSlices + partitions.size() * fileIdsPerPartition.size());
|
||||
}
|
||||
|
||||
@@ -550,11 +532,9 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
|
||||
view.sync();
|
||||
Assert.assertEquals(newLastInstant, view.getLastInstant().get().getTimestamp());
|
||||
partitions.stream().forEach(p -> {
|
||||
view.getLatestFileSlices(p).forEach(fs -> {
|
||||
Assert.assertEquals(newBaseInstant, fs.getBaseInstantTime());
|
||||
});
|
||||
});
|
||||
partitions.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> {
|
||||
Assert.assertEquals(newBaseInstant, fs.getBaseInstantTime());
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -653,9 +633,6 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||
for (String partition : partitions) {
|
||||
newView.getAllFileGroups(partition).count();
|
||||
}
|
||||
areViewsConsistent(view, newView, fileIdsPerPartition.size() * partitions.size() * multiple);
|
||||
instantToFiles.put(instant, filePaths);
|
||||
if (!deltaCommit) {
|
||||
@@ -680,10 +657,10 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
Iterators.elementsEqual(timeline1.getInstants().iterator(), timeline2.getInstants().iterator());
|
||||
|
||||
// View Checks
|
||||
Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap1 = partitions.stream().flatMap(p -> view1.getAllFileGroups(p))
|
||||
.collect(Collectors.toMap(fg -> fg.getFileGroupId(), fg -> fg));
|
||||
Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap2 = partitions.stream().flatMap(p -> view2.getAllFileGroups(p))
|
||||
.collect(Collectors.toMap(fg -> fg.getFileGroupId(), fg -> fg));
|
||||
Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap1 = partitions.stream().flatMap(view1::getAllFileGroups)
|
||||
.collect(Collectors.toMap(HoodieFileGroup::getFileGroupId, fg -> fg));
|
||||
Map<HoodieFileGroupId, HoodieFileGroup> fileGroupsMap2 = partitions.stream().flatMap(view2::getAllFileGroups)
|
||||
.collect(Collectors.toMap(HoodieFileGroup::getFileGroupId, fg -> fg));
|
||||
Assert.assertEquals(fileGroupsMap1.keySet(), fileGroupsMap2.keySet());
|
||||
long gotSlicesCount = fileGroupsMap1.keySet().stream()
|
||||
.map(k -> Pair.of(fileGroupsMap1.get(k), fileGroupsMap2.get(k))).mapToLong(e -> {
|
||||
|
||||
@@ -128,7 +128,7 @@ public class CompactionTestUtils {
|
||||
AvroUtils.serializeCompactionPlan(compactionPlan));
|
||||
}
|
||||
|
||||
public static void createDeltaCommit(HoodieTableMetaClient metaClient, String instantTime) throws IOException {
|
||||
public static void createDeltaCommit(HoodieTableMetaClient metaClient, String instantTime) {
|
||||
HoodieInstant requested = new HoodieInstant(State.REQUESTED, DELTA_COMMIT_ACTION, instantTime);
|
||||
metaClient.getActiveTimeline().createNewInstant(requested);
|
||||
metaClient.getActiveTimeline().transitionRequestedToInflight(requested, Option.empty());
|
||||
|
||||
@@ -124,7 +124,7 @@ public class SchemaTestUtil {
|
||||
}
|
||||
|
||||
public static List<IndexedRecord> updateHoodieTestRecords(List<String> oldRecordKeys, List<IndexedRecord> newRecords,
|
||||
String commitTime) throws IOException, URISyntaxException {
|
||||
String commitTime) {
|
||||
|
||||
return newRecords.stream().map(p -> {
|
||||
((GenericRecord) p).put(HoodieRecord.RECORD_KEY_METADATA_FIELD, oldRecordKeys.remove(0));
|
||||
@@ -144,7 +144,7 @@ public class SchemaTestUtil {
|
||||
}
|
||||
|
||||
public static List<HoodieRecord> updateHoodieTestRecordsWithoutHoodieMetadata(List<HoodieRecord> oldRecords,
|
||||
Schema schema, String fieldNameToUpdate, String newValue) throws IOException, URISyntaxException {
|
||||
Schema schema, String fieldNameToUpdate, String newValue) {
|
||||
return oldRecords.stream().map(r -> {
|
||||
try {
|
||||
GenericRecord rec = (GenericRecord) r.getData().getInsertValue(schema).get();
|
||||
|
||||
@@ -42,7 +42,7 @@ public class SpillableMapTestUtils {
|
||||
public static List<String> upsertRecords(List<IndexedRecord> iRecords,
|
||||
Map<String, HoodieRecord<? extends HoodieRecordPayload>> records) {
|
||||
List<String> recordKeys = new ArrayList<>();
|
||||
iRecords.stream().forEach(r -> {
|
||||
iRecords.forEach(r -> {
|
||||
String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
String partitionPath = ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
|
||||
recordKeys.add(key);
|
||||
|
||||
@@ -31,6 +31,8 @@ import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
@@ -85,7 +87,7 @@ public class TestDFSPropertiesConfiguration {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParsing() throws IOException {
|
||||
public void testParsing() {
|
||||
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t1.props"));
|
||||
TypedProperties props = cfg.getConfig();
|
||||
assertEquals(5, props.size());
|
||||
@@ -98,19 +100,19 @@ public class TestDFSPropertiesConfiguration {
|
||||
|
||||
assertEquals(123, props.getInteger("int.prop"));
|
||||
assertEquals(113.4, props.getDouble("double.prop"), 0.001);
|
||||
assertEquals(true, props.getBoolean("boolean.prop"));
|
||||
assertTrue(props.getBoolean("boolean.prop"));
|
||||
assertEquals("str", props.getString("string.prop"));
|
||||
assertEquals(1354354354, props.getLong("long.prop"));
|
||||
|
||||
assertEquals(123, props.getInteger("int.prop", 456));
|
||||
assertEquals(113.4, props.getDouble("double.prop", 223.4), 0.001);
|
||||
assertEquals(true, props.getBoolean("boolean.prop", false));
|
||||
assertTrue(props.getBoolean("boolean.prop", false));
|
||||
assertEquals("str", props.getString("string.prop", "default"));
|
||||
assertEquals(1354354354, props.getLong("long.prop", 8578494434L));
|
||||
|
||||
assertEquals(456, props.getInteger("bad.int.prop", 456));
|
||||
assertEquals(223.4, props.getDouble("bad.double.prop", 223.4), 0.001);
|
||||
assertEquals(false, props.getBoolean("bad.boolean.prop", false));
|
||||
assertFalse(props.getBoolean("bad.boolean.prop", false));
|
||||
assertEquals("default", props.getString("bad.string.prop", "default"));
|
||||
assertEquals(8578494434L, props.getLong("bad.long.prop", 8578494434L));
|
||||
}
|
||||
@@ -122,7 +124,7 @@ public class TestDFSPropertiesConfiguration {
|
||||
|
||||
assertEquals(123, props.getInteger("int.prop"));
|
||||
assertEquals(243.4, props.getDouble("double.prop"), 0.001);
|
||||
assertEquals(true, props.getBoolean("boolean.prop"));
|
||||
assertTrue(props.getBoolean("boolean.prop"));
|
||||
assertEquals("t3.value", props.getString("string.prop"));
|
||||
assertEquals(1354354354, props.getLong("long.prop"));
|
||||
|
||||
|
||||
@@ -39,11 +39,10 @@ import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Tests file system utils.
|
||||
@@ -63,18 +62,15 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
@Test
|
||||
public void testMakeDataFileName() {
|
||||
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
|
||||
int taskPartitionId = 2;
|
||||
String fileName = UUID.randomUUID().toString();
|
||||
assertTrue(FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName)
|
||||
.equals(fileName + "_" + TEST_WRITE_TOKEN + "_" + commitTime + ".parquet"));
|
||||
assertEquals(FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + commitTime + ".parquet");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaskFileName() {
|
||||
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
|
||||
int taskPartitionId = 2;
|
||||
assertTrue(FSUtils.maskWithoutFileId(commitTime, taskPartitionId)
|
||||
.equals("*_" + taskPartitionId + "_" + commitTime + ".parquet"));
|
||||
assertEquals(FSUtils.maskWithoutFileId(commitTime, taskPartitionId), "*_" + taskPartitionId + "_" + commitTime + ".parquet");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -89,7 +85,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
// All directories including marker dirs.
|
||||
List<String> folders =
|
||||
Arrays.asList("2016/04/15", "2016/05/16", ".hoodie/.temp/2/2016/04/15", ".hoodie/.temp/2/2016/05/16");
|
||||
folders.stream().forEach(f -> {
|
||||
folders.forEach(f -> {
|
||||
try {
|
||||
metaClient.getFs().mkdirs(new Path(new Path(basePath), f));
|
||||
} catch (IOException e) {
|
||||
@@ -102,7 +98,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
"2016/05/16/2_1-0-1_20190528120000.parquet", ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000.parquet",
|
||||
".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000.parquet");
|
||||
|
||||
files.stream().forEach(f -> {
|
||||
files.forEach(f -> {
|
||||
try {
|
||||
metaClient.getFs().create(new Path(new Path(basePath), f));
|
||||
} catch (IOException e) {
|
||||
@@ -138,19 +134,17 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
@Test
|
||||
public void testGetCommitTime() {
|
||||
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
|
||||
int taskPartitionId = 2;
|
||||
String fileName = UUID.randomUUID().toString();
|
||||
String fullFileName = FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName);
|
||||
assertTrue(FSUtils.getCommitTime(fullFileName).equals(commitTime));
|
||||
assertEquals(FSUtils.getCommitTime(fullFileName), commitTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFileNameWithoutMeta() {
|
||||
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
|
||||
int taskPartitionId = 2;
|
||||
String fileName = UUID.randomUUID().toString();
|
||||
String fullFileName = FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName);
|
||||
assertTrue(FSUtils.getFileId(fullFileName).equals(fileName));
|
||||
assertEquals(FSUtils.getFileId(fullFileName), fileName);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -234,7 +228,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
String log1Ver0 = makeOldLogFileName("file1", ".log", "1", 0);
|
||||
String log1Ver1 = makeOldLogFileName("file1", ".log", "1", 1);
|
||||
String log1base2 = makeOldLogFileName("file1", ".log", "2", 0);
|
||||
List<HoodieLogFile> logFiles = Arrays.asList(log1base2, log1Ver1, log1Ver0).stream().map(f -> new HoodieLogFile(f))
|
||||
List<HoodieLogFile> logFiles = Stream.of(log1base2, log1Ver1, log1Ver0).map(HoodieLogFile::new)
|
||||
.collect(Collectors.toList());
|
||||
logFiles.sort(HoodieLogFile.getLogFileComparator());
|
||||
assertEquals(log1Ver0, logFiles.get(0).getFileName());
|
||||
@@ -255,8 +249,8 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
String log1base2W1 = FSUtils.makeLogFileName("file1", ".log", "2", 0, "1-1-1");
|
||||
|
||||
List<HoodieLogFile> logFiles =
|
||||
Arrays.asList(log1Ver1W1, log1base2W0, log1base2W1, log1Ver1W0, log1Ver0W1, log1Ver0W0).stream()
|
||||
.map(f -> new HoodieLogFile(f)).collect(Collectors.toList());
|
||||
Stream.of(log1Ver1W1, log1base2W0, log1base2W1, log1Ver1W0, log1Ver0W1, log1Ver0W0)
|
||||
.map(HoodieLogFile::new).collect(Collectors.toList());
|
||||
logFiles.sort(HoodieLogFile.getLogFileComparator());
|
||||
assertEquals(log1Ver0W0, logFiles.get(0).getFileName());
|
||||
assertEquals(log1Ver0W1, logFiles.get(1).getFileName());
|
||||
@@ -267,7 +261,6 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
||||
}
|
||||
|
||||
public static String makeOldLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version) {
|
||||
Pattern oldLogFilePattern = Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(\\.([0-9]*))");
|
||||
return "." + String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,15 +44,15 @@ public class TestHoodieAvroUtils {
|
||||
continue;
|
||||
}
|
||||
|
||||
Assert.assertTrue("field name is null", field.name() != null);
|
||||
Assert.assertNotNull("field name is null", field.name());
|
||||
Map<String, JsonNode> props = field.getJsonProps();
|
||||
Assert.assertTrue("The property is null", props != null);
|
||||
Assert.assertNotNull("The property is null", props);
|
||||
|
||||
if (field.name().equals("pii_col")) {
|
||||
piiPresent = true;
|
||||
Assert.assertTrue("sensitivity_level is removed in field 'pii_col'", props.containsKey("column_category"));
|
||||
} else {
|
||||
Assert.assertTrue("The property shows up but not set", props.size() == 0);
|
||||
Assert.assertEquals("The property shows up but not set", 0, props.size());
|
||||
}
|
||||
}
|
||||
Assert.assertTrue("column pii_col doesn't show up", piiPresent);
|
||||
|
||||
@@ -20,7 +20,7 @@ package org.apache.hudi.common.util;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Tests numeric utils.
|
||||
@@ -29,14 +29,14 @@ public class TestNumericUtils {
|
||||
|
||||
@Test
|
||||
public void testHumanReadableByteCount() {
|
||||
assertTrue(NumericUtils.humanReadableByteCount(0).equals("0.0 B"));
|
||||
assertTrue(NumericUtils.humanReadableByteCount(27).equals("27.0 B"));
|
||||
assertTrue(NumericUtils.humanReadableByteCount(1023).equals("1023.0 B"));
|
||||
assertTrue(NumericUtils.humanReadableByteCount(1024).equals("1.0 KB"));
|
||||
assertTrue(NumericUtils.humanReadableByteCount(110592).equals("108.0 KB"));
|
||||
assertTrue(NumericUtils.humanReadableByteCount(28991029248L).equals("27.0 GB"));
|
||||
assertTrue(NumericUtils.humanReadableByteCount(1855425871872L).equals("1.7 TB"));
|
||||
assertTrue(NumericUtils.humanReadableByteCount(9223372036854775807L).equals("8.0 EB"));
|
||||
assertEquals("0.0 B", NumericUtils.humanReadableByteCount(0));
|
||||
assertEquals("27.0 B", NumericUtils.humanReadableByteCount(27));
|
||||
assertEquals("1023.0 B", NumericUtils.humanReadableByteCount(1023));
|
||||
assertEquals("1.0 KB", NumericUtils.humanReadableByteCount(1024));
|
||||
assertEquals("108.0 KB", NumericUtils.humanReadableByteCount(110592));
|
||||
assertEquals("27.0 GB", NumericUtils.humanReadableByteCount(28991029248L));
|
||||
assertEquals("1.7 TB", NumericUtils.humanReadableByteCount(1855425871872L));
|
||||
assertEquals("8.0 EB", NumericUtils.humanReadableByteCount(9223372036854775807L));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieTestUtils;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.parquet.hadoop.ParquetWriter;
|
||||
@@ -101,16 +100,6 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration getConfiguration() {
|
||||
if (bloomFilterTypeToTest.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) {
|
||||
return HoodieTestUtils.getDefaultHadoopConf();
|
||||
} else {
|
||||
org.apache.hadoop.conf.Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
|
||||
// conf.set();
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterParquetRowKeys() throws Exception {
|
||||
List<String> rowKeys = new ArrayList<>();
|
||||
|
||||
@@ -59,7 +59,7 @@ public class TestRocksDBManager {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRocksDBManager() throws Exception {
|
||||
public void testRocksDBManager() {
|
||||
String prefix1 = "prefix1_";
|
||||
String prefix2 = "prefix2_";
|
||||
String prefix3 = "prefix3_";
|
||||
@@ -77,11 +77,11 @@ public class TestRocksDBManager {
|
||||
return new Payload(prefix, key, val, family);
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
colFamilies.stream().forEach(family -> dbManager.dropColumnFamily(family));
|
||||
colFamilies.stream().forEach(family -> dbManager.addColumnFamily(family));
|
||||
colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
|
||||
colFamilies.forEach(family -> dbManager.addColumnFamily(family));
|
||||
|
||||
Map<String, Map<String, Integer>> countsMap = new HashMap<>();
|
||||
payloads.stream().forEach(payload -> {
|
||||
payloads.forEach(payload -> {
|
||||
dbManager.put(payload.getFamily(), payload.getKey(), payload);
|
||||
|
||||
if (!countsMap.containsKey(payload.family)) {
|
||||
@@ -95,21 +95,21 @@ public class TestRocksDBManager {
|
||||
c.put(payload.prefix, currCount + 1);
|
||||
});
|
||||
|
||||
colFamilies.stream().forEach(family -> {
|
||||
prefixes.stream().forEach(prefix -> {
|
||||
colFamilies.forEach(family -> {
|
||||
prefixes.forEach(prefix -> {
|
||||
List<Pair<String, Payload>> gotPayloads =
|
||||
dbManager.<Payload>prefixSearch(family, prefix).collect(Collectors.toList());
|
||||
Integer expCount = countsMap.get(family).get(prefix);
|
||||
Assert.assertEquals("Size check for prefix (" + prefix + ") and family (" + family + ")",
|
||||
expCount == null ? 0L : expCount.longValue(), gotPayloads.size());
|
||||
gotPayloads.stream().forEach(p -> {
|
||||
gotPayloads.forEach(p -> {
|
||||
Assert.assertEquals(p.getRight().getFamily(), family);
|
||||
Assert.assertTrue(p.getRight().getKey().startsWith(prefix));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
payloads.stream().forEach(payload -> {
|
||||
payloads.forEach(payload -> {
|
||||
Payload p = dbManager.get(payload.getFamily(), payload.getKey());
|
||||
Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p);
|
||||
|
||||
@@ -122,8 +122,8 @@ public class TestRocksDBManager {
|
||||
});
|
||||
|
||||
// Now do a prefix search
|
||||
colFamilies.stream().forEach(family -> {
|
||||
prefixes.stream().forEach(prefix -> {
|
||||
colFamilies.forEach(family -> {
|
||||
prefixes.forEach(prefix -> {
|
||||
List<Pair<String, Payload>> gotPayloads =
|
||||
dbManager.<Payload>prefixSearch(family, prefix).collect(Collectors.toList());
|
||||
Assert.assertEquals("Size check for prefix (" + prefix + ") and family (" + family + ")", 0,
|
||||
|
||||
@@ -87,7 +87,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
|
||||
List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
|
||||
Set<String> recordKeys = new HashSet<>();
|
||||
// insert generated records into the map
|
||||
hoodieRecords.stream().forEach(r -> {
|
||||
hoodieRecords.forEach(r -> {
|
||||
records.put(r.getRecordKey(), r);
|
||||
recordKeys.add(r.getRecordKey());
|
||||
});
|
||||
|
||||
@@ -87,7 +87,6 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
public void testSimpleUpsert() throws IOException, URISyntaxException {
|
||||
|
||||
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
|
||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
||||
|
||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
|
||||
@@ -110,7 +109,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
assertTrue(records.getDiskBasedMapNumEntries() > 0);
|
||||
|
||||
// iterate over the updated records and compare the value from Map
|
||||
updatedRecords.stream().forEach(record -> {
|
||||
updatedRecords.forEach(record -> {
|
||||
HoodieRecord rec = records.get(((GenericRecord) record).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
|
||||
try {
|
||||
assertEquals(rec.getData().getInsertValue(schema).get(), record);
|
||||
@@ -196,7 +195,6 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk() throws IOException, URISyntaxException {
|
||||
|
||||
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
|
||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
||||
|
||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
|
||||
@@ -248,7 +246,6 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
public void testDataCorrectnessWithoutHoodieMetadata() throws IOException, URISyntaxException {
|
||||
|
||||
Schema schema = SchemaTestUtil.getSimpleSchema();
|
||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
||||
|
||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); // 16B
|
||||
@@ -278,7 +275,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate, schema, fieldName, newValue);
|
||||
|
||||
// Upsert this updated record
|
||||
updatedRecords.stream().forEach(r -> {
|
||||
updatedRecords.forEach(r -> {
|
||||
records.put(r.getRecordKey(), r);
|
||||
});
|
||||
GenericRecord gRecord = (GenericRecord) records.get(key).getData().getInsertValue(schema).get();
|
||||
@@ -300,7 +297,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||
SchemaTestUtil.updateHoodieTestRecordsWithoutHoodieMetadata(recordsToUpdate, schema, fieldName, newValue);
|
||||
|
||||
// Upsert this updated record
|
||||
updatedRecords.stream().forEach(r -> {
|
||||
updatedRecords.forEach(r -> {
|
||||
records.put(r.getRecordKey(), r);
|
||||
});
|
||||
gRecord = (GenericRecord) records.get(key).getData().getInsertValue(schema).get();
|
||||
|
||||
Reference in New Issue
Block a user