1
0

[HUDI-996] Add functional test suite for hudi-utilities (#1746)

- Share resources for functional tests
- Add suite for functional test classes from hudi-utilities
This commit is contained in:
Raymond Xu
2020-07-05 16:44:31 -07:00
committed by GitHub
parent 574dcf920c
commit 3b9a30528b
17 changed files with 622 additions and 399 deletions

View File

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.testutils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
/**
 * Supplies handles to an embedded mini HDFS cluster for functional tests.
 *
 * <p>Implementations (e.g. {@code FunctionalTestHarness}) own the cluster
 * lifecycle; consumers should treat the returned handles as shared state.
 */
public interface DFSProvider {
  /** Running embedded {@link MiniDFSCluster} instance. */
  MiniDFSCluster dfsCluster();
  /** Filesystem client connected to {@link #dfsCluster()}. */
  DistributedFileSystem dfs();
  /** Base path on the cluster under which tests should write their data. */
  Path dfsBasePath();
}

View File

@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.testutils;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
/**
 * Shared harness for functional tests: lazily boots a local {@link SparkSession}
 * and an embedded mini HDFS cluster on first use, and reuses both across all
 * test classes extending this harness to keep the suite fast.
 *
 * <p>Teardown is registered as a JVM shutdown hook so the shared resources
 * survive across test classes within one JVM.
 */
public class FunctionalTestHarness implements SparkProvider, DFSProvider {

  // NOTE: `transient` was dropped from these fields — static fields are never
  // serialized, so the modifier had no effect.
  private static SparkSession spark;
  private static SQLContext sqlContext;
  private static JavaSparkContext jsc;
  private static HdfsTestService hdfsTestService;
  private static MiniDFSCluster dfsCluster;
  private static DistributedFileSystem dfs;

  /**
   * An indicator of the initialization status.
   */
  protected boolean initialized = false;

  @TempDir
  protected java.nio.file.Path tempDir;

  @Override
  public SparkSession spark() {
    return spark;
  }

  @Override
  public SQLContext sqlContext() {
    return sqlContext;
  }

  @Override
  public JavaSparkContext jsc() {
    return jsc;
  }

  @Override
  public MiniDFSCluster dfsCluster() {
    return dfsCluster;
  }

  @Override
  public DistributedFileSystem dfs() {
    return dfs;
  }

  @Override
  public Path dfsBasePath() {
    return dfs.getWorkingDirectory();
  }

  /**
   * Lazily initializes the shared Spark session and HDFS mini-cluster.
   * Synchronized so concurrent test initialization cannot double-start them.
   */
  @BeforeEach
  public synchronized void runBeforeEach() throws Exception {
    initialized = spark != null && hdfsTestService != null;
    if (!initialized) {
      // Drop any cached FileSystem instances left over from prior clusters.
      FileSystem.closeAll();

      spark = SparkSession.builder()
          .config(HoodieWriteClient.registerClasses(conf()))
          .getOrCreate();
      sqlContext = spark.sqlContext();
      jsc = new JavaSparkContext(spark.sparkContext());

      hdfsTestService = new HdfsTestService();
      dfsCluster = hdfsTestService.start(true);
      dfs = dfsCluster.getFileSystem();
      dfs.mkdirs(dfs.getWorkingDirectory());

      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        hdfsTestService.stop();
        hdfsTestService = null;
        // Clear dependent handles too, so nothing dangles on a dead cluster.
        dfsCluster = null;
        dfs = null;
        spark.stop();
        spark = null;
        sqlContext = null;
        jsc = null;
      }));
    }
  }

  /**
   * Removes everything under the cluster's working directory after each test
   * class, so state does not leak between classes sharing the cluster.
   */
  @AfterAll
  public static synchronized void cleanUpAfterAll() throws IOException {
    // Guard: if no test in the class ever ran (all skipped/aborted), the lazy
    // init never happened and these statics are still null.
    if (dfs == null || hdfsTestService == null) {
      return;
    }
    Path workDir = dfs.getWorkingDirectory();
    FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
    FileStatus[] fileStatuses = dfs.listStatus(workDir);
    for (FileStatus f : fileStatuses) {
      fs.delete(f.getPath(), true);
    }
  }
}

View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.testutils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import java.util.Collections;
import java.util.Map;
/**
 * Supplies shared Spark handles to functional tests, plus a default
 * local-mode {@link SparkConf} builder.
 */
public interface SparkProvider {
  /** Shared {@link SparkSession} for the test run. */
  SparkSession spark();
  /** Shared {@link SQLContext} derived from {@link #spark()}. */
  SQLContext sqlContext();
  /** Shared {@link JavaSparkContext} derived from {@link #spark()}. */
  JavaSparkContext jsc();

