From d07ac588ac30e80f59522d51e939cc125edcff52 Mon Sep 17 00:00:00 2001 From: Satish Kotha Date: Fri, 31 Jan 2020 17:17:03 -0800 Subject: [PATCH] Increase test coverage for HoodieReadClient --- .../org/apache/hudi/TestAsyncCompaction.java | 49 +++++++++++-------- .../org/apache/hudi/TestHoodieClientBase.java | 3 +- .../org/apache/hudi/TestHoodieReadClient.java | 38 ++++++++++++++ 3 files changed, 68 insertions(+), 22 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java index 4f8fbb1f4..eae98c643 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java @@ -88,7 +88,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase { public void testRollbackForInflightCompaction() throws Exception { // Rollback inflight compaction HoodieWriteConfig cfg = getConfig(false); - try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg, true); + HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) { String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -97,7 +98,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List records = dataGen.generateInserts(firstInstantTime, numRecs); - runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); // Schedule compaction but do not run them @@ -150,10 +151,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; - try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg, true); + HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) { List records = dataGen.generateInserts(firstInstantTime, numRecs); - records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); // Schedule compaction but do not run them @@ -191,7 +193,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase { public void testInflightCompaction() throws Exception { // There is inflight compaction. Subsequent compaction run must work correctly HoodieWriteConfig cfg = getConfig(true); - try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg, true); + HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) { String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -202,7 +205,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List records = dataGen.generateInserts(firstInstantTime, numRecs); - records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); // Schedule and mark compaction instant as inflight @@ -212,7 +215,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { moveCompactionFromRequestedToInflight(compactionInstantTime, cfg); // Complete ingestions - runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false, + runNextDeltaCommits(client, readClient, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false, Arrays.asList(compactionInstantTime)); // execute inflight compaction @@ -225,6 +228,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Case: Failure case. Latest pending compaction instant time must be earlier than this instant time HoodieWriteConfig cfg = getConfig(false); HoodieWriteClient client = getHoodieWriteClient(cfg, true); + HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -233,7 +237,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List records = dataGen.generateInserts(firstInstantTime, numRecs); - records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); // Schedule compaction but do not run them @@ -245,7 +249,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { boolean gotException = false; try { - runNextDeltaCommits(client, Arrays.asList(failedInstantTime), records, cfg, false, + runNextDeltaCommits(client, readClient, Arrays.asList(failedInstantTime), records, cfg, false, Arrays.asList(compactionInstantTime)); } catch (IllegalArgumentException iex) { // Latest pending compaction instant time must be earlier than this instant time. Should fail here @@ -260,6 +264,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { HoodieWriteConfig cfg = getConfig(false); HoodieWriteClient client = getHoodieWriteClient(cfg, true); + HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -268,7 +273,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List records = dataGen.generateInserts(firstInstantTime, numRecs); - records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); @@ -296,6 +301,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { HoodieWriteConfig cfg = getConfig(false); HoodieWriteClient client = getHoodieWriteClient(cfg, true); + HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath()); String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -303,7 +309,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List records = dataGen.generateInserts(firstInstantTime, numRecs); - runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); boolean gotException = false; @@ -341,7 +347,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase { public void testCompactionAfterTwoDeltaCommits() throws Exception { // No Delta Commits after compaction request HoodieWriteConfig cfg = getConfig(true); - try (HoodieWriteClient client = getHoodieWriteClient(cfg, true)) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg, true); + HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) { String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -349,7 +356,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List records = dataGen.generateInserts(firstInstantTime, numRecs); - runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); @@ -362,7 +369,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase { public void testInterleavedCompaction() throws Exception { // Case: Two delta commits before and after compaction schedule HoodieWriteConfig cfg = getConfig(true); - try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg, true); + HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) { String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -373,14 +381,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List records = dataGen.generateInserts(firstInstantTime, numRecs); - records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); scheduleCompaction(compactionInstantTime, client, cfg); - runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false, + runNextDeltaCommits(client, readClient, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false, Arrays.asList(compactionInstantTime)); executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true); } @@ -410,15 +418,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase { }); } - private List runNextDeltaCommits(HoodieWriteClient client, List deltaInstants, - List records, HoodieWriteConfig cfg, boolean insertFirst, List expPendingCompactionInstants) + private List runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List deltaInstants, + List records, HoodieWriteConfig cfg, boolean insertFirst, List expPendingCompactionInstants) throws Exception { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - List> pendingCompactions = - CompactionUtils.getAllPendingCompactionPlans(metaClient); + List> pendingCompactions = readClient.getPendingCompactions(); List gotPendingCompactionInstants = - pendingCompactions.stream().map(pc -> pc.getKey().getTimestamp()).sorted().collect(Collectors.toList()); + pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList()); assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants); Map> fgIdToCompactionOperation = diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java index 07e93df35..61332a76d 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java @@ -49,6 +49,7 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.SQLContext; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -100,7 +101,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { } protected HoodieReadClient getHoodieReadClient(String basePath) { - return new HoodieReadClient(jsc, basePath); + return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc())); } /** diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieReadClient.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieReadClient.java index e6ea2688c..4d35524f5 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieReadClient.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieReadClient.java @@ -18,13 +18,19 @@ package org.apache.hudi; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -73,6 +79,13 @@ public class TestHoodieReadClient extends TestHoodieClientBase { }); } + @Test(expected = IllegalStateException.class) + public void testReadROViewFailsWithoutSqlContext() { + HoodieReadClient readClient = new HoodieReadClient(jsc, getConfig()); + JavaRDD recordsRDD = jsc.parallelize(new ArrayList<>(), 1); + readClient.readROView(recordsRDD, 1); + } + /** * Helper to write new records using one of HoodieWriteClient's write API and use ReadClient to test filterExists() * API works correctly. @@ -105,6 +118,31 @@ public class TestHoodieReadClient extends TestHoodieClientBase { List result = filteredRDD.collect(); // Check results assertEquals(25, result.size()); + + // check path exists for written keys + JavaPairRDD> keyToPathPair = + anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey())); + JavaRDD keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent()) + .map(keyPath -> keyPath._1); + assertEquals(75, keysWithPaths.count()); + + // verify rows match inserted records + Dataset rows = anotherReadClient.readROView(keysWithPaths, 1); + assertEquals(75, rows.count()); + + JavaRDD keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent()) + .map(keyPath -> keyPath._1); + + try { + anotherReadClient.readROView(keysWithoutPaths, 1); + } catch (Exception e) { + // data frame reader throws exception for empty records. ignore the error. + assertEquals(e.getClass(), AnalysisException.class); + } + + // Actual tests of getPendingCompactions method are in TestAsyncCompaction + // This is just testing empty list + assertEquals(0, anotherReadClient.getPendingCompactions().size()); } } }