[HUDI-2408] Deprecate FunctionalTestHarness to avoid init DFS (#3628)
This commit is contained in:
@@ -39,7 +39,7 @@ import org.apache.hudi.index.HoodieIndex;
|
|||||||
import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
|
import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
|
||||||
import org.apache.hudi.table.HoodieSparkTable;
|
import org.apache.hudi.table.HoodieSparkTable;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.hudi.testutils.FunctionalTestHarness;
|
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@@ -54,18 +54,22 @@ import org.apache.hadoop.hbase.client.Result;
|
|||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.junit.jupiter.api.AfterAll;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.MethodOrderer;
|
import org.junit.jupiter.api.MethodOrderer;
|
||||||
import org.junit.jupiter.api.Tag;
|
import org.junit.jupiter.api.Tag;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.TestMethodOrder;
|
import org.junit.jupiter.api.TestMethodOrder;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.EnumSource;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -88,7 +92,7 @@ import static org.mockito.Mockito.when;
|
|||||||
*/
|
*/
|
||||||
@TestMethodOrder(MethodOrderer.Alphanumeric.class)
|
@TestMethodOrder(MethodOrderer.Alphanumeric.class)
|
||||||
@Tag("functional")
|
@Tag("functional")
|
||||||
public class TestHBaseIndex extends FunctionalTestHarness {
|
public class TestHBaseIndex extends SparkClientFunctionalTestHarness {
|
||||||
|
|
||||||
private static final String TABLE_NAME = "test_table";
|
private static final String TABLE_NAME = "test_table";
|
||||||
private static HBaseTestingUtility utility;
|
private static HBaseTestingUtility utility;
|
||||||
@@ -97,17 +101,8 @@ public class TestHBaseIndex extends FunctionalTestHarness {
|
|||||||
private Configuration hadoopConf;
|
private Configuration hadoopConf;
|
||||||
private HoodieTestDataGenerator dataGen;
|
private HoodieTestDataGenerator dataGen;
|
||||||
private HoodieTableMetaClient metaClient;
|
private HoodieTableMetaClient metaClient;
|
||||||
|
private HoodieSparkEngineContext context;
|
||||||
@AfterAll
|
private String basePath;
|
||||||
public static void clean() throws Exception {
|
|
||||||
if (utility != null) {
|
|
||||||
utility.deleteTable(TABLE_NAME);
|
|
||||||
utility.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
if (spark != null) {
|
|
||||||
spark.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void init() throws Exception {
|
public static void init() throws Exception {
|
||||||
@@ -121,27 +116,31 @@ public class TestHBaseIndex extends FunctionalTestHarness {
|
|||||||
utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"),2);
|
utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"),2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void clean() throws Exception {
|
||||||
|
utility.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
hadoopConf = jsc().hadoopConfiguration();
|
hadoopConf = jsc().hadoopConfiguration();
|
||||||
hadoopConf.addResource(utility.getConfiguration());
|
hadoopConf.addResource(utility.getConfiguration());
|
||||||
// reInit the context here to keep the hadoopConf the same with that in this class
|
// reInit the context here to keep the hadoopConf the same with that in this class
|
||||||
context = new HoodieSparkEngineContext(jsc());
|
context = new HoodieSparkEngineContext(jsc());
|
||||||
metaClient = getHoodieMetaClient(hadoopConf, basePath());
|
basePath = utility.getDataTestDirOnTestFS(TABLE_NAME).toString();
|
||||||
|
metaClient = getHoodieMetaClient(hadoopConf, basePath);
|
||||||
dataGen = new HoodieTestDataGenerator();
|
dataGen = new HoodieTestDataGenerator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@AfterEach
|
||||||
public void testSimpleTagLocationAndUpdateCOW() throws Exception {
|
public void cleanUpTableData() throws IOException {
|
||||||
testSimpleTagLocationAndUpdate(HoodieTableType.COPY_ON_WRITE);
|
utility.cleanupDataTestDirOnTestFS(TABLE_NAME);
|
||||||
}
|
|
||||||
|
|
||||||
@Test void testSimpleTagLocationAndUpdateMOR() throws Exception {
|
|
||||||
testSimpleTagLocationAndUpdate(HoodieTableType.MERGE_ON_READ);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(HoodieTableType.class)
|
||||||
public void testSimpleTagLocationAndUpdate(HoodieTableType tableType) throws Exception {
|
public void testSimpleTagLocationAndUpdate(HoodieTableType tableType) throws Exception {
|
||||||
metaClient = HoodieTestUtils.init(hadoopConf, basePath(), tableType);
|
metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
|
||||||
|
|
||||||
final String newCommitTime = "001";
|
final String newCommitTime = "001";
|
||||||
final int numRecords = 10;
|
final int numRecords = 10;
|
||||||
@@ -799,7 +798,7 @@ public class TestHBaseIndex extends FunctionalTestHarness {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath, boolean rollbackSync) {
|
private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath, boolean rollbackSync) {
|
||||||
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||||
.withParallelism(1, 1).withDeleteParallelism(1)
|
.withParallelism(1, 1).withDeleteParallelism(1)
|
||||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
|
||||||
.withInlineCompaction(false).build())
|
.withInlineCompaction(false).build())
|
||||||
|
|||||||
@@ -53,6 +53,9 @@ import java.util.Properties;
|
|||||||
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
|
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
|
||||||
import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
|
import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Deprecated. Use {@link SparkClientFunctionalTestHarness} instead.
|
||||||
|
*/
|
||||||
public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
|
public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
|
||||||
|
|
||||||
protected static transient SparkSession spark;
|
protected static transient SparkSession spark;
|
||||||
|
|||||||
@@ -54,6 +54,8 @@ import scala.Tuple2;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Hoodie snapshot copy job which copies latest files from all partitions to another place, for snapshot backup.
|
* Hoodie snapshot copy job which copies latest files from all partitions to another place, for snapshot backup.
|
||||||
|
*
|
||||||
|
* @deprecated Use {@link HoodieSnapshotExporter} instead.
|
||||||
*/
|
*/
|
||||||
public class HoodieSnapshotCopier implements Serializable {
|
public class HoodieSnapshotCopier implements Serializable {
|
||||||
|
|
||||||
|
|||||||
@@ -68,8 +68,6 @@ import scala.collection.JavaConversions;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files).
|
* Export the latest records of Hudi dataset to a set of external files (e.g., plain parquet files).
|
||||||
*
|
|
||||||
* @experimental This export is an experimental tool. If you want to export hudi to hudi, please use HoodieSnapshotCopier.
|
|
||||||
*/
|
*/
|
||||||
public class HoodieSnapshotExporter {
|
public class HoodieSnapshotExporter {
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@
|
|||||||
package org.apache.hudi.utilities.functional;
|
package org.apache.hudi.utilities.functional;
|
||||||
|
|
||||||
import org.apache.hudi.payload.AWSDmsAvroPayload;
|
import org.apache.hudi.payload.AWSDmsAvroPayload;
|
||||||
import org.apache.hudi.testutils.FunctionalTestHarness;
|
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
|
||||||
import org.apache.hudi.utilities.transform.AWSDmsTransformer;
|
import org.apache.hudi.utilities.transform.AWSDmsTransformer;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
@@ -38,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@Tag("functional")
|
@Tag("functional")
|
||||||
public class TestAWSDatabaseMigrationServiceSource extends FunctionalTestHarness {
|
public class TestAWSDatabaseMigrationServiceSource extends SparkClientFunctionalTestHarness {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPayload() throws IOException {
|
public void testPayload() throws IOException {
|
||||||
|
|||||||
@@ -19,7 +19,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.utilities.functional;
|
package org.apache.hudi.utilities.functional;
|
||||||
|
|
||||||
import org.apache.hudi.testutils.FunctionalTestHarness;
|
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
|
||||||
import org.apache.hudi.utilities.transform.ChainedTransformer;
|
import org.apache.hudi.utilities.transform.ChainedTransformer;
|
||||||
import org.apache.hudi.utilities.transform.Transformer;
|
import org.apache.hudi.utilities.transform.Transformer;
|
||||||
|
|
||||||
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
@Tag("functional")
|
@Tag("functional")
|
||||||
public class TestChainedTransformer extends FunctionalTestHarness {
|
public class TestChainedTransformer extends SparkClientFunctionalTestHarness {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChainedTransformation() {
|
public void testChainedTransformation() {
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.jupiter.api.Tag;
|
import org.junit.jupiter.api.Tag;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
@@ -39,6 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
@Disabled("Disable due to flakiness and feature deprecation.")
|
||||||
@Tag("functional")
|
@Tag("functional")
|
||||||
public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
|
public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.utilities.functional;
|
package org.apache.hudi.utilities.functional;
|
||||||
|
|
||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieAvroPayload;
|
import org.apache.hudi.common.model.HoodieAvroPayload;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
@@ -27,7 +28,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
|||||||
import org.apache.hudi.config.HoodieIndexConfig;
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.index.HoodieIndex.IndexType;
|
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||||
import org.apache.hudi.testutils.FunctionalTestHarness;
|
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
|
||||||
import org.apache.hudi.utilities.HoodieSnapshotExporter;
|
import org.apache.hudi.utilities.HoodieSnapshotExporter;
|
||||||
import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
|
import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
|
||||||
import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
|
import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
|
||||||
@@ -35,6 +36,7 @@ import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
|
|||||||
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
|
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
@@ -54,6 +56,7 @@ import org.junit.jupiter.params.ParameterizedTest;
|
|||||||
import org.junit.jupiter.params.provider.ValueSource;
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Paths;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@@ -63,7 +66,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@Tag("functional")
|
@Tag("functional")
|
||||||
public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
public class TestHoodieSnapshotExporter extends SparkClientFunctionalTestHarness {
|
||||||
|
|
||||||
static final Logger LOG = LogManager.getLogger(TestHoodieSnapshotExporter.class);
|
static final Logger LOG = LogManager.getLogger(TestHoodieSnapshotExporter.class);
|
||||||
static final int NUM_RECORDS = 100;
|
static final int NUM_RECORDS = 100;
|
||||||
@@ -72,13 +75,14 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
static final String TABLE_NAME = "testing";
|
static final String TABLE_NAME = "testing";
|
||||||
String sourcePath;
|
String sourcePath;
|
||||||
String targetPath;
|
String targetPath;
|
||||||
|
LocalFileSystem lfs;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void init() throws Exception {
|
public void init() throws Exception {
|
||||||
// Initialize test data dirs
|
// Initialize test data dirs
|
||||||
sourcePath = dfsBasePath() + "/source/";
|
sourcePath = Paths.get(basePath(), "source").toString();
|
||||||
targetPath = dfsBasePath() + "/target/";
|
targetPath = Paths.get(basePath(), "target").toString();
|
||||||
dfs().mkdirs(new Path(sourcePath));
|
lfs = (LocalFileSystem) FSUtils.getFs(basePath(), jsc().hadoopConfiguration());
|
||||||
|
|
||||||
HoodieTableMetaClient.withPropertyBuilder()
|
HoodieTableMetaClient.withPropertyBuilder()
|
||||||
.setTableType(HoodieTableType.COPY_ON_WRITE)
|
.setTableType(HoodieTableType.COPY_ON_WRITE)
|
||||||
@@ -88,14 +92,14 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
|
|
||||||
// Prepare data as source Hudi dataset
|
// Prepare data as source Hudi dataset
|
||||||
HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
|
HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
|
||||||
SparkRDDWriteClient hdfsWriteClient = new SparkRDDWriteClient(context(), cfg);
|
SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg);
|
||||||
hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
|
writeClient.startCommitWithTime(COMMIT_TIME);
|
||||||
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
|
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
|
||||||
List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
|
List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
|
||||||
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
|
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
|
||||||
hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
|
writeClient.bulkInsert(recordsRDD, COMMIT_TIME);
|
||||||
hdfsWriteClient.close();
|
writeClient.close();
|
||||||
RemoteIterator<LocatedFileStatus> itr = dfs().listFiles(new Path(sourcePath), true);
|
RemoteIterator<LocatedFileStatus> itr = lfs.listFiles(new Path(sourcePath), true);
|
||||||
while (itr.hasNext()) {
|
while (itr.hasNext()) {
|
||||||
LOG.info(">>> Prepared test file: " + itr.next().getPath());
|
LOG.info(">>> Prepared test file: " + itr.next().getPath());
|
||||||
}
|
}
|
||||||
@@ -103,8 +107,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
public void cleanUp() throws IOException {
|
public void cleanUp() throws IOException {
|
||||||
dfs().delete(new Path(sourcePath), true);
|
lfs.close();
|
||||||
dfs().delete(new Path(targetPath), true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
|
private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
|
||||||
@@ -138,18 +141,18 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
new HoodieSnapshotExporter().export(jsc(), cfg);
|
new HoodieSnapshotExporter().export(jsc(), cfg);
|
||||||
|
|
||||||
// Check results
|
// Check results
|
||||||
assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit")));
|
assertTrue(lfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit")));
|
||||||
assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested")));
|
assertTrue(lfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested")));
|
||||||
assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight")));
|
assertTrue(lfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight")));
|
||||||
assertTrue(dfs().exists(new Path(targetPath + "/.hoodie/hoodie.properties")));
|
assertTrue(lfs.exists(new Path(targetPath + "/.hoodie/hoodie.properties")));
|
||||||
String partition = targetPath + "/" + PARTITION_PATH;
|
String partition = targetPath + "/" + PARTITION_PATH;
|
||||||
long numParquetFiles = Arrays.stream(dfs().listStatus(new Path(partition)))
|
long numParquetFiles = Arrays.stream(lfs.listStatus(new Path(partition)))
|
||||||
.filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet"))
|
.filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet"))
|
||||||
.count();
|
.count();
|
||||||
assertTrue(numParquetFiles >= 1, "There should exist at least 1 parquet file.");
|
assertTrue(numParquetFiles >= 1, "There should exist at least 1 parquet file.");
|
||||||
assertEquals(NUM_RECORDS, sqlContext().read().parquet(partition).count());
|
assertEquals(NUM_RECORDS, sqlContext().read().parquet(partition).count());
|
||||||
assertTrue(dfs().exists(new Path(partition + "/.hoodie_partition_metadata")));
|
assertTrue(lfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
|
||||||
assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
|
assertTrue(lfs.exists(new Path(targetPath + "/_SUCCESS")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,7 +172,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
@Test
|
@Test
|
||||||
public void testExportWhenTargetPathExists() throws IOException {
|
public void testExportWhenTargetPathExists() throws IOException {
|
||||||
// make target output path present
|
// make target output path present
|
||||||
dfs().mkdirs(new Path(targetPath));
|
lfs.mkdirs(new Path(targetPath));
|
||||||
|
|
||||||
// export
|
// export
|
||||||
final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
|
final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
|
||||||
@@ -181,12 +184,12 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
@Test
|
@Test
|
||||||
public void testExportDatasetWithNoCommit() throws IOException {
|
public void testExportDatasetWithNoCommit() throws IOException {
|
||||||
// delete commit files
|
// delete commit files
|
||||||
List<Path> commitFiles = Arrays.stream(dfs().listStatus(new Path(sourcePath + "/.hoodie")))
|
List<Path> commitFiles = Arrays.stream(lfs.listStatus(new Path(sourcePath + "/.hoodie")))
|
||||||
.map(FileStatus::getPath)
|
.map(FileStatus::getPath)
|
||||||
.filter(filePath -> filePath.getName().endsWith(".commit"))
|
.filter(filePath -> filePath.getName().endsWith(".commit"))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
for (Path p : commitFiles) {
|
for (Path p : commitFiles) {
|
||||||
dfs().delete(p, false);
|
lfs.delete(p, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// export
|
// export
|
||||||
@@ -199,7 +202,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
@Test
|
@Test
|
||||||
public void testExportDatasetWithNoPartition() throws IOException {
|
public void testExportDatasetWithNoPartition() throws IOException {
|
||||||
// delete all source data
|
// delete all source data
|
||||||
dfs().delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
|
lfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
|
||||||
|
|
||||||
// export
|
// export
|
||||||
final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
|
final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
|
||||||
@@ -221,7 +224,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
cfg.outputFormat = format;
|
cfg.outputFormat = format;
|
||||||
new HoodieSnapshotExporter().export(jsc(), cfg);
|
new HoodieSnapshotExporter().export(jsc(), cfg);
|
||||||
assertEquals(NUM_RECORDS, sqlContext().read().format(format).load(targetPath).count());
|
assertEquals(NUM_RECORDS, sqlContext().read().format(format).load(targetPath).count());
|
||||||
assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
|
assertTrue(lfs.exists(new Path(targetPath + "/_SUCCESS")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -259,8 +262,8 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
new HoodieSnapshotExporter().export(jsc(), cfg);
|
new HoodieSnapshotExporter().export(jsc(), cfg);
|
||||||
|
|
||||||
assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
|
assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
|
||||||
assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
|
assertTrue(lfs.exists(new Path(targetPath + "/_SUCCESS")));
|
||||||
assertTrue(dfs().listStatus(new Path(targetPath)).length > 1);
|
assertTrue(lfs.listStatus(new Path(targetPath)).length > 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -269,8 +272,8 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
|
|||||||
new HoodieSnapshotExporter().export(jsc(), cfg);
|
new HoodieSnapshotExporter().export(jsc(), cfg);
|
||||||
|
|
||||||
assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
|
assertEquals(NUM_RECORDS, sqlContext().read().format("json").load(targetPath).count());
|
||||||
assertTrue(dfs().exists(new Path(targetPath + "/_SUCCESS")));
|
assertTrue(lfs.exists(new Path(targetPath + "/_SUCCESS")));
|
||||||
assertTrue(dfs().exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH))));
|
assertTrue(lfs.exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ package org.apache.hudi.utilities.functional;
|
|||||||
|
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.testutils.FunctionalTestHarness;
|
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
|
||||||
import org.apache.hudi.utilities.UtilHelpers;
|
import org.apache.hudi.utilities.UtilHelpers;
|
||||||
import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider;
|
import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider;
|
||||||
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
|
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
|
||||||
@@ -41,7 +41,7 @@ import java.sql.SQLException;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
@Tag("functional")
|
@Tag("functional")
|
||||||
public class TestJdbcbasedSchemaProvider extends FunctionalTestHarness {
|
public class TestJdbcbasedSchemaProvider extends SparkClientFunctionalTestHarness {
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(TestJdbcbasedSchemaProvider.class);
|
private static final Logger LOG = LogManager.getLogger(TestJdbcbasedSchemaProvider.class);
|
||||||
private static final TypedProperties PROPS = new TypedProperties();
|
private static final TypedProperties PROPS = new TypedProperties();
|
||||||
|
|||||||
Reference in New Issue
Block a user