1
0

Increase test coverage for HoodieReadClient

This commit is contained in:
Satish Kotha
2020-01-31 17:17:03 -08:00
committed by n3nash
parent fcf9e4aded
commit d07ac588ac
3 changed files with 68 additions and 22 deletions

View File

@@ -88,7 +88,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testRollbackForInflightCompaction() throws Exception {
// Rollback inflight compaction
HoodieWriteConfig cfg = getConfig(false);
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -97,7 +98,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
// Schedule compaction but do not run them
@@ -150,10 +151,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
// Schedule compaction but do not run them
@@ -191,7 +193,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testInflightCompaction() throws Exception {
// There is inflight compaction. Subsequent compaction run must work correctly
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -202,7 +205,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
// Schedule and mark compaction instant as inflight
@@ -212,7 +215,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
// Complete ingestions
runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false,
runNextDeltaCommits(client, readClient, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false,
Arrays.asList(compactionInstantTime));
// execute inflight compaction
@@ -225,6 +228,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// Case: Failure case. Latest pending compaction instant time must be earlier than this instant time
HoodieWriteConfig cfg = getConfig(false);
HoodieWriteClient client = getHoodieWriteClient(cfg, true);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -233,7 +237,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
// Schedule compaction but do not run them
@@ -245,7 +249,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
boolean gotException = false;
try {
runNextDeltaCommits(client, Arrays.asList(failedInstantTime), records, cfg, false,
runNextDeltaCommits(client, readClient, Arrays.asList(failedInstantTime), records, cfg, false,
Arrays.asList(compactionInstantTime));
} catch (IllegalArgumentException iex) {
// Latest pending compaction instant time must be earlier than this instant time. Should fail here
@@ -260,6 +264,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
HoodieWriteConfig cfg = getConfig(false);
HoodieWriteClient client = getHoodieWriteClient(cfg, true);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -268,7 +273,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -296,6 +301,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
HoodieWriteConfig cfg = getConfig(false);
HoodieWriteClient client = getHoodieWriteClient(cfg, true);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -303,7 +309,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
boolean gotException = false;
@@ -341,7 +347,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testCompactionAfterTwoDeltaCommits() throws Exception {
// No Delta Commits after compaction request
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true)) {
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -349,7 +356,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -362,7 +369,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testInterleavedCompaction() throws Exception {
// Case: Two delta commits before and after compaction schedule
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -373,14 +381,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false,
runNextDeltaCommits(client, readClient, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false,
Arrays.asList(compactionInstantTime));
executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
}
@@ -410,15 +418,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
});
}
private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, List<String> deltaInstants,
List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
List<Pair<HoodieInstant, HoodieCompactionPlan>> pendingCompactions =
CompactionUtils.getAllPendingCompactionPlans(metaClient);
List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
List<String> gotPendingCompactionInstants =
pendingCompactions.stream().map(pc -> pc.getKey().getTimestamp()).sorted().collect(Collectors.toList());
pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =

View File

@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -100,7 +101,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
}
protected HoodieReadClient getHoodieReadClient(String basePath) {
return new HoodieReadClient(jsc, basePath);
return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
}
/**

View File

@@ -18,13 +18,19 @@
package org.apache.hudi;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -73,6 +79,13 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
});
}
// Verifies that readROView() rejects use of a HoodieReadClient that was built
// without an SQLContext: the two-arg constructor (jsc, config) provides no
// Spark SQL context, so RO-view reads cannot be served and the call is
// expected to raise IllegalStateException (asserted via the @Test annotation).
@Test(expected = IllegalStateException.class)
public void testReadROViewFailsWithoutSqlContext() {
// Client constructed without an SQLContext — the precondition under test.
HoodieReadClient readClient = new HoodieReadClient(jsc, getConfig());
// An empty key RDD suffices; the failure concerns client state, not data.
// NOTE(review): assumes readROView throws eagerly on invocation rather than
// lazily on RDD evaluation — confirm against HoodieReadClient.readROView.
JavaRDD<HoodieKey> recordsRDD = jsc.parallelize(new ArrayList<>(), 1);
readClient.readROView(recordsRDD, 1);
}
/**
* Helper to write new records using one of HoodieWriteClient's write API and use ReadClient to test filterExists()
* API works correctly.
@@ -105,6 +118,31 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
List<HoodieRecord> result = filteredRDD.collect();
// Check results
assertEquals(25, result.size());
// check path exists for written keys
JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
.map(keyPath -> keyPath._1);
assertEquals(75, keysWithPaths.count());
// verify rows match inserted records
Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
assertEquals(75, rows.count());
JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
.map(keyPath -> keyPath._1);
try {
anotherReadClient.readROView(keysWithoutPaths, 1);
} catch (Exception e) {
// data frame reader throws exception for empty records. ignore the error.
assertEquals(e.getClass(), AnalysisException.class);
}
// Actual tests of getPendingCompactions method are in TestAsyncCompaction
// This is just testing empty list
assertEquals(0, anotherReadClient.getPendingCompactions().size());
}
}
}