From b399b4ad43b8caa69f129e756e51ad4bc8c81de2 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Tue, 14 Jul 2020 17:28:50 -0700 Subject: [PATCH] [HUDI-996] Add functional test in hudi-client (#1824) - Add functional test suite in hudi-client - Tag TestHBaseIndex as functional --- hudi-client/pom.xml | 18 +++++ .../hudi/ClientFunctionalTestSuite.java | 32 ++++++++ .../hudi/index/hbase/TestHBaseIndex.java | 80 +++++++++---------- .../hudi/testutils/FunctionalTestHarness.java | 50 ++++++++++-- .../{ => providers}/DFSProvider.java | 2 +- .../providers/HoodieMetaClientProvider.java | 40 ++++++++++ .../providers/HoodieWriteClientProvider.java | 30 +++++++ .../{ => providers}/SparkProvider.java | 2 +- 8 files changed, 203 insertions(+), 51 deletions(-) create mode 100644 hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java rename hudi-client/src/test/java/org/apache/hudi/testutils/{ => providers}/DFSProvider.java (95%) create mode 100644 hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java create mode 100644 hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java rename hudi-client/src/test/java/org/apache/hudi/testutils/{ => providers}/SparkProvider.java (97%) diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index 326cf83ff..a390923bb 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -268,5 +268,23 @@ mockito-junit-jupiter test + + + org.junit.platform + junit-platform-runner + test + + + + org.junit.platform + junit-platform-suite-api + test + + + + org.junit.platform + junit-platform-commons + test + diff --git a/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java b/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java new file mode 100644 index 000000000..4e62618c8 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi; + +import org.junit.platform.runner.JUnitPlatform; +import org.junit.platform.suite.api.IncludeTags; +import org.junit.platform.suite.api.SelectPackages; +import org.junit.runner.RunWith; + +@RunWith(JUnitPlatform.class) +@SelectPackages("org.apache.hudi.index") +@IncludeTags("functional") +public class ClientFunctionalTestSuite { + +} diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java index 6b6f11f79..b3d2f5a5d 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java @@ -31,7 +31,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.hudi.testutils.FunctionalTestHarness; import org.apache.hudi.testutils.HoodieTestDataGenerator; import org.apache.hadoop.conf.Configuration; @@ -47,10 +47,10 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; @@ -76,12 +76,17 @@ import static org.mockito.Mockito.when; * {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please alter the order of tests running carefully. */ @TestMethodOrder(MethodOrderer.Alphanumeric.class) -public class TestHBaseIndex extends HoodieClientTestHarness { +@Tag("functional") +public class TestHBaseIndex extends FunctionalTestHarness { private static final String TABLE_NAME = "test_table"; private static HBaseTestingUtility utility; private static Configuration hbaseConfig; + private Configuration hadoopConf; + private HoodieTestDataGenerator dataGen; + private HoodieTableMetaClient metaClient; + @AfterAll public static void clean() throws Exception { if (utility != null) { @@ -104,21 +109,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness { @BeforeEach public void setUp() throws Exception { - // Initialize a local spark env - initSparkContexts("TestHBaseIndex"); + hadoopConf = jsc().hadoopConfiguration(); hadoopConf.addResource(utility.getConfiguration()); - - // Create a temp folder as the base path - initPath(); - initTestDataGenerator(); - initMetaClient(); - } - - @AfterEach - public void tearDown() throws Exception { - cleanupSparkContexts(); - cleanupTestDataGenerator(); - cleanupClients(); + metaClient = getHoodieMetaClient(hadoopConf, basePath()); + dataGen = new HoodieTestDataGenerator(); } @Test @@ -126,7 +120,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { final String newCommitTime = "001"; final int numRecords = 10; List records = dataGen.generateInserts(newCommitTime, numRecords); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); // Load to memory HoodieWriteConfig config = getConfig(); @@ -136,7 +130,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Test tagLocation without any entries in index - JavaRDD records1 = index.tagLocation(writeRecords, jsc, hoodieTable); + JavaRDD records1 = index.tagLocation(writeRecords, jsc(), hoodieTable); assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count()); // Insert 200 records @@ -146,7 +140,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed // commit - JavaRDD records2 = index.tagLocation(writeRecords, jsc, hoodieTable); + JavaRDD records2 = index.tagLocation(writeRecords, jsc(), hoodieTable); assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count()); // Now commit this & update location of records inserted and validate no errors @@ -154,7 +148,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - List records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + List records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect(); assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count()); assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null @@ -167,7 +161,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { final String newCommitTime = "001"; final int numRecords = 10; List records = dataGen.generateInserts(newCommitTime, numRecords); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); // Load to memory HoodieWriteConfig config = getConfig(); @@ -178,7 +172,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); - index.tagLocation(writeRecords, jsc, hoodieTable); + index.tagLocation(writeRecords, jsc(), hoodieTable); // Duplicate upsert and ensure correctness is maintained // We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not @@ -194,7 +188,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - List taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + List taggedRecords = index.tagLocation(writeRecords, jsc(), hoodieTable).collect(); assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count()); assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null @@ -211,7 +205,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { final String newCommitTime = writeClient.startCommit(); final int numRecords = 10; List records = dataGen.generateInserts(newCommitTime, numRecords); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); metaClient = HoodieTableMetaClient.reload(metaClient); // Insert 200 records @@ -222,7 +216,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { writeClient.commit(newCommitTime, writeStatues); HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Now tagLocation for these records, hbaseIndex should tag them - List records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + List records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect(); assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count()); // check tagged records are tagged with correct fileIds @@ -238,7 +232,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled // back commit - List records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + List records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect(); assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count()); assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count()); } @@ -262,7 +256,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // start a commit and generate test data String newCommitTime = writeClient.startCommit(); List records = dataGen.generateInserts(newCommitTime, 250); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); @@ -271,7 +265,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { assertNoWriteErrors(writeStatues.collect()); // Now tagLocation for these records, hbaseIndex should tag them - index.tagLocation(writeRecords, jsc, hoodieTable); + index.tagLocation(writeRecords, jsc(), hoodieTable); // 3 batches should be executed given batchSize = 100 and parallelism = 1 verify(table, times(3)).get((List) any()); @@ -287,7 +281,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // start a commit and generate test data String newCommitTime = writeClient.startCommit(); List records = dataGen.generateInserts(newCommitTime, 250); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); @@ -309,7 +303,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Get all the files generated int numberOfDataFileIds = (int) writeStatues.map(status -> status.getFileId()).distinct().count(); - index.updateLocation(writeStatues, jsc, hoodieTable); + index.updateLocation(writeStatues, jsc(), hoodieTable); // 3 batches should be executed given batchSize = 100 and <=numberOfDataFileIds getting updated, // so each fileId ideally gets updates verify(table, atMost(numberOfDataFileIds)).put((List) any()); @@ -319,7 +313,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { public void testsHBasePutAccessParallelism() { HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); - final JavaRDD writeStatusRDD = jsc.parallelize( + final JavaRDD writeStatusRDD = jsc().parallelize( Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10); final Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD); final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString()); @@ -334,7 +328,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); final JavaRDD writeStatusRDD = - jsc.parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10); + jsc().parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10); final Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD); final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString()); final int hbaseNumPuts = Integer.parseInt(tuple._1.toString()); @@ -348,7 +342,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { final String newCommitTime = "001"; final int numRecords = 10; List records = dataGen.generateInserts(newCommitTime, numRecords); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); // Load to memory HoodieWriteConfig config = getConfig(2); @@ -358,7 +352,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Test tagLocation without any entries in index - JavaRDD records1 = index.tagLocation(writeRecords, jsc, hoodieTable); + JavaRDD records1 = index.tagLocation(writeRecords, jsc(), hoodieTable); assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count()); // Insert 200 records writeClient.startCommitWithTime(newCommitTime); @@ -367,7 +361,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed // commit - JavaRDD records2 = index.tagLocation(writeRecords, jsc, hoodieTable); + JavaRDD records2 = index.tagLocation(writeRecords, jsc(), hoodieTable); assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count()); // Now commit this & update location of records inserted and validate no errors @@ -375,7 +369,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - List records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + List records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect(); assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count()); assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null @@ -388,7 +382,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { final String newCommitTime = "001"; final int numRecords = 10; List records = dataGen.generateInserts(newCommitTime, numRecords); - JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD writeRecords = jsc().parallelize(records, 1); // Load to memory HoodieWriteConfig config = getConfig(); @@ -398,7 +392,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Test tagLocation without any entries in index - JavaRDD records1 = index.tagLocation(writeRecords, jsc, hoodieTable); + JavaRDD records1 = index.tagLocation(writeRecords, jsc(), hoodieTable); assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count()); // Insert records @@ -410,7 +404,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - List records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + List records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect(); assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count()); assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null @@ -425,12 +419,12 @@ public class TestHBaseIndex extends HoodieClientTestHarness { newWriteStatus.setStat(new HoodieWriteStat()); return newWriteStatus; }); - JavaRDD deleteStatus = index.updateLocation(deleteWriteStatues, jsc, hoodieTable); + JavaRDD deleteStatus = index.updateLocation(deleteWriteStatues, jsc(), hoodieTable); assertEquals(deleteStatus.count(), deleteWriteStatues.count()); assertNoWriteErrors(deleteStatus.collect()); // Ensure no records can be tagged - List records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + List records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect(); assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count()); assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null @@ -456,7 +450,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { } private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(1, 1) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) .withInlineCompaction(false).build()) diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java index 562563ee5..27db072c7 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java @@ -19,14 +19,26 @@ package org.apache.hudi.testutils; +import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.testutils.providers.DFSProvider; +import org.apache.hudi.testutils.providers.HoodieMetaClientProvider; +import org.apache.hudi.testutils.providers.HoodieWriteClientProvider; +import org.apache.hudi.testutils.providers.SparkProvider; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; @@ -35,8 +47,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.util.Properties; -public class FunctionalTestHarness implements SparkProvider, DFSProvider { +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; +import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; +import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME; + +public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider { private static transient SparkSession spark; private static transient SQLContext sqlContext; @@ -53,6 +70,10 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider { @TempDir protected java.nio.file.Path tempDir; + public String basePath() { + return tempDir.toAbsolutePath().toString(); + } + @Override public SparkSession spark() { return spark; @@ -83,15 +104,32 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider { return dfs.getWorkingDirectory(); } + public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException { + return getHoodieMetaClient(hadoopConf, basePath, new Properties()); + } + + @Override + public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException { + props.putIfAbsent(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, PARQUET.toString()); + props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME); + props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, COPY_ON_WRITE.name()); + props.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName()); + return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props); + } + + @Override + public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException { + return new HoodieWriteClient(jsc, cfg, false, HoodieIndex.createIndex(cfg)); + } + @BeforeEach public synchronized void runBeforeEach() throws Exception { initialized = spark != null && hdfsTestService != null; if (!initialized) { - FileSystem.closeAll(); - - spark = SparkSession.builder() - .config(HoodieWriteClient.registerClasses(conf())) - .getOrCreate(); + SparkConf sparkConf = conf(); + HoodieWriteClient.registerClasses(sparkConf); + HoodieReadClient.addHoodieSupport(sparkConf); + spark = SparkSession.builder().config(sparkConf).getOrCreate(); sqlContext = spark.sqlContext(); jsc = new JavaSparkContext(spark.sparkContext()); diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java similarity index 95% rename from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java rename to hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java index 16cc47152..62b48cbf7 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.hudi.testutils; +package org.apache.hudi.testutils.providers; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java new file mode 100644 index 000000000..0cd7ed5a7 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils.providers; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; + +import java.io.IOException; +import java.util.Properties; + +public interface HoodieMetaClientProvider { + + HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException; + + default HoodieTableFileSystemView getHoodieTableFileSystemView( + HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, FileStatus[] fileStatuses) { + return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java new file mode 100644 index 000000000..4840af0be --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils.providers; + +import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.io.IOException; + +public interface HoodieWriteClientProvider { + + HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException; +} diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java similarity index 97% rename from hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java rename to hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java index 948f73696..cdf5ac4ad 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.hudi.testutils; +package org.apache.hudi.testutils.providers; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext;