diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java index efaad254f..3ad777475 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java @@ -39,7 +39,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.testutils.FunctionalTestHarness; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -54,18 +54,22 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.UUID; import java.util.stream.Collectors; @@ -88,7 +92,7 @@ import static org.mockito.Mockito.when; */ @TestMethodOrder(MethodOrderer.Alphanumeric.class) @Tag("functional") -public class TestHBaseIndex extends FunctionalTestHarness { +public class TestHBaseIndex extends SparkClientFunctionalTestHarness { private static final String TABLE_NAME = 
"test_table"; private static HBaseTestingUtility utility; @@ -97,17 +101,8 @@ public class TestHBaseIndex extends FunctionalTestHarness { private Configuration hadoopConf; private HoodieTestDataGenerator dataGen; private HoodieTableMetaClient metaClient; - - @AfterAll - public static void clean() throws Exception { - if (utility != null) { - utility.deleteTable(TABLE_NAME); - utility.shutdownMiniCluster(); - } - if (spark != null) { - spark.close(); - } - } + private HoodieSparkEngineContext context; + private String basePath; @BeforeAll public static void init() throws Exception { @@ -121,27 +116,31 @@ public class TestHBaseIndex extends FunctionalTestHarness { utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"),2); } + @AfterAll + public static void clean() throws Exception { + utility.shutdownMiniCluster(); + } + @BeforeEach public void setUp() throws Exception { hadoopConf = jsc().hadoopConfiguration(); hadoopConf.addResource(utility.getConfiguration()); // reInit the context here to keep the hadoopConf the same with that in this class context = new HoodieSparkEngineContext(jsc()); - metaClient = getHoodieMetaClient(hadoopConf, basePath()); + basePath = utility.getDataTestDirOnTestFS(TABLE_NAME).toString(); + metaClient = getHoodieMetaClient(hadoopConf, basePath); dataGen = new HoodieTestDataGenerator(); } - @Test - public void testSimpleTagLocationAndUpdateCOW() throws Exception { - testSimpleTagLocationAndUpdate(HoodieTableType.COPY_ON_WRITE); - } - - @Test void testSimpleTagLocationAndUpdateMOR() throws Exception { - testSimpleTagLocationAndUpdate(HoodieTableType.MERGE_ON_READ); + @AfterEach + public void cleanUpTableData() throws IOException { + utility.cleanupDataTestDirOnTestFS(TABLE_NAME); } + @ParameterizedTest + @EnumSource(HoodieTableType.class) public void testSimpleTagLocationAndUpdate(HoodieTableType tableType) throws Exception { - metaClient = HoodieTestUtils.init(hadoopConf, basePath(), tableType); + metaClient = 
HoodieTestUtils.init(hadoopConf, basePath, tableType); final String newCommitTime = "001"; final int numRecords = 10; @@ -799,7 +798,7 @@ public class TestHBaseIndex extends FunctionalTestHarness { } private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath, boolean rollbackSync) { - return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(1, 1).withDeleteParallelism(1) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) .withInlineCompaction(false).build()) @@ -817,4 +816,4 @@ public class TestHBaseIndex extends FunctionalTestHarness { .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build()) .build()); } -} \ No newline at end of file +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java index cbf0a2213..11c615a76 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java @@ -53,6 +53,9 @@ import java.util.Properties; import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME; +/** + * @deprecated Use {@link SparkClientFunctionalTestHarness} instead.
+ */ public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider { protected static transient SparkSession spark; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java index 20e19cf08..860e0ade7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java @@ -54,6 +54,8 @@ import scala.Tuple2; /** * Hoodie snapshot copy job which copies latest files from all partitions to another place, for snapshot backup. + * + * @deprecated Use {@link HoodieSnapshotExporter} instead. */ public class HoodieSnapshotCopier implements Serializable { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java index 42ab6ca55..85f3d2d18 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java @@ -68,8 +68,6 @@ import scala.collection.JavaConversions; /** * Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files). - * - * @experimental This export is an experimental tool. If you want to export hudi to hudi, please use HoodieSnapshotCopier. 
*/ public class HoodieSnapshotExporter { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java index 30efac4a6..cd6b89b36 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestAWSDatabaseMigrationServiceSource.java @@ -19,7 +19,7 @@ package org.apache.hudi.utilities.functional; import org.apache.hudi.payload.AWSDmsAvroPayload; -import org.apache.hudi.testutils.FunctionalTestHarness; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.transform.AWSDmsTransformer; import org.apache.avro.Schema; @@ -38,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("functional") -public class TestAWSDatabaseMigrationServiceSource extends FunctionalTestHarness { +public class TestAWSDatabaseMigrationServiceSource extends SparkClientFunctionalTestHarness { @Test public void testPayload() throws IOException { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java index 095b12d34..b493d436e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java @@ -19,7 +19,7 @@ package org.apache.hudi.utilities.functional; -import org.apache.hudi.testutils.FunctionalTestHarness; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.transform.ChainedTransformer; import 
org.apache.hudi.utilities.transform.Transformer; @@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @Tag("functional") -public class TestChainedTransformer extends FunctionalTestHarness { +public class TestChainedTransformer extends SparkClientFunctionalTestHarness { @Test public void testChainedTransformation() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java index f68f8c6db..15f702a8d 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -39,6 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +@Disabled("Disable due to flakiness and feature deprecation.") @Tag("functional") public class TestHoodieSnapshotCopier extends FunctionalTestHarness { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java index 133205cb8..c977b79cb 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java @@ -19,6 +19,7 @@ package 
org.apache.hudi.utilities.functional; import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -27,7 +28,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex.IndexType; -import org.apache.hudi.testutils.FunctionalTestHarness; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.HoodieSnapshotExporter; import org.apache.hudi.utilities.HoodieSnapshotExporter.Config; import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator; @@ -35,6 +36,7 @@ import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner; import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -54,6 +56,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.nio.file.Paths; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -63,7 +66,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("functional") -public class TestHoodieSnapshotExporter extends FunctionalTestHarness { +public class TestHoodieSnapshotExporter extends SparkClientFunctionalTestHarness { static final Logger LOG = LogManager.getLogger(TestHoodieSnapshotExporter.class); static final int NUM_RECORDS = 100; @@ -72,13 +75,14 @@ public class TestHoodieSnapshotExporter extends 
FunctionalTestHarness { static final String TABLE_NAME = "testing"; String sourcePath; String targetPath; + LocalFileSystem lfs; @BeforeEach public void init() throws Exception { // Initialize test data dirs - sourcePath = dfsBasePath() + "/source/"; - targetPath = dfsBasePath() + "/target/"; - dfs().mkdirs(new Path(sourcePath)); + sourcePath = Paths.get(basePath(), "source").toString(); + targetPath = Paths.get(basePath(), "target").toString(); + lfs = (LocalFileSystem) FSUtils.getFs(basePath(), jsc().hadoopConfiguration()); HoodieTableMetaClient.withPropertyBuilder() .setTableType(HoodieTableType.COPY_ON_WRITE) @@ -88,14 +92,14 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { // Prepare data as source Hudi dataset HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath); - SparkRDDWriteClient hdfsWriteClient = new SparkRDDWriteClient(context(), cfg); - hdfsWriteClient.startCommitWithTime(COMMIT_TIME); + SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg); + writeClient.startCommitWithTime(COMMIT_TIME); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH}); List records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS); JavaRDD recordsRDD = jsc().parallelize(records, 1); - hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME); - hdfsWriteClient.close(); - RemoteIterator itr = dfs().listFiles(new Path(sourcePath), true); + writeClient.bulkInsert(recordsRDD, COMMIT_TIME); + writeClient.close(); + RemoteIterator itr = lfs.listFiles(new Path(sourcePath), true); while (itr.hasNext()) { LOG.info(">>> Prepared test file: " + itr.next().getPath()); } @@ -103,8 +107,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { @AfterEach public void cleanUp() throws IOException { - dfs().delete(new Path(sourcePath), true); - dfs().delete(new Path(targetPath), true); + lfs.close(); } private HoodieWriteConfig getHoodieWriteConfig(String basePath) { @@ -138,18 +141,18 @@ public class 
TestHoodieSnapshotExporter extends FunctionalTestHarness { new HoodieSnapshotExporter().export(jsc(), cfg); // Check results - assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit"))); - assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested"))); - assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight"))); - assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/hoodie.properties"))); + assertTrue(lfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit"))); + assertTrue(lfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested"))); + assertTrue(lfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight"))); + assertTrue(lfs.exists(new Path(targetPath + "/.hoodie/hoodie.properties"))); String partition = targetPath + "/" + PARTITION_PATH; - long numParquetFiles = Arrays.stream(dfs().listStatus(new Path(partition))) + long numParquetFiles = Arrays.stream(lfs.listStatus(new Path(partition))) .filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet")) .count(); assertTrue(numParquetFiles >= 1, "There should exist at least 1 parquet file."); assertEquals(NUM_RECORDS, sqlContext().read().parquet(partition).count()); - assertTrue(dfs().exists(new Path(partition + "/.hoodie_partition_metadata"))); - assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS"))); + assertTrue(lfs.exists(new Path(partition + "/.hoodie_partition_metadata"))); + assertTrue(lfs.exists(new Path(targetPath + "/_SUCCESS"))); } } @@ -169,7 +172,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { @Test public void testExportWhenTargetPathExists() throws IOException { // make target output path present - dfs().mkdirs(new Path(targetPath)); + lfs.mkdirs(new Path(targetPath)); // export final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> { @@ -181,12 +184,12 @@ public class 
TestHoodieSnapshotExporter extends FunctionalTestHarness { @Test public void testExportDatasetWithNoCommit() throws IOException { // delete commit files - List commitFiles = Arrays.stream(dfs().listStatus(new Path(sourcePath + "/.hoodie"))) + List commitFiles = Arrays.stream(lfs.listStatus(new Path(sourcePath + "/.hoodie"))) .map(FileStatus::getPath) .filter(filePath -> filePath.getName().endsWith(".commit")) .collect(Collectors.toList()); for (Path p : commitFiles) { - dfs().delete(p, false); + lfs.delete(p, false); } // export @@ -199,7 +202,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { @Test public void testExportDatasetWithNoPartition() throws IOException { // delete all source data - dfs().delete(new Path(sourcePath + "/" + PARTITION_PATH), true); + lfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true); // export final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> { @@ -221,7 +224,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { cfg.outputFormat = format; new HoodieSnapshotExporter().export(jsc(), cfg); assertEquals(NUM_RECORDS, sqlContext().read().format(format).load(targetPath).count()); - assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS"))); + assertTrue(lfs.exists(new Path(targetPath + "/_SUCCESS"))); } } @@ -259,8 +262,8 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { new HoodieSnapshotExporter().export(jsc(), cfg); assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count()); - assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS"))); - assertTrue(dfs().listStatus(new Path(targetPath)).length > 1); + assertTrue(lfs.exists(new Path(targetPath + "/_SUCCESS"))); + assertTrue(lfs.listStatus(new Path(targetPath)).length > 1); } @Test @@ -269,8 +272,8 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { new HoodieSnapshotExporter().export(jsc(), cfg); assertEquals(NUM_RECORDS, 
sqlContext().read().format("json").load(targetPath).count()); - assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS"))); - assertTrue(dfs().exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH)))); + assertTrue(lfs.exists(new Path(targetPath + "/_SUCCESS"))); + assertTrue(lfs.exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH)))); } } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java index fde26f537..7dd8af689 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java @@ -20,7 +20,7 @@ package org.apache.hudi.utilities.functional; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.testutils.FunctionalTestHarness; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -41,7 +41,7 @@ import java.sql.SQLException; import static org.junit.jupiter.api.Assertions.assertEquals; @Tag("functional") -public class TestJdbcbasedSchemaProvider extends FunctionalTestHarness { +public class TestJdbcbasedSchemaProvider extends SparkClientFunctionalTestHarness { private static final Logger LOG = LogManager.getLogger(TestJdbcbasedSchemaProvider.class); private static final TypedProperties PROPS = new TypedProperties();