diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java new file mode 100644 index 000000000..16cc47152 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; + +public interface DFSProvider { + + MiniDFSCluster dfsCluster(); + + DistributedFileSystem dfs(); + + Path dfsBasePath(); + +} diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java new file mode 100644 index 000000000..562563ee5 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils; + +import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.common.testutils.minicluster.HdfsTestService; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; + +public class FunctionalTestHarness implements SparkProvider, DFSProvider { + + private static transient SparkSession spark; + private static transient SQLContext sqlContext; + private static transient JavaSparkContext jsc; + + private static transient HdfsTestService hdfsTestService; + private static transient MiniDFSCluster dfsCluster; + private static transient DistributedFileSystem dfs; + + /** + * An indicator of the initialization status. 
+ */ + protected boolean initialized = false; + @TempDir + protected java.nio.file.Path tempDir; + + @Override + public SparkSession spark() { + return spark; + } + + @Override + public SQLContext sqlContext() { + return sqlContext; + } + + @Override + public JavaSparkContext jsc() { + return jsc; + } + + @Override + public MiniDFSCluster dfsCluster() { + return dfsCluster; + } + + @Override + public DistributedFileSystem dfs() { + return dfs; + } + + @Override + public Path dfsBasePath() { + return dfs.getWorkingDirectory(); + } + + @BeforeEach + public synchronized void runBeforeEach() throws Exception { + initialized = spark != null && hdfsTestService != null; + if (!initialized) { + FileSystem.closeAll(); + + spark = SparkSession.builder() + .config(HoodieWriteClient.registerClasses(conf())) + .getOrCreate(); + sqlContext = spark.sqlContext(); + jsc = new JavaSparkContext(spark.sparkContext()); + + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + dfs = dfsCluster.getFileSystem(); + dfs.mkdirs(dfs.getWorkingDirectory()); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + hdfsTestService.stop(); + hdfsTestService = null; + + spark.stop(); + spark = null; + })); + } + } + + @AfterAll + public static synchronized void cleanUpAfterAll() throws IOException { + Path workDir = dfs.getWorkingDirectory(); + FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); + FileStatus[] fileStatuses = dfs.listStatus(workDir); + for (FileStatus f : fileStatuses) { + fs.delete(f.getPath(), true); + } + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java new file mode 100644 index 000000000..948f73696 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; + +import java.util.Collections; +import java.util.Map; + +public interface SparkProvider { + + SparkSession spark(); + + SQLContext sqlContext(); + + JavaSparkContext jsc(); + + default SparkConf conf(Map<String, String> overwritingConfigs) { + SparkConf sparkConf = new SparkConf(); + sparkConf.set("spark.app.name", getClass().getName()); + sparkConf.set("spark.master", "local[*]"); + sparkConf.set("spark.driver.maxResultSize", "2g"); + sparkConf.set("spark.hadoop.mapred.output.compress", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); + sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + overwritingConfigs.forEach(sparkConf::set); + return sparkConf; + } + + default SparkConf conf() { + return conf(Collections.emptyMap()); + } +} diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java index a50e69d6f..44819d3ea 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java @@ -45,7 +45,7 @@ public class HdfsTestService { * Configuration settings. */ private Configuration hadoopConf; - private String workDir; + private final String workDir; /** * Embedded HDFS cluster. @@ -53,7 +53,7 @@ public class HdfsTestService { private MiniDFSCluster miniDfsCluster; public HdfsTestService() throws IOException { - workDir = Files.createTempDirectory("temp").toFile().getAbsolutePath(); + workDir = Files.createTempDirectory("temp").toAbsolutePath().toString(); } public Configuration getHadoopConf() { diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 360a3d48a..7cb78a189 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -391,5 +391,23 @@ mockito-junit-jupiter test + + + org.junit.platform + junit-platform-runner + test + + + + org.junit.platform + junit-platform-suite-api + test + + + + org.junit.platform + junit-platform-commons + test + diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java new file mode 100644 index 000000000..ae74c1d04 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator; + +import com.beust.jcommander.ParameterException; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestHoodieSnapshotExporter { + + @ParameterizedTest + @ValueSource(strings = {"json", "parquet", "hudi"}) + public void testValidateOutputFormatWithValidFormat(String format) { + assertDoesNotThrow(() -> { + new OutputFormatValidator().validate(null, format); + }); + } + + @ParameterizedTest + @ValueSource(strings = {"", "JSON"}) + public void testValidateOutputFormatWithInvalidFormat(String format) { + assertThrows(ParameterException.class, () -> { + new OutputFormatValidator().validate(null, format); + }); + } + + @ParameterizedTest + @NullSource + public void testValidateOutputFormatWithNullFormat(String format) { + assertThrows(ParameterException.class, () -> { + new OutputFormatValidator().validate(null, format); + }); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java index 2b2d68870..5b5282ae1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java @@ -23,33 +23,23 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.exception.HoodieException; -import org.apache.hadoop.conf.Configuration; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { - private String topicPath = null; - private Configuration hadoopConf = null; - - @BeforeEach - public void init() { - // Prepare directories - initPath(); - hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); - } - @Test public void testValidKafkaConnectPath() throws Exception { // a standard format(time based partition) of the files managed by kafka connect is: // topic/year=xxx/month=xxx/day=xxx/topic+partition+lowerOffset+upperOffset.file - topicPath = basePath + "/topic1"; - new File(topicPath).mkdirs(); + Path topicPath = tempDir.resolve("topic1"); + Files.createDirectories(topicPath); // create regular kafka connect hdfs dirs new File(topicPath + "/year=2016/month=05/day=01/").mkdirs(); new File(topicPath + "/year=2016/month=05/day=02/").mkdirs(); @@ -70,16 +60,16 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { new File(topicPath + "/year=2016/month=05/day=02/" + "random_snappy_2.parquet").createNewFile(); final TypedProperties props = new TypedProperties(); - 
props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath); + props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString()); final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props); - provider.init(hadoopConf); + provider.init(HoodieTestUtils.getDefaultHadoopConf()); assertEquals("topic1,0:300,1:200", provider.getCheckpoint()); } @Test public void testMissingPartition() throws Exception { - topicPath = basePath + "/topic2"; - new File(topicPath).mkdirs(); + Path topicPath = tempDir.resolve("topic2"); + Files.createDirectories(topicPath); // create regular kafka connect hdfs dirs new File(topicPath + "/year=2016/month=05/day=01/").mkdirs(); new File(topicPath + "/year=2016/month=05/day=02/").mkdirs(); @@ -91,9 +81,9 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { new File(topicPath + "/year=2016/month=05/day=02/" + "topic1+0+201+300.parquet").createNewFile(); final TypedProperties props = new TypedProperties(); - props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath); + props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString()); final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props); - provider.init(hadoopConf); + provider.init(HoodieTestUtils.getDefaultHadoopConf()); assertThrows(HoodieException.class, provider::getCheckpoint); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java index 851df4c33..30efac4a6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java @@ -19,18 +19,15 @@ package org.apache.hudi.utilities.functional; import 
org.apache.hudi.payload.AWSDmsAvroPayload; -import org.apache.hudi.utilities.UtilHelpers; +import org.apache.hudi.testutils.FunctionalTestHarness; import org.apache.hudi.utilities.transform.AWSDmsTransformer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -40,23 +37,8 @@ import java.util.Arrays; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestAWSDatabaseMigrationServiceSource { - - private static JavaSparkContext jsc; - private static SparkSession spark; - - @BeforeAll - public static void setupTest() { - jsc = UtilHelpers.buildSparkContext("aws-dms-test", "local[2]"); - spark = SparkSession.builder().config(jsc.getConf()).getOrCreate(); - } - - @AfterAll - public static void tearDownTest() { - if (jsc != null) { - jsc.stop(); - } - } +@Tag("functional") +public class TestAWSDatabaseMigrationServiceSource extends FunctionalTestHarness { @Test public void testPayload() throws IOException { @@ -83,6 +65,7 @@ public class TestAWSDatabaseMigrationServiceSource { } static class Record implements Serializable { + String id; long ts; @@ -95,11 +78,11 @@ public class TestAWSDatabaseMigrationServiceSource { @Test public void testTransformer() { AWSDmsTransformer transformer = new AWSDmsTransformer(); - Dataset inputFrame = spark.createDataFrame(Arrays.asList( + Dataset inputFrame = spark().createDataFrame(Arrays.asList( new Record("1", 3433L), new Record("2", 3433L)), Record.class); - Dataset outputFrame = transformer.apply(jsc, spark, inputFrame, null); + Dataset 
outputFrame = transformer.apply(jsc(), spark(), inputFrame, null); assertTrue(Arrays.stream(outputFrame.schema().fields()) .map(f -> f.name()).anyMatch(n -> n.equals(AWSDmsAvroPayload.OP_FIELD))); assertTrue(outputFrame.select(AWSDmsAvroPayload.OP_FIELD).collectAsList().stream() diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java new file mode 100644 index 000000000..095b12d34 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.functional; + +import org.apache.hudi.testutils.FunctionalTestHarness; +import org.apache.hudi.utilities.transform.ChainedTransformer; +import org.apache.hudi.utilities.transform.Transformer; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.spark.sql.types.DataTypes.IntegerType; +import static org.apache.spark.sql.types.DataTypes.StringType; +import static org.apache.spark.sql.types.DataTypes.createStructField; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("functional") +public class TestChainedTransformer extends FunctionalTestHarness { + + @Test + public void testChainedTransformation() { + StructType schema = DataTypes.createStructType( + new StructField[] { + createStructField("foo", StringType, false) + }); + Row r1 = RowFactory.create("100"); + Row r2 = RowFactory.create("200"); + Dataset original = spark().sqlContext().createDataFrame(Arrays.asList(r1, r2), schema); + + Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar"); + Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType)); + ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2)); + Dataset transformed = transformer.apply(jsc(), spark(), original, null); + + assertEquals(2, transformed.count()); + assertArrayEquals(new String[] {"bar"}, transformed.columns()); + List rows = transformed.collectAsList(); + assertEquals(100, rows.get(0).getInt(0)); + assertEquals(200, 
rows.get(1).getInt(0)); + } + +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java index b396bac50..90fa31d0a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java @@ -18,12 +18,10 @@ package org.apache.hudi.utilities.functional; -import org.apache.hudi.client.HoodieReadClient; -import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestUtils; -import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.testutils.FunctionalTestHarness; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieTestDataGenerator; import org.apache.hudi.utilities.HDFSParquetImporter; @@ -33,19 +31,14 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -64,30 +57,8 @@ import 
java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHDFSParquetImporter implements Serializable { - - private static String dfsBasePath; - private static HdfsTestService hdfsTestService; - private static MiniDFSCluster dfsCluster; - private static DistributedFileSystem dfs; - - @BeforeAll - public static void initClass() throws Exception { - hdfsTestService = new HdfsTestService(); - dfsCluster = hdfsTestService.start(true); - - // Create a temp folder as the base path - dfs = dfsCluster.getFileSystem(); - dfsBasePath = dfs.getWorkingDirectory().toString(); - dfs.mkdirs(new Path(dfsBasePath)); - } - - @AfterAll - public static void cleanupClass() { - if (hdfsTestService != null) { - hdfsTestService.stop(); - } - } +@Tag("functional") +public class TestHDFSParquetImporter extends FunctionalTestHarness implements Serializable { private String basePath; private transient Path hoodieFolder; @@ -96,7 +67,7 @@ public class TestHDFSParquetImporter implements Serializable { @BeforeEach public void init() throws IOException, ParseException { - basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString(); + basePath = (new Path(dfsBasePath(), Thread.currentThread().getStackTrace()[1].getMethodName())).toString(); // Hoodie root folder. hoodieFolder = new Path(basePath, "testTarget"); @@ -108,7 +79,7 @@ public class TestHDFSParquetImporter implements Serializable { @AfterEach public void clean() throws IOException { - dfs.delete(new Path(basePath), true); + dfs().delete(new Path(basePath), true); } /** @@ -116,57 +87,54 @@ public class TestHDFSParquetImporter implements Serializable { */ @Test public void testImportWithRetries() throws Exception { - try (JavaSparkContext jsc = getJavaSparkContext()) { - // Create schema file. - String schemaFile = new Path(basePath, "file.schema").toString(); + // Create schema file. 
+ String schemaFile = new Path(basePath, "file.schema").toString(); - HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), - "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile); - AtomicInteger retry = new AtomicInteger(3); - AtomicInteger fileCreated = new AtomicInteger(0); - HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) { - @Override - protected int dataImport(JavaSparkContext jsc) throws IOException { - int ret = super.dataImport(jsc); - if (retry.decrementAndGet() == 0) { - fileCreated.incrementAndGet(); - createSchemaFile(schemaFile); - } - - return ret; + HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), + "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile); + AtomicInteger retry = new AtomicInteger(3); + AtomicInteger fileCreated = new AtomicInteger(0); + HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) { + @Override + protected int dataImport(JavaSparkContext jsc) throws IOException { + int ret = super.dataImport(jsc); + if (retry.decrementAndGet() == 0) { + fileCreated.incrementAndGet(); + createSchemaFile(schemaFile); } - }; - // Schema file is not created so this operation should fail. - assertEquals(0, dataImporter.dataImport(jsc, retry.get())); - assertEquals(-1, retry.get()); - assertEquals(1, fileCreated.get()); - // Check if - // 1. .commit file is present - // 2. number of records in each partition == 24 - // 3. total number of partitions == 4; - boolean isCommitFilePresent = false; - Map recordCounts = new HashMap(); - RemoteIterator hoodieFiles = dfs.listFiles(hoodieFolder, true); - while (hoodieFiles.hasNext()) { - LocatedFileStatus f = hoodieFiles.next(); - isCommitFilePresent = isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION); + return ret; + } + }; + // Schema file is not created so this operation should fail. 
+ assertEquals(0, dataImporter.dataImport(jsc(), retry.get())); + assertEquals(-1, retry.get()); + assertEquals(1, fileCreated.get()); - if (f.getPath().toString().endsWith("parquet")) { - SQLContext sc = new SQLContext(jsc); - String partitionPath = f.getPath().getParent().toString(); - long count = sc.read().parquet(f.getPath().toString()).count(); - if (!recordCounts.containsKey(partitionPath)) { - recordCounts.put(partitionPath, 0L); - } - recordCounts.put(partitionPath, recordCounts.get(partitionPath) + count); + // Check if + // 1. .commit file is present + // 2. number of records in each partition == 24 + // 3. total number of partitions == 4; + boolean isCommitFilePresent = false; + Map recordCounts = new HashMap(); + RemoteIterator hoodieFiles = dfs().listFiles(hoodieFolder, true); + while (hoodieFiles.hasNext()) { + LocatedFileStatus f = hoodieFiles.next(); + isCommitFilePresent = isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION); + + if (f.getPath().toString().endsWith("parquet")) { + String partitionPath = f.getPath().getParent().toString(); + long count = sqlContext().read().parquet(f.getPath().toString()).count(); + if (!recordCounts.containsKey(partitionPath)) { + recordCounts.put(partitionPath, 0L); } + recordCounts.put(partitionPath, recordCounts.get(partitionPath) + count); } - assertTrue(isCommitFilePresent, "commit file is missing"); - assertEquals(4, recordCounts.size(), "partition is missing"); - for (Entry e : recordCounts.entrySet()) { - assertEquals(24, e.getValue().longValue(), "missing records"); - } + } + assertTrue(isCommitFilePresent, "commit file is missing"); + assertEquals(4, recordCounts.size(), "partition is missing"); + for (Entry e : recordCounts.entrySet()) { + assertEquals(24, e.getValue().longValue(), "missing records"); } } @@ -187,30 +155,27 @@ public class TestHDFSParquetImporter implements Serializable { */ @Test public void testImportWithInsert() throws IOException, ParseException { 
- try (JavaSparkContext jsc = getJavaSparkContext()) { - insert(jsc); - SQLContext sqlContext = new SQLContext(jsc); - Dataset ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*"); + insert(jsc()); + Dataset ds = HoodieClientTestUtils.read(jsc(), basePath + "/testTarget", sqlContext(), dfs(), basePath + "/testTarget/*/*/*/*"); - List readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList(); - List result = readData.stream().map(row -> - new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), - row.getDouble(5), row.getDouble(6), row.getDouble(7))) - .collect(Collectors.toList()); + List readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList(); + List result = readData.stream().map(row -> + new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), + row.getDouble(5), row.getDouble(6), row.getDouble(7))) + .collect(Collectors.toList()); - List expected = insertData.stream().map(g -> - new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()), - g.get("_row_key").toString(), - g.get("rider").toString(), - g.get("driver").toString(), - Double.parseDouble(g.get("begin_lat").toString()), - Double.parseDouble(g.get("begin_lon").toString()), - Double.parseDouble(g.get("end_lat").toString()), - Double.parseDouble(g.get("end_lon").toString()))) - .collect(Collectors.toList()); + List expected = insertData.stream().map(g -> + new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()), + g.get("_row_key").toString(), + g.get("rider").toString(), + g.get("driver").toString(), + Double.parseDouble(g.get("begin_lat").toString()), + Double.parseDouble(g.get("begin_lon").toString()), + Double.parseDouble(g.get("end_lat").toString()), + 
Double.parseDouble(g.get("end_lon").toString()))) + .collect(Collectors.toList()); - assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size()); - } + assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size()); } /** @@ -218,50 +183,47 @@ public class TestHDFSParquetImporter implements Serializable { */ @Test public void testImportWithUpsert() throws IOException, ParseException { - try (JavaSparkContext jsc = getJavaSparkContext()) { - insert(jsc); + insert(jsc()); - // Create schema file. - String schemaFile = new Path(basePath, "file.schema").toString(); + // Create schema file. + String schemaFile = new Path(basePath, "file.schema").toString(); - Path upsertFolder = new Path(basePath, "testUpsertSrc"); - List upsertData = createUpsertRecords(upsertFolder); + Path upsertFolder = new Path(basePath, "testUpsertSrc"); + List upsertData = createUpsertRecords(upsertFolder); - HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(), - "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile); - cfg.command = "upsert"; - HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); + HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(), + "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile); + cfg.command = "upsert"; + HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); - dataImporter.dataImport(jsc, 0); + dataImporter.dataImport(jsc(), 0); - // construct result, remove top 10 and add upsert data. - List expectData = insertData.subList(11, 96); - expectData.addAll(upsertData); + // construct result, remove top 10 and add upsert data. 
+ List expectData = insertData.subList(11, 96); + expectData.addAll(upsertData); - // read latest data - SQLContext sqlContext = new SQLContext(jsc); - Dataset ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*"); + // read latest data + Dataset ds = HoodieClientTestUtils.read(jsc(), basePath + "/testTarget", sqlContext(), dfs(), basePath + "/testTarget/*/*/*/*"); - List readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList(); - List result = readData.stream().map(row -> - new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), - row.getDouble(5), row.getDouble(6), row.getDouble(7))) - .collect(Collectors.toList()); + List readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList(); + List result = readData.stream().map(row -> + new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), + row.getDouble(5), row.getDouble(6), row.getDouble(7))) + .collect(Collectors.toList()); - // get expected result. - List expected = expectData.stream().map(g -> - new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()), - g.get("_row_key").toString(), - g.get("rider").toString(), - g.get("driver").toString(), - Double.parseDouble(g.get("begin_lat").toString()), - Double.parseDouble(g.get("begin_lon").toString()), - Double.parseDouble(g.get("end_lat").toString()), - Double.parseDouble(g.get("end_lon").toString()))) - .collect(Collectors.toList()); + // get expected result. 
+ List expected = expectData.stream().map(g -> + new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()), + g.get("_row_key").toString(), + g.get("rider").toString(), + g.get("driver").toString(), + Double.parseDouble(g.get("begin_lat").toString()), + Double.parseDouble(g.get("begin_lon").toString()), + Double.parseDouble(g.get("end_lat").toString()), + Double.parseDouble(g.get("end_lon").toString()))) + .collect(Collectors.toList()); - assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size()); - } + assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size()); } public List createInsertRecords(Path srcFolder) throws ParseException, IOException { @@ -305,7 +267,7 @@ public class TestHDFSParquetImporter implements Serializable { } private void createSchemaFile(String schemaFile) throws IOException { - FSDataOutputStream schemaFileOS = dfs.create(new Path(schemaFile)); + FSDataOutputStream schemaFileOS = dfs().create(new Path(schemaFile)); schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes()); schemaFileOS.close(); } @@ -315,22 +277,19 @@ public class TestHDFSParquetImporter implements Serializable { */ @Test public void testSchemaFile() throws Exception { - try (JavaSparkContext jsc = getJavaSparkContext()) { - // Hoodie root folder - Path hoodieFolder = new Path(basePath, "testTarget"); - Path srcFolder = new Path(basePath.toString(), "srcTest"); - Path schemaFile = new Path(basePath.toString(), "missingFile.schema"); - HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), - "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile.toString()); - HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); - // Should fail - return : -1. 
- assertEquals(-1, dataImporter.dataImport(jsc, 0)); + // Hoodie root folder + Path hoodieFolder = new Path(basePath, "testTarget"); + Path srcFolder = new Path(basePath.toString(), "srcTest"); + Path schemaFile = new Path(basePath.toString(), "missingFile.schema"); + HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), + "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile.toString()); + HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); + // Should fail - return : -1. + assertEquals(-1, dataImporter.dataImport(jsc(), 0)); - dfs.create(schemaFile).write("Random invalid schema data".getBytes()); - // Should fail - return : -1. - assertEquals(-1, dataImporter.dataImport(jsc, 0)); - - } + dfs().create(schemaFile).write("Random invalid schema data".getBytes()); + // Should fail - return : -1. + assertEquals(-1, dataImporter.dataImport(jsc(), 0)); } /** @@ -338,27 +297,24 @@ public class TestHDFSParquetImporter implements Serializable { */ @Test public void testRowAndPartitionKey() throws Exception { - try (JavaSparkContext jsc = getJavaSparkContext()) { - // Create schema file. - Path schemaFile = new Path(basePath.toString(), "missingFile.schema"); - createSchemaFile(schemaFile.toString()); + // Create schema file. + Path schemaFile = new Path(basePath.toString(), "missingFile.schema"); + createSchemaFile(schemaFile.toString()); - HDFSParquetImporter dataImporter; - HDFSParquetImporter.Config cfg; + HDFSParquetImporter dataImporter; + HDFSParquetImporter.Config cfg; - // Check for invalid row key. - cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", - "invalidRowKey", "timestamp", 1, schemaFile.toString()); - dataImporter = new HDFSParquetImporter(cfg); - assertEquals(-1, dataImporter.dataImport(jsc, 0)); + // Check for invalid row key. 
+ cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", + "invalidRowKey", "timestamp", 1, schemaFile.toString()); + dataImporter = new HDFSParquetImporter(cfg); + assertEquals(-1, dataImporter.dataImport(jsc(), 0)); - // Check for invalid partition key. - cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", - "_row_key", "invalidTimeStamp", 1, schemaFile.toString()); - dataImporter = new HDFSParquetImporter(cfg); - assertEquals(-1, dataImporter.dataImport(jsc, 0)); - - } + // Check for invalid partition key. + cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", + "_row_key", "invalidTimeStamp", 1, schemaFile.toString()); + dataImporter = new HDFSParquetImporter(cfg); + assertEquals(-1, dataImporter.dataImport(jsc(), 0)); } public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName, @@ -375,17 +331,11 @@ public class TestHDFSParquetImporter implements Serializable { return cfg; } - private JavaSparkContext getJavaSparkContext() { - // Initialize a local spark env - SparkConf sparkConf = new SparkConf().setAppName("TestConversionCommand").setMaster("local[1]"); - sparkConf = HoodieWriteClient.registerClasses(sparkConf); - return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf)); - } - /** * Class used for compare result and expected. 
*/ public static class HoodieTripModel { + double timestamp; String rowKey; String rider; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java index 2accc4b13..7c47ce9bc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java @@ -19,18 +19,16 @@ package org.apache.hudi.utilities.functional; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.testutils.FunctionalTestHarness; import org.apache.hudi.testutils.HoodieTestDataGenerator; import org.apache.hudi.utilities.HoodieSnapshotCopier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.io.File; @@ -40,29 +38,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness { +@Tag("functional") +public class TestHoodieSnapshotCopier extends FunctionalTestHarness { private static final String TEST_WRITE_TOKEN = "1-0-1"; - private String rootPath = null; - private String basePath = null; - private String outputPath = null; - private FileSystem fs = null; - private JavaSparkContext jsc = null; + private String basePath; + private String outputPath; + private FileSystem fs; 
@BeforeEach public void init() throws IOException { // Prepare directories - rootPath = "file://" + tempDir.toString(); + String rootPath = "file://" + tempDir.toString(); basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME; outputPath = rootPath + "/output"; final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); fs = FSUtils.getFs(basePath, hadoopConf); HoodieTestUtils.init(hadoopConf, basePath); - // Start a local Spark job - SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]"); - jsc = new JavaSparkContext(conf); } @Test @@ -73,7 +67,7 @@ public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness { // Do the snapshot HoodieSnapshotCopier copier = new HoodieSnapshotCopier(); - copier.snapshot(jsc, basePath, outputPath, true); + copier.snapshot(jsc(), basePath, outputPath, true); // Nothing changed; we just bail out assertEquals(fs.listStatus(new Path(basePath)).length, 1); @@ -126,7 +120,7 @@ public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness { // Do a snapshot copy HoodieSnapshotCopier copier = new HoodieSnapshotCopier(); - copier.snapshot(jsc, basePath, outputPath, false); + copier.snapshot(jsc(), basePath, outputPath, false); // Check results assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName()))); @@ -147,14 +141,4 @@ public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness { assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS"))); } - - @AfterEach - public void cleanup() { - if (rootPath != null) { - new File(rootPath).delete(); - } - if (jsc != null) { - jsc.stop(); - } - } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java index 34116a87e..6cadbc552 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java @@ -26,7 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex.IndexType; -import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.hudi.testutils.FunctionalTestHarness; import org.apache.hudi.testutils.HoodieTestDataGenerator; import org.apache.hudi.utilities.HoodieSnapshotExporter; import org.apache.hudi.utilities.HoodieSnapshotExporter.Config; @@ -34,7 +34,6 @@ import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator; import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner; import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException; -import com.beust.jcommander.ParameterException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -49,9 +48,9 @@ import org.apache.spark.sql.Row; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.NullSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; @@ -59,12 +58,12 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { +@Tag("functional") +public class TestHoodieSnapshotExporter extends FunctionalTestHarness { static final 
Logger LOG = LogManager.getLogger(TestHoodieSnapshotExporter.class); static final int NUM_RECORDS = 100; @@ -75,39 +74,35 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { String targetPath; @BeforeEach - public void setUp() throws Exception { - initSparkContexts(); - initDFS(); - dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH}); - + public void init() throws Exception { // Initialize test data dirs - sourcePath = dfsBasePath + "/source/"; - targetPath = dfsBasePath + "/target/"; - dfs.mkdirs(new Path(sourcePath)); + sourcePath = dfsBasePath() + "/source/"; + targetPath = dfsBasePath() + "/target/"; + dfs().mkdirs(new Path(sourcePath)); HoodieTableMetaClient - .initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME, + .initTableType(jsc().hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME, HoodieAvroPayload.class.getName()); // Prepare data as source Hudi dataset HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath); - HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc(), cfg); hdfsWriteClient.startCommitWithTime(COMMIT_TIME); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH}); List records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS); - JavaRDD recordsRDD = jsc.parallelize(records, 1); + JavaRDD recordsRDD = jsc().parallelize(records, 1); hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME); hdfsWriteClient.close(); - RemoteIterator itr = dfs.listFiles(new Path(sourcePath), true); + RemoteIterator itr = dfs().listFiles(new Path(sourcePath), true); while (itr.hasNext()) { LOG.info(">>> Prepared test file: " + itr.next().getPath()); } } @AfterEach - public void tearDown() throws Exception { - cleanupSparkContexts(); - cleanupDFS(); - cleanupTestDataGenerator(); + public void cleanUp() throws IOException { + 
dfs().delete(new Path(sourcePath), true); + dfs().delete(new Path(targetPath), true); } private HoodieWriteConfig getHoodieWriteConfig(String basePath) { @@ -128,7 +123,7 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { private HoodieSnapshotExporter.Config cfg; @BeforeEach - public void setUp() throws Exception { + public void setUp() { cfg = new Config(); cfg.sourceBasePath = sourcePath; cfg.targetOutputPath = targetPath; @@ -137,21 +132,21 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { @Test public void testExportAsHudi() throws IOException { - new HoodieSnapshotExporter().export(jsc, cfg); + new HoodieSnapshotExporter().export(jsc(), cfg); // Check results - assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit"))); - assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested"))); - assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight"))); - assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/hoodie.properties"))); + assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit"))); + assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested"))); + assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight"))); + assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/hoodie.properties"))); String partition = targetPath + "/" + PARTITION_PATH; - long numParquetFiles = Arrays.stream(dfs.listStatus(new Path(partition))) + long numParquetFiles = Arrays.stream(dfs().listStatus(new Path(partition))) .filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet")) .count(); assertTrue(numParquetFiles >= 1, "There should exist at least 1 parquet file."); - assertEquals(NUM_RECORDS, sqlContext.read().parquet(partition).count()); - assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata"))); - 
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS"))); + assertEquals(NUM_RECORDS, sqlContext().read().parquet(partition).count()); + assertTrue(dfs().exists(new Path(partition + "/.hoodie_partition_metadata"))); + assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS"))); } } @@ -161,7 +156,7 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { private HoodieSnapshotExporter.Config cfg; @BeforeEach - public void setUp() throws Exception { + public void setUp() { cfg = new Config(); cfg.sourceBasePath = sourcePath; cfg.targetOutputPath = targetPath; @@ -171,11 +166,11 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { @Test public void testExportWhenTargetPathExists() throws IOException { // make target output path present - dfs.mkdirs(new Path(targetPath)); + dfs().mkdirs(new Path(targetPath)); // export final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> { - new HoodieSnapshotExporter().export(jsc, cfg); + new HoodieSnapshotExporter().export(jsc(), cfg); }); assertEquals("The target output path already exists.", thrown.getMessage()); } @@ -183,17 +178,17 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { @Test public void testExportDatasetWithNoCommit() throws IOException { // delete commit files - List commitFiles = Arrays.stream(dfs.listStatus(new Path(sourcePath + "/.hoodie"))) + List commitFiles = Arrays.stream(dfs().listStatus(new Path(sourcePath + "/.hoodie"))) .map(FileStatus::getPath) .filter(filePath -> filePath.getName().endsWith(".commit")) .collect(Collectors.toList()); for (Path p : commitFiles) { - dfs.delete(p, false); + dfs().delete(p, false); } // export final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> { - new HoodieSnapshotExporter().export(jsc, cfg); + new HoodieSnapshotExporter().export(jsc(), cfg); }); assertEquals("No commits present. 
Nothing to snapshot.", thrown.getMessage()); } @@ -201,11 +196,11 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { @Test public void testExportDatasetWithNoPartition() throws IOException { // delete all source data - dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true); + dfs().delete(new Path(sourcePath + "/" + PARTITION_PATH), true); // export final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> { - new HoodieSnapshotExporter().export(jsc, cfg); + new HoodieSnapshotExporter().export(jsc(), cfg); }); assertEquals("The source dataset has 0 partition to snapshot.", thrown.getMessage()); } @@ -221,9 +216,9 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { cfg.sourceBasePath = sourcePath; cfg.targetOutputPath = targetPath; cfg.outputFormat = format; - new HoodieSnapshotExporter().export(jsc, cfg); - assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count()); - assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS"))); + new HoodieSnapshotExporter().export(jsc(), cfg); + assertEquals(NUM_RECORDS, sqlContext().read().format(format).load(targetPath).count()); + assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS"))); } } @@ -247,7 +242,7 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { private HoodieSnapshotExporter.Config cfg; @BeforeEach - public void setUp() throws Exception { + public void setUp() { cfg = new Config(); cfg.sourceBasePath = sourcePath; cfg.targetOutputPath = targetPath; @@ -258,49 +253,21 @@ public class TestHoodieSnapshotExporter extends HoodieClientTestHarness { public void testExportWithPartitionField() throws IOException { // `driver` field is set in HoodieTestDataGenerator cfg.outputPartitionField = "driver"; - new HoodieSnapshotExporter().export(jsc, cfg); + new HoodieSnapshotExporter().export(jsc(), cfg); - assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count()); - 
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS"))); - assertTrue(dfs.listStatus(new Path(targetPath)).length > 1); + assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count()); + assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS"))); + assertTrue(dfs().listStatus(new Path(targetPath)).length > 1); } @Test public void testExportForUserDefinedPartitioner() throws IOException { cfg.outputPartitioner = UserDefinedPartitioner.class.getName(); - new HoodieSnapshotExporter().export(jsc, cfg); + new HoodieSnapshotExporter().export(jsc(), cfg); - assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count()); - assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS"))); - assertTrue(dfs.exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH)))); - } - } - - @Nested - public class TestHoodieSnapshotExporterInputValidation { - - @ParameterizedTest - @ValueSource(strings = {"json", "parquet", "hudi"}) - public void testValidateOutputFormat_withValidFormat(String format) { - assertDoesNotThrow(() -> { - new OutputFormatValidator().validate(null, format); - }); - } - - @ParameterizedTest - @ValueSource(strings = {"", "JSON"}) - public void testValidateOutputFormat_withInvalidFormat(String format) { - assertThrows(ParameterException.class, () -> { - new OutputFormatValidator().validate(null, format); - }); - } - - @ParameterizedTest - @NullSource - public void testValidateOutputFormat_withNullFormat(String format) { - assertThrows(ParameterException.class, () -> { - new OutputFormatValidator().validate(null, format); - }); + assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count()); + assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS"))); + assertTrue(dfs().exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH)))); } } } diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java index f50fafa89..fde26f537 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.functional; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.testutils.FunctionalTestHarness; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -27,9 +28,8 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.Schema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaSparkContext; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -40,15 +40,14 @@ import java.sql.SQLException; import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestJdbcbasedSchemaProvider { +@Tag("functional") +public class TestJdbcbasedSchemaProvider extends FunctionalTestHarness { private static final Logger LOG = LogManager.getLogger(TestJdbcbasedSchemaProvider.class); private static final TypedProperties PROPS = new TypedProperties(); - protected transient JavaSparkContext jsc = null; - @BeforeEach - public void init() { - jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]"); + @BeforeAll + public static void init() { 
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url", "jdbc:h2:mem:test_mem"); PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type", "org.h2.Driver"); PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username", "sa"); @@ -58,18 +57,11 @@ public class TestJdbcbasedSchemaProvider { PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable", "false"); } - @AfterEach - public void teardown() throws Exception { - if (jsc != null) { - jsc.stop(); - } - } - @Test public void testJdbcbasedSchemaProvider() throws Exception { try { initH2Database(); - Schema sourceSchema = UtilHelpers.createSchemaProvider(JdbcbasedSchemaProvider.class.getName(), PROPS, jsc).getSourceSchema(); + Schema sourceSchema = UtilHelpers.createSchemaProvider(JdbcbasedSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema(); assertEquals(sourceSchema.toString().toUpperCase(), new Schema.Parser().parse(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/source-jdbc.avsc")).toString().toUpperCase()); } catch (HoodieException e) { LOG.error("Failed to get connection through jdbc. ", e); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/UtilitiesFunctionalTestSuite.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/UtilitiesFunctionalTestSuite.java new file mode 100644 index 000000000..98bba5b4e --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/UtilitiesFunctionalTestSuite.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.functional; + +import org.junit.platform.runner.JUnitPlatform; +import org.junit.platform.suite.api.IncludeTags; +import org.junit.platform.suite.api.SelectPackages; +import org.junit.runner.RunWith; + +@RunWith(JUnitPlatform.class) +@SelectPackages("org.apache.hudi.utilities.functional") +@IncludeTags("functional") +public class UtilitiesFunctionalTestSuite { + +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java index 1a7aa0ea8..efc73653e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestChainedTransformer.java @@ -19,67 +19,15 @@ package org.apache.hudi.utilities.transform; -import org.apache.hudi.utilities.UtilHelpers; - -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.List; import static org.apache.spark.sql.types.DataTypes.IntegerType; -import static 
org.apache.spark.sql.types.DataTypes.StringType; -import static org.apache.spark.sql.types.DataTypes.createStructField; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; public class TestChainedTransformer { - - private JavaSparkContext jsc; - private SparkSession sparkSession; - - @BeforeEach - public void setUp() { - jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]"); - sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate(); - } - - @AfterEach - public void tearDown() { - jsc.stop(); - } - - @Test - public void testChainedTransformation() { - StructType schema = DataTypes.createStructType( - new StructField[] { - createStructField("foo", StringType, false) - }); - Row r1 = RowFactory.create("100"); - Row r2 = RowFactory.create("200"); - Dataset original = sparkSession.sqlContext().createDataFrame(Arrays.asList(r1, r2), schema); - - Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar"); - Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType)); - ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2)); - Dataset transformed = transformer.apply(jsc, sparkSession, original, null); - - assertEquals(2, transformed.count()); - assertArrayEquals(new String[] {"bar"}, transformed.columns()); - List rows = transformed.collectAsList(); - assertEquals(100, rows.get(0).getInt(0)); - assertEquals(200, rows.get(1).getInt(0)); - } - @Test public void testGetTransformersNames() { Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar"); diff --git a/pom.xml b/pom.xml index d51e04abd..05907f348 100644 --- a/pom.xml +++ b/pom.xml @@ -84,8 +84,9 @@ 2.0.0 2.17 1.10.1 - 5.6.1 - 5.6.1 + 5.7.0-M1 + 5.7.0-M1 + 1.7.0-M1 3.3.3 1.2.17 1.7.5 @@ -833,6 +834,27 @@ 
${mockito.jupiter.version} + + org.junit.platform + junit-platform-runner + ${junit.platform.version} + test + + + + org.junit.platform + junit-platform-suite-api + ${junit.platform.version} + test + + + + org.junit.platform + junit-platform-commons + ${junit.platform.version} + test + + com.esotericsoftware diff --git a/style/checkstyle.xml b/style/checkstyle.xml index 9e03882d1..86575c91e 100644 --- a/style/checkstyle.xml +++ b/style/checkstyle.xml @@ -267,7 +267,7 @@ + value="^java\.util\.Optional, ^org\.junit\.(?!jupiter|platform|contrib|Rule|runner)(.*)"/>