
[HUDI-996] Add functional test in hudi-client (#1824)

- Add functional test suite in hudi-client
- Tag TestHBaseIndex as functional
Raymond Xu authored on 2020-07-14 17:28:50 -07:00, committed by GitHub
parent f5dc8ca733
commit b399b4ad43
8 changed files with 203 additions and 51 deletions
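How the pieces below fit together: the new ClientFunctionalTestSuite uses the JUnit 4 JUnitPlatform runner to discover JUnit 5 tests under org.apache.hudi.index that carry the "functional" tag, and TestHBaseIndex is retagged and rebased onto the new FunctionalTestHarness so that it qualifies. As a sketch of the kind of test the suite picks up — the class and method names here are hypothetical, not part of this commit:

    package org.apache.hudi.index;

    import org.apache.hudi.testutils.FunctionalTestHarness;
    import org.junit.jupiter.api.Tag;
    import org.junit.jupiter.api.Test;

    import static org.junit.jupiter.api.Assertions.assertNotNull;

    // Hypothetical test: it lives under org.apache.hudi.index and carries the
    // "functional" tag, so it matches ClientFunctionalTestSuite's
    // @SelectPackages("org.apache.hudi.index") and @IncludeTags("functional").
    @Tag("functional")
    public class TestSomeIndexFeature extends FunctionalTestHarness {

      @Test
      public void sharedSparkContextIsAvailable() {
        // jsc() and basePath() are provided by the harness introduced in this commit.
        assertNotNull(jsc());
        assertNotNull(basePath());
      }
    }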

hudi-client/pom.xml

@@ -268,5 +268,23 @@
       <artifactId>mockito-junit-jupiter</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-runner</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-suite-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-commons</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

ClientFunctionalTestSuite.java (new file)

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi;

import org.junit.platform.runner.JUnitPlatform;
import org.junit.platform.suite.api.IncludeTags;
import org.junit.platform.suite.api.SelectPackages;
import org.junit.runner.RunWith;

@RunWith(JUnitPlatform.class)
@SelectPackages("org.apache.hudi.index")
@IncludeTags("functional")
public class ClientFunctionalTestSuite {
}
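A note on the three pom dependencies this suite needs: @RunWith(JUnitPlatform.class) comes from junit-platform-runner, a JUnit 4 Runner that executes JUnit Platform (here, Jupiter) tests; @SelectPackages and @IncludeTags come from junit-platform-suite-api; junit-platform-commons carries the shared platform utilities. At the time of this commit, Jupiter had no native suite support, so this runner-based bridge was the usual way to build a tag-filtered suite. Running it through Surefire with something like `mvn test -Dtest=ClientFunctionalTestSuite` should work, assuming the module's JUnit 4 provider is in place.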

TestHBaseIndex.java

@@ -31,7 +31,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.FunctionalTestHarness;
 import org.apache.hudi.testutils.HoodieTestDataGenerator;
 import org.apache.hadoop.conf.Configuration;

@@ -47,10 +47,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;

@@ -76,12 +76,17 @@ import static org.mockito.Mockito.when;
  * {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please alter the order of tests running carefully.
  */
 @TestMethodOrder(MethodOrderer.Alphanumeric.class)
-public class TestHBaseIndex extends HoodieClientTestHarness {
+@Tag("functional")
+public class TestHBaseIndex extends FunctionalTestHarness {

   private static final String TABLE_NAME = "test_table";

   private static HBaseTestingUtility utility;
   private static Configuration hbaseConfig;

+  private Configuration hadoopConf;
+  private HoodieTestDataGenerator dataGen;
+  private HoodieTableMetaClient metaClient;
+
   @AfterAll
   public static void clean() throws Exception {
     if (utility != null) {

@@ -104,21 +109,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
   @BeforeEach
   public void setUp() throws Exception {
-    // Initialize a local spark env
-    initSparkContexts("TestHBaseIndex");
+    hadoopConf = jsc().hadoopConfiguration();
     hadoopConf.addResource(utility.getConfiguration());
-
-    // Create a temp folder as the base path
-    initPath();
-    initTestDataGenerator();
-    initMetaClient();
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupClients();
+    metaClient = getHoodieMetaClient(hadoopConf, basePath());
+    dataGen = new HoodieTestDataGenerator();
   }

   @Test
@@ -126,7 +120,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = "001";
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);

     // Load to memory
     HoodieWriteConfig config = getConfig();
@@ -136,7 +130,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);

     // Test tagLocation without any entries in index
-    JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+    JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
     assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());

     // Insert 200 records
@@ -146,7 +140,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
     // commit
-    JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+    JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
     assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());

     // Now commit this & update location of records inserted and validate no errors
@@ -154,7 +148,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
     assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
     assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -167,7 +161,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = "001";
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);

     // Load to memory
     HoodieWriteConfig config = getConfig();
@@ -178,7 +172,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);

     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
-    index.tagLocation(writeRecords, jsc, hoodieTable);
+    index.tagLocation(writeRecords, jsc(), hoodieTable);

     // Duplicate upsert and ensure correctness is maintained
     // We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not
@@ -194,7 +188,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-    List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
     assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
     assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null
@@ -211,7 +205,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = writeClient.startCommit();
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);

     // Insert 200 records
@@ -222,7 +216,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     writeClient.commit(newCommitTime, writeStatues);
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should tag them
-    List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count());

     // check tagged records are tagged with correct fileIds
@@ -238,7 +232,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
     // back commit
-    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
     assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
   }
@@ -262,7 +256,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
@@ -271,7 +265,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     assertNoWriteErrors(writeStatues.collect());

     // Now tagLocation for these records, hbaseIndex should tag them
-    index.tagLocation(writeRecords, jsc, hoodieTable);
+    index.tagLocation(writeRecords, jsc(), hoodieTable);

     // 3 batches should be executed given batchSize = 100 and parallelism = 1
     verify(table, times(3)).get((List<Get>) any());
@@ -287,7 +281,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
@@ -309,7 +303,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Get all the files generated
     int numberOfDataFileIds = (int) writeStatues.map(status -> status.getFileId()).distinct().count();

-    index.updateLocation(writeStatues, jsc, hoodieTable);
+    index.updateLocation(writeStatues, jsc(), hoodieTable);
     // 3 batches should be executed given batchSize = 100 and <=numberOfDataFileIds getting updated,
     // so each fileId ideally gets updates
     verify(table, atMost(numberOfDataFileIds)).put((List<Put>) any());
@@ -319,7 +313,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
   public void testsHBasePutAccessParallelism() {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
+    final JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(
         Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10);
     final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
     final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
@@ -334,7 +328,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
     final JavaRDD<WriteStatus> writeStatusRDD =
-        jsc.parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);
+        jsc().parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);
     final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
     final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
     final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
@@ -348,7 +342,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = "001";
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);

     // Load to memory
     HoodieWriteConfig config = getConfig(2);
@@ -358,7 +352,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);

     // Test tagLocation without any entries in index
-    JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+    JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
     assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
     // Insert 200 records
     writeClient.startCommitWithTime(newCommitTime);
@@ -367,7 +361,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
     // commit
-    JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+    JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
     assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());

     // Now commit this & update location of records inserted and validate no errors
@@ -375,7 +369,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
     assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
     assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -388,7 +382,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = "001";
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);

     // Load to memory
     HoodieWriteConfig config = getConfig();
@@ -398,7 +392,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);

     // Test tagLocation without any entries in index
-    JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+    JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
     assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());

     // Insert records
@@ -410,7 +404,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-    List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count());
     assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
     assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null
@@ -425,12 +419,12 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       newWriteStatus.setStat(new HoodieWriteStat());
       return newWriteStatus;
     });
-    JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc, hoodieTable);
+    JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc(), hoodieTable);
     assertEquals(deleteStatus.count(), deleteWriteStatues.count());
     assertNoWriteErrors(deleteStatus.collect());

     // Ensure no records can be tagged
-    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
     assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
     assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -456,7 +450,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
   }

   private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+    return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(1, 1)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
             .withInlineCompaction(false).build())

FunctionalTestHarness.java

@@ -19,14 +19,26 @@
 package org.apache.hudi.testutils;

+import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.providers.DFSProvider;
+import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
+import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
+import org.apache.hudi.testutils.providers.SparkProvider;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;

@@ -35,8 +47,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;

 import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;

-public class FunctionalTestHarness implements SparkProvider, DFSProvider {
+public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {

   private static transient SparkSession spark;
   private static transient SQLContext sqlContext;

@@ -53,6 +70,10 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider {
   @TempDir
   protected java.nio.file.Path tempDir;

+  public String basePath() {
+    return tempDir.toAbsolutePath().toString();
+  }
+
   @Override
   public SparkSession spark() {
     return spark;

@@ -83,15 +104,32 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider {
     return dfs.getWorkingDirectory();
   }

+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
+    return getHoodieMetaClient(hadoopConf, basePath, new Properties());
+  }
+
+  @Override
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
+    props.putIfAbsent(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, PARQUET.toString());
+    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
+    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, COPY_ON_WRITE.name());
+    props.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+  }
+
+  @Override
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
+    return new HoodieWriteClient(jsc, cfg, false, HoodieIndex.createIndex(cfg));
+  }
+
   @BeforeEach
   public synchronized void runBeforeEach() throws Exception {
     initialized = spark != null && hdfsTestService != null;
     if (!initialized) {
-      FileSystem.closeAll();
-      spark = SparkSession.builder()
-          .config(HoodieWriteClient.registerClasses(conf()))
-          .getOrCreate();
+      SparkConf sparkConf = conf();
+      HoodieWriteClient.registerClasses(sparkConf);
+      HoodieReadClient.addHoodieSupport(sparkConf);
+      spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();
       jsc = new JavaSparkContext(spark.sparkContext());
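One consequence of the putIfAbsent calls in getHoodieMetaClient: the defaults (PARQUET base files, COPY_ON_WRITE table type, HoodieAvroPayload) apply only when the caller has not already set those properties, so a test can override any of them. A minimal sketch, assuming it runs inside a test extending FunctionalTestHarness:

    // Hypothetical snippet from a test extending FunctionalTestHarness.
    // Caller-supplied properties win over the harness defaults because the
    // harness uses putIfAbsent rather than put.
    Properties props = new Properties();
    props.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME,
        HoodieTableType.MERGE_ON_READ.name());
    HoodieTableMetaClient metaClient =
        getHoodieMetaClient(jsc().hadoopConfiguration(), basePath(), props);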

DFSProvider.java (moved to org.apache.hudi.testutils.providers)

@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;

 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;

HoodieMetaClientProvider.java (new file)

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.testutils.providers;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;

import java.io.IOException;
import java.util.Properties;

public interface HoodieMetaClientProvider {

  HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException;

  default HoodieTableFileSystemView getHoodieTableFileSystemView(
      HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, FileStatus[] fileStatuses) {
    return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
  }
}

HoodieWriteClientProvider.java (new file)

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.testutils.providers;

import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.config.HoodieWriteConfig;

import java.io.IOException;

public interface HoodieWriteClientProvider {

  HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException;
}
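FunctionalTestHarness implements this interface by wiring in the shared JavaSparkContext and an index created from the write config (see getHoodieWriteClient above), so a test gets a fully wired client in one call. A minimal sketch, assuming a getConfig() helper like the one TestHBaseIndex defines:

    // Hypothetical snippet from a test extending FunctionalTestHarness;
    // getConfig() is assumed to build a HoodieWriteConfig, as in TestHBaseIndex.
    HoodieWriteConfig config = getConfig();
    HoodieWriteClient writeClient = getHoodieWriteClient(config);
    String newCommitTime = writeClient.startCommit();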

SparkProvider.java (moved to org.apache.hudi.testutils.providers)

@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;

 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;