  /**
   * Builds the default local SparkConf and then applies the given entries on
   * top, so callers can override any default.
   *
   * @param overwritingConfigs config entries that override the defaults; may be empty
   * @return the assembled {@link SparkConf}
   */
  default SparkConf conf(Map<String, String> overwritingConfigs) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.app.name", getClass().getName());
    sparkConf.set("spark.master", "local[*]");
    sparkConf.set("spark.driver.maxResultSize", "2g");
    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
    // Fix: the original first set this key to the bogus value "true" and then
    // immediately overwrote it; the dead duplicate assignment is removed.
    sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    overwritingConfigs.forEach(sparkConf::set);
    return sparkConf;
  }

  /** Default SparkConf with no overrides applied. */
  default SparkConf conf() {
    return conf(Collections.emptyMap());
  }
}

View File

@@ -45,7 +45,7 @@ public class HdfsTestService {
* Configuration settings.
*/
private Configuration hadoopConf;
private String workDir;
private final String workDir;
/**
* Embedded HDFS cluster.
@@ -53,7 +53,7 @@ public class HdfsTestService {
private MiniDFSCluster miniDfsCluster;
public HdfsTestService() throws IOException {
workDir = Files.createTempDirectory("temp").toFile().getAbsolutePath();
workDir = Files.createTempDirectory("temp").toAbsolutePath().toString();
}
public Configuration getHadoopConf() {

View File

@@ -391,5 +391,23 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-suite-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-commons</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
import com.beust.jcommander.ParameterException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
 * Unit tests for {@code HoodieSnapshotExporter.OutputFormatValidator}:
 * known formats pass, while unknown, wrongly-cased, empty, or null formats
 * are rejected with a {@link ParameterException}.
 */
public class TestHoodieSnapshotExporter {

  /** Invokes the validator exactly as jcommander would for the given value. */
  private static void runValidator(String format) {
    new OutputFormatValidator().validate(null, format);
  }

  @ParameterizedTest
  @ValueSource(strings = {"json", "parquet", "hudi"})
  public void testValidateOutputFormatWithValidFormat(String format) {
    assertDoesNotThrow(() -> runValidator(format));
  }

  @ParameterizedTest
  @ValueSource(strings = {"", "JSON"})
  public void testValidateOutputFormatWithInvalidFormat(String format) {
    assertThrows(ParameterException.class, () -> runValidator(format));
  }

  @ParameterizedTest
  @NullSource
  public void testValidateOutputFormatWithNullFormat(String format) {
    assertThrows(ParameterException.class, () -> runValidator(format));
  }
}

View File

@@ -23,33 +23,23 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
private String topicPath = null;
private Configuration hadoopConf = null;
@BeforeEach
public void init() {
// Prepare directories
initPath();
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
}
@Test
public void testValidKafkaConnectPath() throws Exception {
// a standard format(time based partition) of the files managed by kafka connect is:
// topic/year=xxx/month=xxx/day=xxx/topic+partition+lowerOffset+upperOffset.file
topicPath = basePath + "/topic1";
new File(topicPath).mkdirs();
Path topicPath = tempDir.resolve("topic1");
Files.createDirectories(topicPath);
// create regular kafka connect hdfs dirs
new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
@@ -70,16 +60,16 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
new File(topicPath + "/year=2016/month=05/day=02/"
+ "random_snappy_2.parquet").createNewFile();
final TypedProperties props = new TypedProperties();
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath);
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString());
final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
provider.init(hadoopConf);
provider.init(HoodieTestUtils.getDefaultHadoopConf());
assertEquals("topic1,0:300,1:200", provider.getCheckpoint());
}
@Test
public void testMissingPartition() throws Exception {
topicPath = basePath + "/topic2";
new File(topicPath).mkdirs();
Path topicPath = tempDir.resolve("topic2");
Files.createDirectories(topicPath);
// create regular kafka connect hdfs dirs
new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
@@ -91,9 +81,9 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
new File(topicPath + "/year=2016/month=05/day=02/"
+ "topic1+0+201+300.parquet").createNewFile();
final TypedProperties props = new TypedProperties();
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath);
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString());
final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
provider.init(hadoopConf);
provider.init(HoodieTestUtils.getDefaultHadoopConf());
assertThrows(HoodieException.class, provider::getCheckpoint);
}
}

View File

