refactor code: add docs and init/cleanup resource group for hoodie client test base
This commit is contained in:
@@ -33,6 +33,7 @@ import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.minicluster.HdfsTestService;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
@@ -61,6 +62,37 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
protected transient MiniDFSCluster dfsCluster;
|
||||
protected transient DistributedFileSystem dfs;
|
||||
|
||||
/**
|
||||
* Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void initResourceGroupForHoodieClientTests() throws IOException {
|
||||
initTempFolderAndPath();
|
||||
initSparkContexts();
|
||||
initTestDataGenerator();
|
||||
initFileSystem();
|
||||
initTableType();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void cleanupResourceGroupForHoodieClientTests() throws IOException {
|
||||
cleanupTableType();
|
||||
cleanupSparkContexts();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupFileSystem();
|
||||
cleanupTempFolderAndPath();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext})
|
||||
* with the given application name.
|
||||
*
|
||||
* @param appName The specified application name.
|
||||
*/
|
||||
protected void initSparkContexts(String appName) {
|
||||
// Initialize a local spark env
|
||||
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
|
||||
@@ -70,10 +102,17 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
sqlContext = new SQLContext(jsc);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext})
|
||||
* with a default name <b>TestHoodieClient</b>.
|
||||
*/
|
||||
protected void initSparkContexts() {
|
||||
initSparkContexts("TestHoodieClient");
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
|
||||
*/
|
||||
protected void cleanupSparkContexts() {
|
||||
if (sqlContext != null) {
|
||||
logger.info("Clearing sql context cache of spark-session used in previous test-case");
|
||||
@@ -89,12 +128,22 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a temporary folder and base path.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void initTempFolderAndPath() throws IOException {
|
||||
folder = new TemporaryFolder();
|
||||
folder.create();
|
||||
basePath = folder.getRoot().getAbsolutePath();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanups the temporary folder and base path.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void cleanupTempFolderAndPath() throws IOException {
|
||||
if (basePath != null) {
|
||||
new File(basePath).delete();
|
||||
@@ -106,36 +155,29 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a file system with the hadoop configuration of Spark context.
|
||||
*/
|
||||
protected void initFileSystem() {
|
||||
if (basePath == null) {
|
||||
throw new IllegalStateException("The base path has not been initialized.");
|
||||
}
|
||||
|
||||
if (jsc == null) {
|
||||
throw new IllegalStateException("The Spark context has not been initialized.");
|
||||
}
|
||||
|
||||
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
|
||||
if (fs instanceof LocalFileSystem) {
|
||||
LocalFileSystem lfs = (LocalFileSystem) fs;
|
||||
// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
|
||||
// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
|
||||
// So, for the tests, we enforce checksum verification to circumvent the problem
|
||||
lfs.setVerifyChecksum(true);
|
||||
}
|
||||
initFileSystemWithConfiguration(jsc.hadoopConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes file system with a default empty configuration.
|
||||
*/
|
||||
protected void initFileSystemWithDefaultConfiguration() {
|
||||
fs = FSUtils.getFs(basePath, new Configuration());
|
||||
if (fs instanceof LocalFileSystem) {
|
||||
LocalFileSystem lfs = (LocalFileSystem) fs;
|
||||
// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
|
||||
// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
|
||||
// So, for the tests, we enforce checksum verification to circumvent the problem
|
||||
lfs.setVerifyChecksum(true);
|
||||
}
|
||||
initFileSystemWithConfiguration(new Configuration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanups file system.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void cleanupFileSystem() throws IOException {
|
||||
if (fs != null) {
|
||||
logger.warn("Closing file-system instance used in previous test-run");
|
||||
@@ -143,6 +185,12 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes an instance of {@link HoodieTableMetaClient} with a special table type
|
||||
* specified by {@code getTableType()}.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void initTableType() throws IOException {
|
||||
if (basePath == null) {
|
||||
throw new IllegalStateException("The base path has not been initialized.");
|
||||
@@ -155,22 +203,46 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, getTableType());
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanups table type.
|
||||
*/
|
||||
protected void cleanupTableType() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a test data generator which used to generate test datas.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void initTestDataGenerator() throws IOException {
|
||||
dataGen = new HoodieTestDataGenerator();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanups test data generator.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void cleanupTestDataGenerator() throws IOException {
|
||||
dataGen = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a default {@link HoodieTableType#COPY_ON_WRITE} table type.
|
||||
* Sub-classes can override this method to specify a new table type.
|
||||
*
|
||||
* @return an instance of Hoodie table type.
|
||||
*/
|
||||
protected HoodieTableType getTableType() {
|
||||
return HoodieTableType.COPY_ON_WRITE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a distributed file system and base directory.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void initDFS() throws IOException {
|
||||
FileSystem.closeAll();
|
||||
hdfsTestService = new HdfsTestService();
|
||||
@@ -182,6 +254,11 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
dfs.mkdirs(new Path(dfsBasePath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanups the distributed file system.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void cleanupDFS() throws IOException {
|
||||
if (hdfsTestService != null) {
|
||||
hdfsTestService.stop();
|
||||
@@ -192,10 +269,18 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
FileSystem.closeAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes executor service with a fixed thread pool.
|
||||
*
|
||||
* @param threadNum specify the capacity of the fixed thread pool
|
||||
*/
|
||||
protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
|
||||
executorService = Executors.newFixedThreadPool(threadNum);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanups the executor service.
|
||||
*/
|
||||
protected void cleanupExecutorService() {
|
||||
if (this.executorService != null) {
|
||||
this.executorService.shutdownNow();
|
||||
@@ -203,4 +288,19 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
private void initFileSystemWithConfiguration(Configuration configuration) {
|
||||
if (basePath == null) {
|
||||
throw new IllegalStateException("The base path has not been initialized.");
|
||||
}
|
||||
|
||||
fs = FSUtils.getFs(basePath, configuration);
|
||||
if (fs instanceof LocalFileSystem) {
|
||||
LocalFileSystem lfs = (LocalFileSystem) fs;
|
||||
// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
|
||||
// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
|
||||
// So, for the tests, we enforce checksum verification to circumvent the problem
|
||||
lfs.setVerifyChecksum(true);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -59,9 +59,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
@@ -86,22 +84,6 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
|
||||
.build());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initTempFolderAndPath();
|
||||
initTestDataGenerator();
|
||||
initSparkContexts();
|
||||
initTableType();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
cleanupTableType();
|
||||
cleanupSparkContexts();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupTempFolderAndPath();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackForInflightCompaction() throws Exception {
|
||||
// Rollback inflight compaction
|
||||
|
||||
@@ -72,9 +72,7 @@ import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.scheduler.SparkListener;
|
||||
import org.apache.spark.scheduler.SparkListenerTaskEnd;
|
||||
import org.apache.spark.util.AccumulatorV2;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import scala.collection.Iterator;
|
||||
|
||||
@@ -86,24 +84,6 @@ public class TestCleaner extends TestHoodieClientBase {
|
||||
private static final int BIG_BATCH_INSERT_SIZE = 500;
|
||||
private static Logger logger = LogManager.getLogger(TestHoodieClientBase.class);
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initTempFolderAndPath();
|
||||
initSparkContexts();
|
||||
initTestDataGenerator();
|
||||
initFileSystem();
|
||||
initTableType();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
cleanupTableType();
|
||||
cleanupSparkContexts();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupFileSystem();
|
||||
cleanupTempFolderAndPath();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to do first batch of insert for clean by versions/commits tests
|
||||
*
|
||||
|
||||
@@ -42,8 +42,6 @@ import org.apache.hudi.exception.HoodieRollbackException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
@@ -51,24 +49,6 @@ import org.junit.Test;
|
||||
*/
|
||||
public class TestClientRollback extends TestHoodieClientBase {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initTempFolderAndPath();
|
||||
initTestDataGenerator();
|
||||
initSparkContexts();
|
||||
initFileSystem();
|
||||
initTableType();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
cleanupTableType();
|
||||
cleanupSparkContexts();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupFileSystem();
|
||||
cleanupTempFolderAndPath();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for rollback-savepoint interaction
|
||||
*/
|
||||
|
||||
@@ -56,7 +56,9 @@ import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* Base Class providing setup/cleanup and utility methods for testing Hoodie Client facing tests
|
||||
@@ -65,6 +67,16 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
|
||||
|
||||
protected static Logger logger = LogManager.getLogger(TestHoodieClientBase.class);
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initResourceGroupForHoodieClientTests();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
cleanupResourceGroupForHoodieClientTests();
|
||||
}
|
||||
|
||||
protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
|
||||
return getHoodieWriteClient(cfg, false);
|
||||
}
|
||||
|
||||
@@ -65,32 +65,12 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initTempFolderAndPath();
|
||||
initSparkContexts();
|
||||
initTestDataGenerator();
|
||||
initFileSystem();
|
||||
initTableType();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
cleanupTableType();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupSparkContexts();
|
||||
cleanupFileSystem();
|
||||
cleanupTempFolderAndPath();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Auto Commit behavior for HoodieWriteClient insert API
|
||||
*/
|
||||
|
||||
@@ -28,9 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -39,24 +37,6 @@ import org.junit.Test;
|
||||
*/
|
||||
public class TestHoodieReadClient extends TestHoodieClientBase {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initTempFolderAndPath();
|
||||
initTestDataGenerator();
|
||||
initSparkContexts();
|
||||
initFileSystem();
|
||||
initTableType();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
cleanupTableType();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupSparkContexts();
|
||||
cleanupFileSystem();
|
||||
cleanupTempFolderAndPath();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test ReadFilter API after writing new records using HoodieWriteClient.insert
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user