1
0

Remove stateful fs member from HoodieTestUtils & FSUtils

This commit is contained in:
vinothchandar
2018-01-03 16:05:30 -08:00
committed by vinoth chandar
parent cf7f7aabb9
commit 21ce846f18
19 changed files with 74 additions and 103 deletions

View File

@@ -102,7 +102,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath.toString(), jsc.hadoopConfiguration());
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(fs, basePath);
dataGen = new HoodieTestDataGenerator();
}
@@ -1247,27 +1247,27 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
.retainFileVersions(1).build()).build();
HoodieTableMetaClient metaClient = HoodieTestUtils
.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ);
// Make 3 files, one base file and 2 log files associated with base file
String file1P0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000");
String file2P0L0 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.empty());
.createNewLogFile(fs, basePath, partitionPaths[0], "000", file1P0, Optional.empty());
String file2P0L1 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.of(2));
.createNewLogFile(fs, basePath, partitionPaths[0], "000", file1P0, Optional.of(2));
// make 1 compaction commit
HoodieTestUtils.createCompactionCommitFiles(basePath, "000");
HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000");
// Make 4 files, one base file and 3 log files associated with base file
HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0);
file2P0L0 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.empty());
.createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.empty());
file2P0L0 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(2));
.createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.of(2));
file2P0L0 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(3));
.createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.of(3));
// make 1 compaction commit
HoodieTestUtils.createCompactionCommitFiles(basePath, "001");
HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001");
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);

View File

@@ -47,8 +47,8 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
import org.junit.Before;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestMultiFS implements Serializable {
@@ -64,8 +64,8 @@ public class TestMultiFS implements Serializable {
private static JavaSparkContext jsc;
private static SQLContext sqlContext;
@Before
public void initClass() throws Exception {
@BeforeClass
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
@@ -82,15 +82,18 @@ public class TestMultiFS implements Serializable {
sqlContext = new SQLContext(jsc);
}
@After
public void cleanupClass() throws Exception {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
@AfterClass
public static void cleanupClass() throws Exception {
if (jsc != null) {
jsc.stop();
}
FSUtils.setFs(null);
if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown();
}
// Need to closeAll to clear FileSystem.Cache; required because DFS and LocalFS are used in the same JVM
FileSystem.closeAll();
}
@Test

View File

@@ -48,7 +48,7 @@ public class TestUpdateMapFunction {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
this.basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()), basePath);
}
@Test

View File

@@ -88,7 +88,7 @@ public class TestHoodieBloomIndex {
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(fs, basePath);
// We have some records to be tagged (two different partitions)
schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));

View File

@@ -54,8 +54,8 @@ public class TestHoodieCommitArchiveLog {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(basePath);
fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf());
HoodieTestUtils.init(fs, basePath);
}
@Test
@@ -75,7 +75,7 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build();
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(fs, basePath);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");

View File

@@ -31,6 +31,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
@@ -44,6 +45,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
@@ -57,6 +59,7 @@ public class TestHoodieCompactor {
private String basePath = null;
private HoodieCompactor compactor;
private transient HoodieTestDataGenerator dataGen = null;
private transient FileSystem fs;
@Before
public void init() throws IOException {
@@ -67,7 +70,8 @@ public class TestHoodieCompactor {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf());
HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ);
dataGen = new HoodieTestDataGenerator();
compactor = new HoodieRealtimeTableCompactor();
@@ -100,7 +104,7 @@ public class TestHoodieCompactor {
@Test(expected = IllegalArgumentException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception {
HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
@@ -155,7 +159,7 @@ public class TestHoodieCompactor {
// Write them to corresponding avro logfiles
HoodieTestUtils
.writeRecordsToLogFiles(metaClient.getBasePath(), HoodieTestDataGenerator.avroSchema,
.writeRecordsToLogFiles(fs, metaClient.getBasePath(), HoodieTestDataGenerator.avroSchema,
updatedRecords);
// Verify that all data file has one log file

View File

@@ -76,7 +76,7 @@ public class TestCopyOnWriteTable {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
this.basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(FSUtils.getFs(basePath, jsc.hadoopConfiguration()), basePath);
}
@Test

View File

@@ -42,7 +42,6 @@ import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
@@ -80,7 +79,6 @@ public class TestMergeOnReadTable {
private transient SQLContext sqlContext;
private static String basePath = null;
private HoodieCompactor compactor;
private FileSystem fs;
//NOTE : Be careful when using DFS (FileSystem.class) vs LocalFs (RawLocalFileSystem.class)
//The implementation and guarantees of many APIs differ; for example, check rename(src,dst)
@@ -94,10 +92,8 @@ public class TestMergeOnReadTable {
hdfsTestService.stop();
dfsCluster.shutdown();
}
FSUtils.setFs(null);
// Need to closeAll to clear FileSystem.Cache; required because DFS and LocalFS are used in the same JVM
FileSystem.closeAll();
HoodieTestUtils.resetFS(basePath);
}
@BeforeClass
@@ -110,8 +106,6 @@ public class TestMergeOnReadTable {
// Create a temp folder as the base path
dfs = dfsCluster.getFileSystem();
}
FSUtils.setFs(dfs);
HoodieTestUtils.resetFS(basePath);
}
@Before
@@ -124,12 +118,10 @@ public class TestMergeOnReadTable {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
jsc.hadoopConfiguration().addResource(fs.getConf());
jsc.hadoopConfiguration().addResource(dfs.getConf());
dfs.mkdirs(new Path(basePath));
FSUtils.setFs(dfs);
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ);
sqlContext = new SQLContext(jsc); // SQLContext stuff
compactor = new HoodieRealtimeTableCompactor();
@@ -219,7 +211,7 @@ public class TestMergeOnReadTable {
compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime());
allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(),
allFiles);
dataFilesToRead = roView.getLatestDataFiles();
@@ -339,7 +331,7 @@ public class TestMergeOnReadTable {
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(),
allFiles);
dataFilesToRead = roView.getLatestDataFiles();
@@ -357,7 +349,7 @@ public class TestMergeOnReadTable {
public void testCOWToMORConvertedDatasetRollback() throws Exception {
//Set TableType to COW
HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieWriteConfig cfg = getConfig(true);
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
@@ -396,7 +388,7 @@ public class TestMergeOnReadTable {
assertNoWriteErrors(statuses);
//Set TableType to MOR
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ);
//rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);