@@ -19,18 +19,15 @@
package org.apache.hudi.utilities.functional;
import org.apache.hudi.payload.AWSDmsAvroPayload;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.utilities.transform.AWSDmsTransformer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -40,23 +37,8 @@ import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestAWSDatabaseMigrationServiceSource {
private static JavaSparkContext jsc;
private static SparkSession spark;
@BeforeAll
public static void setupTest() {
jsc = UtilHelpers.buildSparkContext("aws-dms-test", "local[2]");
spark = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}
@AfterAll
public static void tearDownTest() {
if (jsc != null) {
jsc.stop();
}
}
@Tag("functional")
public class TestAWSDatabaseMigrationServiceSource extends FunctionalTestHarness {
@Test
public void testPayload() throws IOException {
@@ -83,6 +65,7 @@ public class TestAWSDatabaseMigrationServiceSource {
}
static class Record implements Serializable {
String id;
long ts;
@@ -95,11 +78,11 @@ public class TestAWSDatabaseMigrationServiceSource {
@Test
public void testTransformer() {
AWSDmsTransformer transformer = new AWSDmsTransformer();
Dataset<Row> inputFrame = spark.createDataFrame(Arrays.asList(
Dataset<Row> inputFrame = spark().createDataFrame(Arrays.asList(
new Record("1", 3433L),
new Record("2", 3433L)), Record.class);
Dataset<Row> outputFrame = transformer.apply(jsc, spark, inputFrame, null);
Dataset<Row> outputFrame = transformer.apply(jsc(), spark(), inputFrame, null);
assertTrue(Arrays.stream(outputFrame.schema().fields())
.map(f -> f.name()).anyMatch(n -> n.equals(AWSDmsAvroPayload.OP_FIELD)));
assertTrue(outputFrame.select(AWSDmsAvroPayload.OP_FIELD).collectAsList().stream()

View File

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.functional;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.apache.spark.sql.types.DataTypes.createStructField;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("functional")
public class TestChainedTransformer extends FunctionalTestHarness {

  /**
   * Verifies that two chained transformers are applied in declaration order:
   * first rename column "foo" to "bar", then cast "bar" from string to int.
   */
  @Test
  public void testChainedTransformation() {
    StructType sourceSchema = DataTypes.createStructType(
        new StructField[] {
            createStructField("foo", StringType, false)
        });
    List<Row> sourceRows = Arrays.asList(RowFactory.create("100"), RowFactory.create("200"));
    Dataset<Row> source = spark().sqlContext().createDataFrame(sourceRows, sourceSchema);

    Transformer renameFooToBar =
        (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
    Transformer castBarToInt =
        (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
    ChainedTransformer chain = new ChainedTransformer(Arrays.asList(renameFooToBar, castBarToInt));

    Dataset<Row> result = chain.apply(jsc(), spark(), source, null);

    assertEquals(2, result.count());
    assertArrayEquals(new String[] {"bar"}, result.columns());
    List<Row> collected = result.collectAsList();
    assertEquals(100, collected.get(0).getInt(0));
    assertEquals(200, collected.get(1).getInt(0));
  }
}

View File

@@ -18,12 +18,10 @@
package org.apache.hudi.utilities.functional;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.hudi.utilities.HDFSParquetImporter;
@@ -33,19 +31,14 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -64,30 +57,8 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHDFSParquetImporter implements Serializable {
private static String dfsBasePath;
private static HdfsTestService hdfsTestService;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
@BeforeAll
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
// Create a temp folder as the base path
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
}
@AfterAll
public static void cleanupClass() {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
}
@Tag("functional")
public class TestHDFSParquetImporter extends FunctionalTestHarness implements Serializable {
private String basePath;
private transient Path hoodieFolder;
@@ -96,7 +67,7 @@ public class TestHDFSParquetImporter implements Serializable {
@BeforeEach
public void init() throws IOException, ParseException {
basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
basePath = (new Path(dfsBasePath(), Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder.
hoodieFolder = new Path(basePath, "testTarget");
@@ -108,7 +79,7 @@ public class TestHDFSParquetImporter implements Serializable {
@AfterEach
public void clean() throws IOException {
dfs.delete(new Path(basePath), true);
dfs().delete(new Path(basePath), true);
}
/**
@@ -116,57 +87,54 @@ public class TestHDFSParquetImporter implements Serializable {
*/
@Test
public void testImportWithRetries() throws Exception {
try (JavaSparkContext jsc = getJavaSparkContext()) {
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
AtomicInteger retry = new AtomicInteger(3);
AtomicInteger fileCreated = new AtomicInteger(0);
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
@Override
protected int dataImport(JavaSparkContext jsc) throws IOException {
int ret = super.dataImport(jsc);
if (retry.decrementAndGet() == 0) {
fileCreated.incrementAndGet();
createSchemaFile(schemaFile);
}
return ret;
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
AtomicInteger retry = new AtomicInteger(3);
AtomicInteger fileCreated = new AtomicInteger(0);
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
@Override
protected int dataImport(JavaSparkContext jsc) throws IOException {
int ret = super.dataImport(jsc);
if (retry.decrementAndGet() == 0) {
fileCreated.incrementAndGet();
createSchemaFile(schemaFile);
}
};
// Schema file is not created so this operation should fail.
assertEquals(0, dataImporter.dataImport(jsc, retry.get()));
assertEquals(-1, retry.get());
assertEquals(1, fileCreated.get());
// Check if
// 1. .commit file is present
// 2. number of records in each partition == 24
// 3. total number of partitions == 4;
boolean isCommitFilePresent = false;
Map<String, Long> recordCounts = new HashMap<String, Long>();
RemoteIterator<LocatedFileStatus> hoodieFiles = dfs.listFiles(hoodieFolder, true);
while (hoodieFiles.hasNext()) {
LocatedFileStatus f = hoodieFiles.next();
isCommitFilePresent = isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
return ret;
}
};
// Schema file is not created so this operation should fail.
assertEquals(0, dataImporter.dataImport(jsc(), retry.get()));
assertEquals(-1, retry.get());
assertEquals(1, fileCreated.get());
if (f.getPath().toString().endsWith("parquet")) {
SQLContext sc = new SQLContext(jsc);
String partitionPath = f.getPath().getParent().toString();
long count = sc.read().parquet(f.getPath().toString()).count();
if (!recordCounts.containsKey(partitionPath)) {
recordCounts.put(partitionPath, 0L);
}
recordCounts.put(partitionPath, recordCounts.get(partitionPath) + count);
// Check if
// 1. .commit file is present
// 2. number of records in each partition == 24
// 3. total number of partitions == 4;
boolean isCommitFilePresent = false;
Map<String, Long> recordCounts = new HashMap<String, Long>();
RemoteIterator<LocatedFileStatus> hoodieFiles = dfs().listFiles(hoodieFolder, true);
while (hoodieFiles.hasNext()) {
LocatedFileStatus f = hoodieFiles.next();
isCommitFilePresent = isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
if (f.getPath().toString().endsWith("parquet")) {
String partitionPath = f.getPath().getParent().toString();
long count = sqlContext().read().parquet(f.getPath().toString()).count();
if (!recordCounts.containsKey(partitionPath)) {
recordCounts.put(partitionPath, 0L);
}
recordCounts.put(partitionPath, recordCounts.get(partitionPath) + count);
}
assertTrue(isCommitFilePresent, "commit file is missing");
assertEquals(4, recordCounts.size(), "partition is missing");
for (Entry<String, Long> e : recordCounts.entrySet()) {
assertEquals(24, e.getValue().longValue(), "missing records");
}
}
assertTrue(isCommitFilePresent, "commit file is missing");
assertEquals(4, recordCounts.size(), "partition is missing");
for (Entry<String, Long> e : recordCounts.entrySet()) {
assertEquals(24, e.getValue().longValue(), "missing records");
}
}
@@ -187,30 +155,27 @@ public class TestHDFSParquetImporter implements Serializable {
*/
@Test
public void testImportWithInsert() throws IOException, ParseException {
try (JavaSparkContext jsc = getJavaSparkContext()) {
insert(jsc);
SQLContext sqlContext = new SQLContext(jsc);
Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
insert(jsc());
Dataset<Row> ds = HoodieClientTestUtils.read(jsc(), basePath + "/testTarget", sqlContext(), dfs(), basePath + "/testTarget/*/*/*/*");
List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());
List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());
List<HoodieTripModel> expected = insertData.stream().map(g ->
new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Double.parseDouble(g.get("begin_lat").toString()),
Double.parseDouble(g.get("begin_lon").toString()),
Double.parseDouble(g.get("end_lat").toString()),
Double.parseDouble(g.get("end_lon").toString())))
.collect(Collectors.toList());
List<HoodieTripModel> expected = insertData.stream().map(g ->
new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Double.parseDouble(g.get("begin_lat").toString()),
Double.parseDouble(g.get("begin_lon").toString()),
Double.parseDouble(g.get("end_lat").toString()),
Double.parseDouble(g.get("end_lon").toString())))
.collect(Collectors.toList());
assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
}
assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
}
/**
@@ -218,50 +183,47 @@ public class TestHDFSParquetImporter implements Serializable {
*/
@Test
public void testImportWithUpsert() throws IOException, ParseException {
try (JavaSparkContext jsc = getJavaSparkContext()) {
insert(jsc);
insert(jsc());
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
Path upsertFolder = new Path(basePath, "testUpsertSrc");
List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
Path upsertFolder = new Path(basePath, "testUpsertSrc");
List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
cfg.command = "upsert";
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
cfg.command = "upsert";
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
dataImporter.dataImport(jsc, 0);
dataImporter.dataImport(jsc(), 0);
// construct result, remove top 10 and add upsert data.
List<GenericRecord> expectData = insertData.subList(11, 96);
expectData.addAll(upsertData);
// construct result, remove top 10 and add upsert data.
List<GenericRecord> expectData = insertData.subList(11, 96);
expectData.addAll(upsertData);
// read latest data
SQLContext sqlContext = new SQLContext(jsc);
Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
// read latest data
Dataset<Row> ds = HoodieClientTestUtils.read(jsc(), basePath + "/testTarget", sqlContext(), dfs(), basePath + "/testTarget/*/*/*/*");
List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());
List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());
// get expected result.
List<HoodieTripModel> expected = expectData.stream().map(g ->
new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Double.parseDouble(g.get("begin_lat").toString()),
Double.parseDouble(g.get("begin_lon").toString()),
Double.parseDouble(g.get("end_lat").toString()),
Double.parseDouble(g.get("end_lon").toString())))
.collect(Collectors.toList());
// get expected result.
List<HoodieTripModel> expected = expectData.stream().map(g ->
new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Double.parseDouble(g.get("begin_lat").toString()),
Double.parseDouble(g.get("begin_lon").toString()),
Double.parseDouble(g.get("end_lat").toString()),
Double.parseDouble(g.get("end_lon").toString())))
.collect(Collectors.toList());
assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
}
assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
}
public List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
@@ -305,7 +267,7 @@ public class TestHDFSParquetImporter implements Serializable {
}
private void createSchemaFile(String schemaFile) throws IOException {
FSDataOutputStream schemaFileOS = dfs.create(new Path(schemaFile));
FSDataOutputStream schemaFileOS = dfs().create(new Path(schemaFile));
schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
schemaFileOS.close();
}
@@ -315,22 +277,19 @@ public class TestHDFSParquetImporter implements Serializable {
*/
/**
 * Verifies that the importer fails (returns -1) both when the schema file is
 * missing and when it contains invalid schema data.
 */
@Test
public void testSchemaFile() throws Exception {
  // Hoodie root folder
  Path hoodieFolder = new Path(basePath, "testTarget");
  Path srcFolder = new Path(basePath.toString(), "srcTest");
  Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
  HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
      "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile.toString());
  HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
  // Should fail - return : -1. (schema file does not exist yet)
  assertEquals(-1, dataImporter.dataImport(jsc(), 0));
  dfs().create(schemaFile).write("Random invalid schema data".getBytes());
  // Should fail - return : -1. (schema file contains invalid data)
  assertEquals(-1, dataImporter.dataImport(jsc(), 0));
}
/**
@@ -338,27 +297,24 @@ public class TestHDFSParquetImporter implements Serializable {
*/
/**
 * Verifies that the importer fails (returns -1) for an invalid row key
 * and for an invalid partition (timestamp) key.
 */
@Test
public void testRowAndPartitionKey() throws Exception {
  // NOTE(review): the declarations of srcFolder/hoodieFolder (and the source
  // data preparation) are unchanged context lines elided from this diff view —
  // restore them from the full file.
  // Create schema file.
  Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
  createSchemaFile(schemaFile.toString());
  HDFSParquetImporter dataImporter;
  HDFSParquetImporter.Config cfg;
  // Check for invalid row key.
  cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
      "invalidRowKey", "timestamp", 1, schemaFile.toString());
  dataImporter = new HDFSParquetImporter(cfg);
  assertEquals(-1, dataImporter.dataImport(jsc(), 0));
  // Check for invalid partition key.
  cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
      "_row_key", "invalidTimeStamp", 1, schemaFile.toString());
  dataImporter = new HDFSParquetImporter(cfg);
  assertEquals(-1, dataImporter.dataImport(jsc(), 0));
}
public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
@@ -375,17 +331,11 @@ public class TestHDFSParquetImporter implements Serializable {
return cfg;
}
/**
 * Builds a local single-executor JavaSparkContext with Hudi's Kryo classes
 * registered and Hudi read support enabled.
 */
private JavaSparkContext getJavaSparkContext() {
  SparkConf conf = HoodieWriteClient.registerClasses(
      new SparkConf().setAppName("TestConversionCommand").setMaster("local[1]"));
  return new JavaSparkContext(HoodieReadClient.addHoodieSupport(conf));
}
/**
* Class used for compare result and expected.
*/
public static class HoodieTripModel {
double timestamp;
String rowKey;
String rider;

View File

@@ -19,18 +19,16 @@
package org.apache.hudi.utilities.functional;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.hudi.utilities.HoodieSnapshotCopier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -40,29 +38,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness {
@Tag("functional")
public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
private static final String TEST_WRITE_TOKEN = "1-0-1";
private String rootPath = null;
private String basePath = null;
private String outputPath = null;
private FileSystem fs = null;
private JavaSparkContext jsc = null;
private String basePath;
private String outputPath;
private FileSystem fs;
/**
 * Prepares the test directories and initializes a Hudi table at basePath.
 * The Spark context is supplied by FunctionalTestHarness, so no local
 * SparkConf/JavaSparkContext setup is needed here.
 *
 * @throws IOException if the filesystem or table cannot be initialized
 */
@BeforeEach
public void init() throws IOException {
  // Prepare directories (dedup: the diff rendering kept both the old field
  // assignment and the new local `rootPath`; keep the local, the field was removed).
  String rootPath = "file://" + tempDir.toString();
  basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
  outputPath = rootPath + "/output";
  final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
  fs = FSUtils.getFs(basePath, hadoopConf);
  HoodieTestUtils.init(hadoopConf, basePath);
}
@Test
@@ -73,7 +67,7 @@ public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness {
// Do the snapshot
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath, true);
copier.snapshot(jsc(), basePath, outputPath, true);
// Nothing changed; we just bail out
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
@@ -126,7 +120,7 @@ public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness {
// Do a snapshot copy
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
copier.snapshot(jsc, basePath, outputPath, false);
copier.snapshot(jsc(), basePath, outputPath, false);
// Check results
assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
@@ -147,14 +141,4 @@ public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness {
assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
/**
 * Releases per-test resources: removes the temp root directory and stops
 * the local Spark context, each only if it was actually created.
 */
@AfterEach
public void cleanup() {
  // Best-effort delete of the test root dir (return value intentionally ignored).
  if (rootPath != null) {
    File testRoot = new File(rootPath);
    testRoot.delete();
  }
  // Stop the local Spark job if init() started one.
  if (jsc != null) {
    jsc.stop();
  }
}
}

View File

@@ -26,7 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.hudi.utilities.HoodieSnapshotExporter;
import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
@@ -34,7 +34,6 @@ import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import com.beust.jcommander.ParameterException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -49,9 +48,9 @@ import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -59,12 +58,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
@Tag("functional")
public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
static final Logger LOG = LogManager.getLogger(TestHoodieSnapshotExporter.class);
static final int NUM_RECORDS = 100;
@@ -75,39 +74,35 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
String targetPath;
@BeforeEach
public void setUp() throws Exception {
initSparkContexts();
initDFS();
dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
public void init() throws Exception {
// Initialize test data dirs
sourcePath = dfsBasePath + "/source/";
targetPath = dfsBasePath + "/target/";
dfs.mkdirs(new Path(sourcePath));
sourcePath = dfsBasePath() + "/source/";
targetPath = dfsBasePath() + "/target/";
dfs().mkdirs(new Path(sourcePath));
HoodieTableMetaClient
.initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
.initTableType(jsc().hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
HoodieAvroPayload.class.getName());
// Prepare data as source Hudi dataset
HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg);
HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc(), cfg);
hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
hdfsWriteClient.close();
RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(new Path(sourcePath), true);
RemoteIterator<LocatedFileStatus> itr = dfs().listFiles(new Path(sourcePath), true);
while (itr.hasNext()) {
LOG.info(">>> Prepared test file: " + itr.next().getPath());
}
}
@AfterEach
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupDFS();
cleanupTestDataGenerator();
public void cleanUp() throws IOException {
dfs().delete(new Path(sourcePath), true);
dfs().delete(new Path(targetPath), true);
}
private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
@@ -128,7 +123,7 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
private HoodieSnapshotExporter.Config cfg;
@BeforeEach
public void setUp() throws Exception {
public void setUp() {
cfg = new Config();
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
@@ -137,21 +132,21 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
@Test
public void testExportAsHudi() throws IOException {
new HoodieSnapshotExporter().export(jsc, cfg);
new HoodieSnapshotExporter().export(jsc(), cfg);
// Check results
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/hoodie.properties")));
assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit")));
assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested")));
assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight")));
assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/hoodie.properties")));
String partition = targetPath + "/" + PARTITION_PATH;
long numParquetFiles = Arrays.stream(dfs.listStatus(new Path(partition)))
long numParquetFiles = Arrays.stream(dfs().listStatus(new Path(partition)))
.filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet"))
.count();
assertTrue(numParquetFiles >= 1, "There should exist at least 1 parquet file.");
assertEquals(NUM_RECORDS, sqlContext.read().parquet(partition).count());
assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
assertEquals(NUM_RECORDS, sqlContext().read().parquet(partition).count());
assertTrue(dfs().exists(new Path(partition + "/.hoodie_partition_metadata")));
assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
}
}
@@ -161,7 +156,7 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
private HoodieSnapshotExporter.Config cfg;
@BeforeEach
public void setUp() throws Exception {
public void setUp() {
cfg = new Config();
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
@@ -171,11 +166,11 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
/**
 * Verifies that exporting into a pre-existing target path fails with
 * a HoodieSnapshotExporterException.
 */
@Test
public void testExportWhenTargetPathExists() throws IOException {
  // make target output path present
  dfs().mkdirs(new Path(targetPath));
  // export
  final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
    new HoodieSnapshotExporter().export(jsc(), cfg);
  });
  assertEquals("The target output path already exists.", thrown.getMessage());
}
@@ -183,17 +178,17 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
@Test
public void testExportDatasetWithNoCommit() throws IOException {
// delete commit files
List<Path> commitFiles = Arrays.stream(dfs.listStatus(new Path(sourcePath + "/.hoodie")))
List<Path> commitFiles = Arrays.stream(dfs().listStatus(new Path(sourcePath + "/.hoodie")))
.map(FileStatus::getPath)
.filter(filePath -> filePath.getName().endsWith(".commit"))
.collect(Collectors.toList());
for (Path p : commitFiles) {
dfs.delete(p, false);
dfs().delete(p, false);
}
// export
final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
new HoodieSnapshotExporter().export(jsc, cfg);
new HoodieSnapshotExporter().export(jsc(), cfg);
});
assertEquals("No commits present. Nothing to snapshot.", thrown.getMessage());
}
@@ -201,11 +196,11 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
/**
 * Verifies that exporting a dataset with no partitions fails with
 * a HoodieSnapshotExporterException.
 */
@Test
public void testExportDatasetWithNoPartition() throws IOException {
  // delete all source data
  dfs().delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
  // export
  final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
    new HoodieSnapshotExporter().export(jsc(), cfg);
  });
  assertEquals("The source dataset has 0 partition to snapshot.", thrown.getMessage());
}
@@ -221,9 +216,9 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
cfg.outputFormat = format;
new HoodieSnapshotExporter().export(jsc, cfg);
assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count());
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
new HoodieSnapshotExporter().export(jsc(), cfg);
assertEquals(NUM_RECORDS, sqlContext().read().format(format).load(targetPath).count());
assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
}
}
@@ -247,7 +242,7 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
private HoodieSnapshotExporter.Config cfg;
@BeforeEach
public void setUp() throws Exception {
public void setUp() {
cfg = new Config();
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
@@ -258,49 +253,21 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
/**
 * Exports with an explicit output partition field and verifies the target
 * contains more than one partition directory.
 * NOTE(review): the @Test annotation is an unchanged context line elided
 * from this diff view — restore it from the full file.
 */
public void testExportWithPartitionField() throws IOException {
  // `driver` field is set in HoodieTestDataGenerator
  cfg.outputPartitionField = "driver";
  new HoodieSnapshotExporter().export(jsc(), cfg);
  assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
  assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
  assertTrue(dfs().listStatus(new Path(targetPath)).length > 1);
}
/**
 * Exports using a user-defined partitioner and verifies the partitioned
 * output layout ("<partitionName>=<value>") plus the _SUCCESS marker.
 */
@Test
public void testExportForUserDefinedPartitioner() throws IOException {
  cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
  // Dedup: old `jsc`/`sqlContext`/`dfs` field-based lines merged with the
  // harness-accessor versions; keep the accessor form.
  new HoodieSnapshotExporter().export(jsc(), cfg);
  assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
  assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
  assertTrue(dfs().exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH))));
}
}
@Nested
public class TestHoodieSnapshotExporterInputValidation {
/**
 * Each supported output format must pass validation without throwing.
 */
@ParameterizedTest
@ValueSource(strings = {"json", "parquet", "hudi"})
public void testValidateOutputFormat_withValidFormat(String format) {
  assertDoesNotThrow(() -> new OutputFormatValidator().validate(null, format));
}
/**
 * Unsupported formats (empty string, wrong case) must be rejected
 * with a ParameterException.
 */
@ParameterizedTest
@ValueSource(strings = {"", "JSON"})
public void testValidateOutputFormat_withInvalidFormat(String format) {
  assertThrows(ParameterException.class, () -> new OutputFormatValidator().validate(null, format));
}
@ParameterizedTest
@NullSource
public void testValidateOutputFormat_withNullFormat(String format) {
assertThrows(ParameterException.class, () -> {
new OutputFormatValidator().validate(null, format);
});
assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
assertTrue(dfs().exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH))));
}
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.functional;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -27,9 +28,8 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -40,15 +40,14 @@ import java.sql.SQLException;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestJdbcbasedSchemaProvider {
@Tag("functional")
public class TestJdbcbasedSchemaProvider extends FunctionalTestHarness {
private static final Logger LOG = LogManager.getLogger(TestJdbcbasedSchemaProvider.class);
private static final TypedProperties PROPS = new TypedProperties();
protected transient JavaSparkContext jsc = null;
@BeforeEach
public void init() {
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
@BeforeAll
public static void init() {
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url", "jdbc:h2:mem:test_mem");
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type", "org.h2.Driver");
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username", "sa");
@@ -58,18 +57,11 @@ public class TestJdbcbasedSchemaProvider {
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable", "false");
}
/**
 * Stops the per-test Spark context, if init() managed to create one.
 */
@AfterEach
public void teardown() throws Exception {
  JavaSparkContext ctx = jsc;
  if (ctx != null) {
    ctx.stop();
  }
}
@Test
public void testJdbcbasedSchemaProvider() throws Exception {
try {
initH2Database();
Schema sourceSchema = UtilHelpers.createSchemaProvider(JdbcbasedSchemaProvider.class.getName(), PROPS, jsc).getSourceSchema();
Schema sourceSchema = UtilHelpers.createSchemaProvider(JdbcbasedSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
assertEquals(sourceSchema.toString().toUpperCase(), new Schema.Parser().parse(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/source-jdbc.avsc")).toString().toUpperCase());
} catch (HoodieException e) {
LOG.error("Failed to get connection through jdbc. ", e);

View File

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.functional;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.platform.suite.api.IncludeTags;
import org.junit.platform.suite.api.SelectPackages;
import org.junit.runner.RunWith;
/**
 * JUnit Platform suite that discovers and runs every test class in
 * {@code org.apache.hudi.utilities.functional} tagged {@code @Tag("functional")}.
 * The class body is intentionally empty: the annotations drive the selection.
 */
@RunWith(JUnitPlatform.class)
@SelectPackages("org.apache.hudi.utilities.functional")
@IncludeTags("functional")
public class UtilitiesFunctionalTestSuite {
}

View File

@@ -19,67 +19,15 @@
package org.apache.hudi.utilities.transform;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.apache.spark.sql.types.DataTypes.createStructField;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestChainedTransformer {
private JavaSparkContext jsc;
private SparkSession sparkSession;
/**
 * Spins up a local Spark context and the SparkSession bound to it
 * for each test.
 */
@BeforeEach
public void setUp() {
  String appName = this.getClass().getName() + "-hoodie";
  jsc = UtilHelpers.buildSparkContext(appName, "local[2]");
  sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}
@AfterEach
public void tearDown() {
  // Release the local Spark context created in setUp().
  jsc.stop();
}
/**
 * Chains two transformers (rename "foo" to "bar", then cast "bar" to int)
 * and verifies the schema and values of the resulting dataset.
 */
@Test
public void testChainedTransformation() {
  StructType schema = DataTypes.createStructType(
      new StructField[] {
          createStructField("foo", StringType, false)
      });
  List<Row> input = Arrays.asList(RowFactory.create("100"), RowFactory.create("200"));
  Dataset<Row> original = sparkSession.sqlContext().createDataFrame(input, schema);
  Transformer renameFoo =
      (ctx, session, dataset, props) -> dataset.withColumnRenamed("foo", "bar");
  Transformer castBarToInt =
      (ctx, session, dataset, props) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
  ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(renameFoo, castBarToInt));
  Dataset<Row> transformed = transformer.apply(jsc, sparkSession, original, null);
  assertEquals(2, transformed.count());
  assertArrayEquals(new String[] {"bar"}, transformed.columns());
  List<Row> rows = transformed.collectAsList();
  assertEquals(100, rows.get(0).getInt(0));
  assertEquals(200, rows.get(1).getInt(0));
}
@Test
public void testGetTransformersNames() {
Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");

26
pom.xml
View File

@@ -84,8 +84,9 @@
<kafka.version>2.0.0</kafka.version>
<glassfish.version>2.17</glassfish.version>
<parquet.version>1.10.1</parquet.version>
<junit.jupiter.version>5.6.1</junit.jupiter.version>
<junit.vintage.version>5.6.1</junit.vintage.version>
<junit.jupiter.version>5.7.0-M1</junit.jupiter.version>
<junit.vintage.version>5.7.0-M1</junit.vintage.version>
<junit.platform.version>1.7.0-M1</junit.platform.version>
<mockito.jupiter.version>3.3.3</mockito.jupiter.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.5</slf4j.version>
@@ -833,6 +834,27 @@
<version>${mockito.jupiter.version}</version>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<version>${junit.platform.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-suite-api</artifactId>
<version>${junit.platform.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-commons</artifactId>
<version>${junit.platform.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<!--Used to test execution in task executor after de-serializing-->
<groupId>com.esotericsoftware</groupId>

View File

@@ -267,7 +267,7 @@
<property name="regexp" value="true"/>
<property name="illegalPkgs" value="org\.apache\.commons, com\.google\.common"/>
<property name="illegalClasses"
value="^java\.util\.Optional, ^org\.junit\.(?!jupiter|platform|contrib|Rule)(.*)"/>
value="^java\.util\.Optional, ^org\.junit\.(?!jupiter|platform|contrib|Rule|runner)(.*)"/>
</module>
<module name="EmptyStatement" />