
[HUDI-810] Migrate ClientTestHarness to JUnit 5 (#1553)

Raymond Xu, 2020-04-28 08:38:16 -07:00, committed by GitHub
parent 6de9f5d9e5
commit 06dae30297
36 changed files with 1232 additions and 1243 deletions

TestHoodieSnapshotExporter.java

@@ -45,95 +45,89 @@ import org.apache.spark.sql.Column;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.NullSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-@RunWith(Enclosed.class)
-public class TestHoodieSnapshotExporter {
+public class TestHoodieSnapshotExporter extends HoodieClientTestHarness {
-  static class ExporterTestHarness extends HoodieClientTestHarness {
+  static final Logger LOG = LogManager.getLogger(TestHoodieSnapshotExporter.class);
+  static final int NUM_RECORDS = 100;
+  static final String COMMIT_TIME = "20200101000000";
+  static final String PARTITION_PATH = "2020";
+  static final String TABLE_NAME = "testing";
+  String sourcePath;
+  String targetPath;
-    static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
-    static final int NUM_RECORDS = 100;
-    static final String COMMIT_TIME = "20200101000000";
-    static final String PARTITION_PATH = "2020";
-    static final String TABLE_NAME = "testing";
-    String sourcePath;
-    String targetPath;
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts();
+    initDFS();
+    dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
-    @Before
-    public void setUp() throws Exception {
-      initSparkContexts();
-      initDFS();
-      dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
+    // Initialize test data dirs
+    sourcePath = dfsBasePath + "/source/";
+    targetPath = dfsBasePath + "/target/";
+    dfs.mkdirs(new Path(sourcePath));
+    HoodieTableMetaClient
+        .initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
+            HoodieAvroPayload.class.getName());
-      // Initialize test data dirs
-      sourcePath = dfsBasePath + "/source/";
-      targetPath = dfsBasePath + "/target/";
-      dfs.mkdirs(new Path(sourcePath));
-      HoodieTableMetaClient
-          .initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
-              HoodieAvroPayload.class.getName());
+    // Prepare data as source Hudi dataset
+    HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
+    HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg);
+    hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
+    List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
+    JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+    hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
+    hdfsWriteClient.close();
-      // Prepare data as source Hudi dataset
-      HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
-      HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg);
-      hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
-      List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
-      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
-      hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
-      hdfsWriteClient.close();
-      RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(new Path(sourcePath), true);
-      while (itr.hasNext()) {
-        LOG.info(">>> Prepared test file: " + itr.next().getPath());
-      }
-    }
-    @After
-    public void tearDown() throws Exception {
-      cleanupSparkContexts();
-      cleanupDFS();
-      cleanupTestDataGenerator();
-    }
-    private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
-      return HoodieWriteConfig.newBuilder()
-          .withPath(basePath)
-          .withEmbeddedTimelineServerEnabled(false)
-          .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
-          .withParallelism(2, 2)
-          .withBulkInsertParallelism(2)
-          .forTable(TABLE_NAME)
-          .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
-          .build();
+    RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(new Path(sourcePath), true);
+    while (itr.hasNext()) {
+      LOG.info(">>> Prepared test file: " + itr.next().getPath());
+    }
+  }
-  public static class TestHoodieSnapshotExporterForHudi extends ExporterTestHarness {
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupSparkContexts();
+    cleanupDFS();
+    cleanupTestDataGenerator();
+  }
+  private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withEmbeddedTimelineServerEnabled(false)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .forTable(TABLE_NAME)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
+        .build();
+  }
+  @Nested
+  public class TestHoodieSnapshotExporterForHudi {
     private HoodieSnapshotExporter.Config cfg;
-    @Before
+    @BeforeEach
     public void setUp() throws Exception {
-      super.setUp();
       cfg = new Config();
       cfg.sourceBasePath = sourcePath;
       cfg.targetOutputPath = targetPath;
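
This first hunk collapses the JUnit 4 structure, in which @RunWith(Enclosed.class) grouped static inner classes that inherited fixtures from ExporterTestHarness, into a single JUnit 5 class whose test groups are @Nested inner classes. The super.setUp() calls disappear because Jupiter runs the outer class's @BeforeEach before each nested test automatically. A minimal sketch of that lifecycle, with hypothetical names:

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class OuterLifecycleTest {

  String fixture;

  @BeforeEach
  void setUpOuter() {
    fixture = "ready"; // runs first, for outer and nested tests alike
  }

  @Nested
  class InnerTests {

    @BeforeEach
    void setUpInner() {
      // runs after setUpOuter, before each test in this class
    }

    @Test
    void nestedTestSeesOuterState() {
      assertEquals("ready", fixture); // a non-static @Nested class shares the outer instance's state
    }
  }
}
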
@@ -156,23 +150,20 @@ public class TestHoodieSnapshotExporter {
       long numParquetFiles = Arrays.stream(dfs.listStatus(new Path(partition)))
           .filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet"))
           .count();
-      assertTrue("There should exist at least 1 parquet file.", numParquetFiles >= 1);
+      assertTrue(numParquetFiles >= 1, "There should exist at least 1 parquet file.");
       assertEquals(NUM_RECORDS, sqlContext.read().parquet(partition).count());
       assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
     }
   }
-  public static class TestHoodieSnapshotExporterForEarlyAbort extends ExporterTestHarness {
+  @Nested
+  public class TestHoodieSnapshotExporterForEarlyAbort {
     private HoodieSnapshotExporter.Config cfg;
-    @Rule
-    public ExpectedException exceptionRule = ExpectedException.none();
-    @Before
+    @BeforeEach
     public void setUp() throws Exception {
-      super.setUp();
       cfg = new Config();
       cfg.sourceBasePath = sourcePath;
       cfg.targetOutputPath = targetPath;
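
Note the swapped arguments in the assertTrue call above: JUnit 4's Assert puts the failure message first, while Jupiter's Assertions put it last. For assertTrue the compiler catches an un-migrated call, but for assertEquals on two strings a stale argument order still compiles, so the swap is worth double-checking. A small illustration, with a stand-in value:

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

class MessageOrderExampleTest {

  @Test
  void messageComesLast() {
    long numParquetFiles = 1; // stand-in for the real file count
    // JUnit 4 form: assertTrue("There should exist at least 1 parquet file.", numParquetFiles >= 1);
    assertTrue(numParquetFiles >= 1, "There should exist at least 1 parquet file.");
  }
}
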
@@ -185,9 +176,10 @@ public class TestHoodieSnapshotExporter {
       dfs.mkdirs(new Path(targetPath));
       // export
-      exceptionRule.expect(HoodieSnapshotExporterException.class);
-      exceptionRule.expectMessage("The target output path already exists.");
-      new HoodieSnapshotExporter().export(jsc, cfg);
+      final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
+        new HoodieSnapshotExporter().export(jsc, cfg);
+      });
+      assertEquals("The target output path already exists.", thrown.getMessage());
     }
     @Test
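
This hunk and the two that follow replace the @Rule ExpectedException pair with assertThrows, which scopes the expectation to a lambda and hands back the thrown exception so its message can be asserted directly. A self-contained sketch of the idiom, with a hypothetical exception:

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class AssertThrowsExampleTest {

  @Test
  void thrownExceptionCanBeInspected() {
    // Fails the test if the lambda throws nothing, or throws a different type...
    IllegalStateException thrown = assertThrows(IllegalStateException.class, () -> {
      throw new IllegalStateException("boom");
    });
    // ...and returns the caught exception for follow-up assertions.
    assertEquals("boom", thrown.getMessage());
  }
}
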
@@ -202,9 +194,10 @@ public class TestHoodieSnapshotExporter {
       }
       // export
-      exceptionRule.expect(HoodieSnapshotExporterException.class);
-      exceptionRule.expectMessage("No commits present. Nothing to snapshot.");
-      new HoodieSnapshotExporter().export(jsc, cfg);
+      final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
+        new HoodieSnapshotExporter().export(jsc, cfg);
+      });
+      assertEquals("No commits present. Nothing to snapshot.", thrown.getMessage());
     }
     @Test
@@ -213,25 +206,19 @@ public class TestHoodieSnapshotExporter {
       dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
       // export
-      exceptionRule.expect(HoodieSnapshotExporterException.class);
-      exceptionRule.expectMessage("The source dataset has 0 partition to snapshot.");
-      new HoodieSnapshotExporter().export(jsc, cfg);
+      final Throwable thrown = assertThrows(HoodieSnapshotExporterException.class, () -> {
+        new HoodieSnapshotExporter().export(jsc, cfg);
+      });
+      assertEquals("The source dataset has 0 partition to snapshot.", thrown.getMessage());
     }
   }
-  @RunWith(Parameterized.class)
-  public static class TestHoodieSnapshotExporterForNonHudi extends ExporterTestHarness {
+  @Nested
+  public class TestHoodieSnapshotExporterForNonHudi {
-    @Parameters
-    public static Iterable<String[]> formats() {
-      return Arrays.asList(new String[][] {{"json"}, {"parquet"}});
-    }
-    @Parameter
-    public String format;
-    @Test
-    public void testExportAsNonHudi() throws IOException {
+    @ParameterizedTest
+    @ValueSource(strings = {"json", "parquet"})
+    public void testExportAsNonHudi(String format) throws IOException {
       HoodieSnapshotExporter.Config cfg = new Config();
       cfg.sourceBasePath = sourcePath;
       cfg.targetOutputPath = targetPath;
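
Here the class-level Parameterized runner gives way to method-level parameterization: the static @Parameters factory and the injected @Parameter field become a @ValueSource on the test method, which receives the format directly as an argument. A minimal sketch with hypothetical names:

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;

class FormatParameterExampleTest {

  // One invocation per value; no runner, factory method, or injected field needed.
  @ParameterizedTest
  @ValueSource(strings = {"json", "parquet"})
  void formatIsAlreadyLowerCase(String format) {
    assertEquals(format.toLowerCase(), format);
  }
}
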
@@ -242,27 +229,27 @@ public class TestHoodieSnapshotExporter {
     }
   }
-  public static class TestHoodieSnapshotExporterForRepartitioning extends ExporterTestHarness {
+  public static class UserDefinedPartitioner implements Partitioner {
-    private static final String PARTITION_NAME = "year";
+    public static final String PARTITION_NAME = "year";
-    public static class UserDefinedPartitioner implements Partitioner {
-      @Override
-      public DataFrameWriter<Row> partition(Dataset<Row> source) {
-        return source
-            .withColumnRenamed(HoodieRecord.PARTITION_PATH_METADATA_FIELD, PARTITION_NAME)
-            .repartition(new Column(PARTITION_NAME))
-            .write()
-            .partitionBy(PARTITION_NAME);
-      }
+    @Override
+    public DataFrameWriter<Row> partition(Dataset<Row> source) {
+      return source
+          .withColumnRenamed(HoodieRecord.PARTITION_PATH_METADATA_FIELD, PARTITION_NAME)
+          .repartition(new Column(PARTITION_NAME))
+          .write()
+          .partitionBy(PARTITION_NAME);
+    }
   }
+  @Nested
+  public class TestHoodieSnapshotExporterForRepartitioning {
     private HoodieSnapshotExporter.Config cfg;
-    @Before
+    @BeforeEach
     public void setUp() throws Exception {
-      super.setUp();
       cfg = new Config();
       cfg.sourceBasePath = sourcePath;
       cfg.targetOutputPath = targetPath;
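
The partitioner is also hoisted out of the repartitioning test class, and PARTITION_NAME becomes a public member of UserDefinedPartitioner. The hoist is forced by the migration: @Nested classes are non-static inner classes, and before Java 16 an inner class cannot declare a static nested class. A sketch of the constraint, with hypothetical names:

import org.junit.jupiter.api.Nested;

class HoistingExample {

  // A static helper type must live on the (static-friendly) outer class.
  public static class Helper {
    public static final String PARTITION_NAME = "year";
  }

  @Nested
  class RepartitioningTests {
    // public static class Illegal {} // would not compile on Java 8/11 inside a non-static class
    String name = Helper.PARTITION_NAME; // nested tests reach the helper through the outer scope
  }
}
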
@@ -287,39 +274,35 @@ public class TestHoodieSnapshotExporter {
       assertEquals(NUM_RECORDS, sqlContext.read().format("json").load(targetPath).count());
       assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
-      assertTrue(dfs.exists(new Path(String.format("%s/%s=%s", targetPath, PARTITION_NAME, PARTITION_PATH))));
+      assertTrue(dfs.exists(new Path(String.format("%s/%s=%s", targetPath, UserDefinedPartitioner.PARTITION_NAME, PARTITION_PATH))));
     }
   }
-  @RunWith(Parameterized.class)
-  public static class TestHoodieSnapshotExporterInputValidation {
+  @Nested
+  public class TestHoodieSnapshotExporterInputValidation {
-    @Parameters
-    public static Iterable<Object[]> data() {
-      return Arrays.asList(new Object[][] {
-          {"json", true}, {"parquet", true}, {"hudi", true},
-          {"JSON", false}, {"foo", false}, {null, false}, {"", false}
+    @ParameterizedTest
+    @ValueSource(strings = {"json", "parquet", "hudi"})
+    public void testValidateOutputFormat_withValidFormat(String format) {
+      assertDoesNotThrow(() -> {
+        new OutputFormatValidator().validate(null, format);
       });
     }
-    @Parameter
-    public String format;
-    @Parameter(1)
-    public boolean isValid;
-    @Test
-    public void testValidateOutputFormat() {
-      Throwable t = null;
-      try {
+    @ParameterizedTest
+    @ValueSource(strings = {"", "JSON"})
+    public void testValidateOutputFormat_withInvalidFormat(String format) {
+      assertThrows(ParameterException.class, () -> {
         new OutputFormatValidator().validate(null, format);
-      } catch (Exception e) {
-        t = e;
-      }
-      if (isValid) {
-        assertNull(t);
-      } else {
-        assertTrue(t instanceof ParameterException);
-      }
+      });
     }
+    @ParameterizedTest
+    @NullSource
+    public void testValidateOutputFormat_withNullFormat(String format) {
+      assertThrows(ParameterException.class, () -> {
+        new OutputFormatValidator().validate(null, format);
+      });
+    }
   }
 }
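
The last hunk splits the old {format, isValid} truth table into three focused tests: valid formats run under assertDoesNotThrow, invalid strings under assertThrows, and null under a dedicated @NullSource test, since @ValueSource cannot express a null literal. A self-contained sketch of the same split, with a hypothetical validator standing in for OutputFormatValidator.validate:

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

class FormatValidationExampleTest {

  @ParameterizedTest
  @ValueSource(strings = {"json", "parquet"})
  void acceptsKnownFormats(String format) {
    assertDoesNotThrow(() -> requireKnownFormat(format));
  }

  // @ValueSource has no null literal, so the null case gets its own source.
  @ParameterizedTest
  @NullSource
  void rejectsNullFormat(String format) {
    assertThrows(IllegalArgumentException.class, () -> requireKnownFormat(format));
  }

  // Hypothetical stand-in for the validator exercised in the hunk above.
  private static void requireKnownFormat(String format) {
    if (format == null || !(format.equals("json") || format.equals("parquet"))) {
      throw new IllegalArgumentException("unsupported format: " + format);
    }
  }
}
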