diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 326cf83ff..a390923bb 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -268,5 +268,23 @@
mockito-junit-jupiter
test
+
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-runner</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-suite-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-commons</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java b/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java
new file mode 100644
index 000000000..4e62618c8
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.platform.suite.api.IncludeTags;
+import org.junit.platform.suite.api.SelectPackages;
+import org.junit.runner.RunWith;
+
+@RunWith(JUnitPlatform.class)
+@SelectPackages("org.apache.hudi.index")
+@IncludeTags("functional")
+public class ClientFunctionalTestSuite {
+
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index 6b6f11f79..b3d2f5a5d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -31,7 +31,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.hadoop.conf.Configuration;
@@ -47,10 +47,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
@@ -76,12 +76,17 @@ import static org.mockito.Mockito.when;
* {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please alter the order of tests running carefully.
*/
@TestMethodOrder(MethodOrderer.Alphanumeric.class)
-public class TestHBaseIndex extends HoodieClientTestHarness {
+@Tag("functional")
+public class TestHBaseIndex extends FunctionalTestHarness {
private static final String TABLE_NAME = "test_table";
private static HBaseTestingUtility utility;
private static Configuration hbaseConfig;
+ private Configuration hadoopConf;
+ private HoodieTestDataGenerator dataGen;
+ private HoodieTableMetaClient metaClient;
+
@AfterAll
public static void clean() throws Exception {
if (utility != null) {
@@ -104,21 +109,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
@BeforeEach
public void setUp() throws Exception {
- // Initialize a local spark env
- initSparkContexts("TestHBaseIndex");
+ hadoopConf = jsc().hadoopConfiguration();
hadoopConf.addResource(utility.getConfiguration());
-
- // Create a temp folder as the base path
- initPath();
- initTestDataGenerator();
- initMetaClient();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupSparkContexts();
- cleanupTestDataGenerator();
- cleanupClients();
+ metaClient = getHoodieMetaClient(hadoopConf, basePath());
+ dataGen = new HoodieTestDataGenerator();
}
@Test
@@ -126,7 +120,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig();
@@ -136,7 +130,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert 200 records
@@ -146,7 +140,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
// commit
- JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
// Now commit this & update location of records inserted and validate no errors
@@ -154,7 +148,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -167,7 +161,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig();
@@ -178,7 +172,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
- index.tagLocation(writeRecords, jsc, hoodieTable);
+ index.tagLocation(writeRecords, jsc(), hoodieTable);
// Duplicate upsert and ensure correctness is maintained
// We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not
@@ -194,7 +188,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null
@@ -211,7 +205,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = writeClient.startCommit();
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
// Insert 200 records
@@ -222,7 +216,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
writeClient.commit(newCommitTime, writeStatues);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Now tagLocation for these records, hbaseIndex should tag them
- List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
// check tagged records are tagged with correct fileIds
@@ -238,7 +232,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
// back commit
- List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
}
@@ -262,7 +256,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// start a commit and generate test data
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
@@ -271,7 +265,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
assertNoWriteErrors(writeStatues.collect());
// Now tagLocation for these records, hbaseIndex should tag them
- index.tagLocation(writeRecords, jsc, hoodieTable);
+ index.tagLocation(writeRecords, jsc(), hoodieTable);
// 3 batches should be executed given batchSize = 100 and parallelism = 1
verify(table, times(3)).get((List) any());
@@ -287,7 +281,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// start a commit and generate test data
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
@@ -309,7 +303,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Get all the files generated
int numberOfDataFileIds = (int) writeStatues.map(status -> status.getFileId()).distinct().count();
- index.updateLocation(writeStatues, jsc, hoodieTable);
+ index.updateLocation(writeStatues, jsc(), hoodieTable);
// 3 batches should be executed given batchSize = 100 and <=numberOfDataFileIds getting updated,
// so each fileId ideally gets updates
verify(table, atMost(numberOfDataFileIds)).put((List) any());
@@ -319,7 +313,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
public void testsHBasePutAccessParallelism() {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
- final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
+ final JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(
Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10);
final Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
@@ -334,7 +328,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
final JavaRDD<WriteStatus> writeStatusRDD =
- jsc.parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);
+ jsc().parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);
final Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
@@ -348,7 +342,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig(2);
@@ -358,7 +352,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert 200 records
writeClient.startCommitWithTime(newCommitTime);
@@ -367,7 +361,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
// commit
- JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
// Now commit this & update location of records inserted and validate no errors
@@ -375,7 +369,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -388,7 +382,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
final String newCommitTime = "001";
final int numRecords = 10;
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig();
@@ -398,7 +392,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+ JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
// Insert records
@@ -410,7 +404,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
- List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count());
assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null
@@ -425,12 +419,12 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
newWriteStatus.setStat(new HoodieWriteStat());
return newWriteStatus;
});
- JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc, hoodieTable);
+ JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc(), hoodieTable);
assertEquals(deleteStatus.count(), deleteWriteStatues.count());
assertNoWriteErrors(deleteStatus.collect());
// Ensure no records can be tagged
- List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+ List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -456,7 +450,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
}
private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
- return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(1, 1)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 562563ee5..27db072c7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -19,14 +19,26 @@
package org.apache.hudi.testutils;
+import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.providers.DFSProvider;
+import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
+import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
+import org.apache.hudi.testutils.providers.SparkProvider;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
@@ -35,8 +47,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.util.Properties;
-public class FunctionalTestHarness implements SparkProvider, DFSProvider {
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
+
+public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
private static transient SparkSession spark;
private static transient SQLContext sqlContext;
@@ -53,6 +70,10 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider {
@TempDir
protected java.nio.file.Path tempDir;
+ public String basePath() {
+ return tempDir.toAbsolutePath().toString();
+ }
+
@Override
public SparkSession spark() {
return spark;
@@ -83,15 +104,32 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider {
return dfs.getWorkingDirectory();
}
+ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
+ return getHoodieMetaClient(hadoopConf, basePath, new Properties());
+ }
+
+ @Override
+ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
+ props.putIfAbsent(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, PARQUET.toString());
+ props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
+ props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, COPY_ON_WRITE.name());
+ props.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+ return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+ }
+
+ @Override
+ public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
+ return new HoodieWriteClient(jsc, cfg, false, HoodieIndex.createIndex(cfg));
+ }
+
@BeforeEach
public synchronized void runBeforeEach() throws Exception {
initialized = spark != null && hdfsTestService != null;
if (!initialized) {
- FileSystem.closeAll();
-
- spark = SparkSession.builder()
- .config(HoodieWriteClient.registerClasses(conf()))
- .getOrCreate();
+ SparkConf sparkConf = conf();
+ HoodieWriteClient.registerClasses(sparkConf);
+ HoodieReadClient.addHoodieSupport(sparkConf);
+ spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
similarity index 95%
rename from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
rename to hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
index 16cc47152..62b48cbf7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
new file mode 100644
index 000000000..0cd7ed5a7
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.testutils.providers;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public interface HoodieMetaClientProvider {
+
+ HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException;
+
+ default HoodieTableFileSystemView getHoodieTableFileSystemView(
+ HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, FileStatus[] fileStatuses) {
+ return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
+ }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java
new file mode 100644
index 000000000..4840af0be
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.testutils.providers;
+
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.io.IOException;
+
+public interface HoodieWriteClientProvider {
+
+ HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException;
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
similarity index 97%
rename from hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java
rename to hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
index 948f73696..cdf5ac4ad 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;