[HUDI-813] Migrate hudi-utilities tests to JUnit 5 (#1589)
This commit is contained in:
@@ -28,29 +28,29 @@ import org.apache.spark.api.java.JavaSparkContext;
|
|||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestAWSDatabaseMigrationServiceSource {
|
public class TestAWSDatabaseMigrationServiceSource {
|
||||||
|
|
||||||
private static JavaSparkContext jsc;
|
private static JavaSparkContext jsc;
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void setupTest() {
|
public static void setupTest() {
|
||||||
jsc = UtilHelpers.buildSparkContext("aws-dms-test", "local[2]");
|
jsc = UtilHelpers.buildSparkContext("aws-dms-test", "local[2]");
|
||||||
spark = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
spark = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void tearDownTest() {
|
public static void tearDownTest() {
|
||||||
if (jsc != null) {
|
if (jsc != null) {
|
||||||
jsc.stop();
|
jsc.stop();
|
||||||
@@ -99,7 +99,7 @@ public class TestAWSDatabaseMigrationServiceSource {
|
|||||||
new Record("2", 3433L)), Record.class);
|
new Record("2", 3433L)), Record.class);
|
||||||
|
|
||||||
Dataset<Row> outputFrame = transformer.apply(jsc, spark, inputFrame, null);
|
Dataset<Row> outputFrame = transformer.apply(jsc, spark, inputFrame, null);
|
||||||
assertTrue(Arrays.asList(outputFrame.schema().fields()).stream()
|
assertTrue(Arrays.stream(outputFrame.schema().fields())
|
||||||
.map(f -> f.name()).anyMatch(n -> n.equals(AWSDmsAvroPayload.OP_FIELD)));
|
.map(f -> f.name()).anyMatch(n -> n.equals(AWSDmsAvroPayload.OP_FIELD)));
|
||||||
assertTrue(outputFrame.select(AWSDmsAvroPayload.OP_FIELD).collectAsList().stream()
|
assertTrue(outputFrame.select(AWSDmsAvroPayload.OP_FIELD).collectAsList().stream()
|
||||||
.allMatch(r -> r.getString(0).equals("")));
|
.allMatch(r -> r.getString(0).equals("")));
|
||||||
|
|||||||
@@ -41,12 +41,11 @@ import org.apache.spark.api.java.JavaSparkContext;
|
|||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SQLContext;
|
import org.apache.spark.sql.SQLContext;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
@@ -61,8 +60,8 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestHDFSParquetImporter implements Serializable {
|
public class TestHDFSParquetImporter implements Serializable {
|
||||||
|
|
||||||
@@ -71,7 +70,7 @@ public class TestHDFSParquetImporter implements Serializable {
|
|||||||
private static MiniDFSCluster dfsCluster;
|
private static MiniDFSCluster dfsCluster;
|
||||||
private static DistributedFileSystem dfs;
|
private static DistributedFileSystem dfs;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void initClass() throws Exception {
|
public static void initClass() throws Exception {
|
||||||
hdfsTestService = new HdfsTestService();
|
hdfsTestService = new HdfsTestService();
|
||||||
dfsCluster = hdfsTestService.start(true);
|
dfsCluster = hdfsTestService.start(true);
|
||||||
@@ -82,7 +81,7 @@ public class TestHDFSParquetImporter implements Serializable {
|
|||||||
dfs.mkdirs(new Path(dfsBasePath));
|
dfs.mkdirs(new Path(dfsBasePath));
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void cleanupClass() {
|
public static void cleanupClass() {
|
||||||
if (hdfsTestService != null) {
|
if (hdfsTestService != null) {
|
||||||
hdfsTestService.stop();
|
hdfsTestService.stop();
|
||||||
@@ -94,7 +93,7 @@ public class TestHDFSParquetImporter implements Serializable {
|
|||||||
private transient Path srcFolder;
|
private transient Path srcFolder;
|
||||||
private transient List<GenericRecord> insertData;
|
private transient List<GenericRecord> insertData;
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void init() throws IOException, ParseException {
|
public void init() throws IOException, ParseException {
|
||||||
basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
|
basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
|
||||||
|
|
||||||
@@ -106,7 +105,7 @@ public class TestHDFSParquetImporter implements Serializable {
|
|||||||
insertData = createInsertRecords(srcFolder);
|
insertData = createInsertRecords(srcFolder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void clean() throws IOException {
|
public void clean() throws IOException {
|
||||||
dfs.delete(new Path(basePath), true);
|
dfs.delete(new Path(basePath), true);
|
||||||
}
|
}
|
||||||
@@ -138,8 +137,8 @@ public class TestHDFSParquetImporter implements Serializable {
|
|||||||
};
|
};
|
||||||
// Schema file is not created so this operation should fail.
|
// Schema file is not created so this operation should fail.
|
||||||
assertEquals(0, dataImporter.dataImport(jsc, retry.get()));
|
assertEquals(0, dataImporter.dataImport(jsc, retry.get()));
|
||||||
assertEquals(retry.get(), -1);
|
assertEquals(-1, retry.get());
|
||||||
assertEquals(fileCreated.get(), 1);
|
assertEquals(1, fileCreated.get());
|
||||||
|
|
||||||
// Check if
|
// Check if
|
||||||
// 1. .commit file is present
|
// 1. .commit file is present
|
||||||
@@ -162,10 +161,10 @@ public class TestHDFSParquetImporter implements Serializable {
|
|||||||
recordCounts.put(partitionPath, recordCounts.get(partitionPath) + count);
|
recordCounts.put(partitionPath, recordCounts.get(partitionPath) + count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertTrue("commit file is missing", isCommitFilePresent);
|
assertTrue(isCommitFilePresent, "commit file is missing");
|
||||||
assertEquals("partition is missing", 4, recordCounts.size());
|
assertEquals(4, recordCounts.size(), "partition is missing");
|
||||||
for (Entry<String, Long> e : recordCounts.entrySet()) {
|
for (Entry<String, Long> e : recordCounts.entrySet()) {
|
||||||
assertEquals("missing records", 24, e.getValue().longValue());
|
assertEquals(24, e.getValue().longValue(), "missing records");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,15 +18,16 @@
|
|||||||
|
|
||||||
package org.apache.hudi.utilities;
|
package org.apache.hudi.utilities;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
|
|
||||||
public class TestHiveIncrementalPuller {
|
public class TestHiveIncrementalPuller {
|
||||||
|
|
||||||
private HiveIncrementalPuller.Config config;
|
private HiveIncrementalPuller.Config config;
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() {
|
public void setup() {
|
||||||
config = new HiveIncrementalPuller.Config();
|
config = new HiveIncrementalPuller.Config();
|
||||||
}
|
}
|
||||||
@@ -34,11 +35,9 @@ public class TestHiveIncrementalPuller {
|
|||||||
@Test
|
@Test
|
||||||
public void testInitHiveIncrementalPuller() {
|
public void testInitHiveIncrementalPuller() {
|
||||||
|
|
||||||
try {
|
assertDoesNotThrow(() -> {
|
||||||
new HiveIncrementalPuller(config);
|
new HiveIncrementalPuller(config);
|
||||||
} catch (Exception e) {
|
}, "Unexpected exception while initing HiveIncrementalPuller.");
|
||||||
Assert.fail("Unexpected exception while initing HiveIncrementalPuller, msg: " + e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -70,12 +70,11 @@ import org.apache.spark.sql.api.java.UDF4;
|
|||||||
import org.apache.spark.sql.functions;
|
import org.apache.spark.sql.functions;
|
||||||
import org.apache.spark.sql.types.DataTypes;
|
import org.apache.spark.sql.types.DataTypes;
|
||||||
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
|
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -89,10 +88,10 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
|
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
|
||||||
@@ -116,7 +115,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
|
|
||||||
private static int testNum = 1;
|
private static int testNum = 1;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void initClass() throws Exception {
|
public static void initClass() throws Exception {
|
||||||
UtilitiesTestBase.initClass(true);
|
UtilitiesTestBase.initClass(true);
|
||||||
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
|
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
|
||||||
@@ -226,17 +225,17 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
MultiPartKeysValueExtractor.class.getName());
|
MultiPartKeysValueExtractor.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void cleanupClass() {
|
public static void cleanupClass() {
|
||||||
UtilitiesTestBase.cleanupClass();
|
UtilitiesTestBase.cleanupClass();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
super.setup();
|
super.setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void teardown() throws Exception {
|
public void teardown() throws Exception {
|
||||||
super.teardown();
|
super.teardown();
|
||||||
}
|
}
|
||||||
@@ -343,7 +342,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
|
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
|
||||||
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
||||||
int numCompactionCommits = (int) timeline.getInstants().count();
|
int numCompactionCommits = (int) timeline.getInstants().count();
|
||||||
assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits);
|
assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
|
static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
|
||||||
@@ -351,7 +350,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
|
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
|
||||||
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
||||||
int numDeltaCommits = (int) timeline.getInstants().count();
|
int numDeltaCommits = (int) timeline.getInstants().count();
|
||||||
assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits);
|
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
|
||||||
}
|
}
|
||||||
|
|
||||||
static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits)
|
static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits)
|
||||||
@@ -413,37 +412,33 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||||
Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath));
|
Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath));
|
||||||
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), props);
|
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), props);
|
||||||
assertEquals(deltaStreamer.getConfig().checkpoint, "kafka_topic1,0:200");
|
assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPropsWithInvalidKeyGenerator() throws Exception {
|
public void testPropsWithInvalidKeyGenerator() throws Exception {
|
||||||
try {
|
Exception e = assertThrows(IOException.class, () -> {
|
||||||
String tableBasePath = dfsBasePath + "/test_table";
|
String tableBasePath = dfsBasePath + "/test_table";
|
||||||
HoodieDeltaStreamer deltaStreamer =
|
HoodieDeltaStreamer deltaStreamer =
|
||||||
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
||||||
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
|
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
|
||||||
deltaStreamer.sync();
|
deltaStreamer.sync();
|
||||||
fail("Should error out when setting the key generator class property to an invalid value");
|
}, "Should error out when setting the key generator class property to an invalid value");
|
||||||
} catch (IOException e) {
|
|
||||||
// expected
|
// expected
|
||||||
LOG.error("Expected error during getting the key generator", e);
|
LOG.debug("Expected error during getting the key generator", e);
|
||||||
assertTrue(e.getMessage().contains("Could not load key generator class"));
|
assertTrue(e.getMessage().contains("Could not load key generator class"));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTableCreation() throws Exception {
|
public void testTableCreation() throws Exception {
|
||||||
try {
|
Exception e = assertThrows(TableNotFoundException.class, () -> {
|
||||||
dfs.mkdirs(new Path(dfsBasePath + "/not_a_table"));
|
dfs.mkdirs(new Path(dfsBasePath + "/not_a_table"));
|
||||||
HoodieDeltaStreamer deltaStreamer =
|
HoodieDeltaStreamer deltaStreamer =
|
||||||
new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_table", Operation.BULK_INSERT), jsc);
|
new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_table", Operation.BULK_INSERT), jsc);
|
||||||
deltaStreamer.sync();
|
deltaStreamer.sync();
|
||||||
fail("Should error out when pointed out at a dir thats not a table");
|
}, "Should error out when pointed out at a dir thats not a table");
|
||||||
} catch (TableNotFoundException e) {
|
|
||||||
// expected
|
// expected
|
||||||
LOG.error("Expected error during table creation", e);
|
LOG.debug("Expected error during table creation", e);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -596,11 +591,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
|
|
||||||
// Test Hive integration
|
// Test Hive integration
|
||||||
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
|
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
|
||||||
assertTrue("Table " + hiveSyncConfig.tableName + " should exist", hiveClient.doesTableExist(hiveSyncConfig.tableName));
|
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist");
|
||||||
assertEquals("Table partitions should match the number of partitions we wrote", 1,
|
assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size());
|
"Table partitions should match the number of partitions we wrote");
|
||||||
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", lastInstantForUpstreamTable,
|
assertEquals(lastInstantForUpstreamTable,
|
||||||
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
|
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
|
||||||
|
"The last commit that was sycned should be updated in the TBLPROPERTIES");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -609,14 +605,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
||||||
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
|
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
|
||||||
false, false, null, null);
|
false, false, null, null);
|
||||||
try {
|
Exception e = assertThrows(HoodieException.class, () -> {
|
||||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
||||||
fail("Should error out when schema provider is not provided");
|
}, "Should error out when schema provider is not provided");
|
||||||
} catch (HoodieException e) {
|
LOG.debug("Expected error during reading data from source ", e);
|
||||||
LOG.error("Expected error during reading data from source ", e);
|
|
||||||
assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
|
assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPayloadClassUpdate() throws Exception {
|
public void testPayloadClassUpdate() throws Exception {
|
||||||
@@ -734,7 +728,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Option.empty(), 10000000);
|
InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Option.empty(), 10000000);
|
||||||
batch.getBatch().get().cache();
|
batch.getBatch().get().cache();
|
||||||
long c = batch.getBatch().get().count();
|
long c = batch.getBatch().get().count();
|
||||||
Assert.assertEquals(1000, c);
|
assertEquals(1000, c);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void prepareParquetDFSFiles(int numRecords) throws IOException {
|
private static void prepareParquetDFSFiles(int numRecords) throws IOException {
|
||||||
@@ -916,14 +910,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
// Target schema is determined based on the Dataframe after transformation
|
// Target schema is determined based on the Dataframe after transformation
|
||||||
// No CSV header and no schema provider at the same time are not recommended,
|
// No CSV header and no schema provider at the same time are not recommended,
|
||||||
// as the transformer behavior may be unexpected
|
// as the transformer behavior may be unexpected
|
||||||
try {
|
Exception e = assertThrows(AnalysisException.class, () -> {
|
||||||
testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||||
fail("Should error out when doing the transformation.");
|
}, "Should error out when doing the transformation.");
|
||||||
} catch (AnalysisException e) {
|
LOG.debug("Expected error during transformation", e);
|
||||||
LOG.error("Expected error during transformation", e);
|
|
||||||
assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
|
assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
|
public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
|
||||||
|
|||||||
@@ -30,15 +30,15 @@ import org.apache.hudi.utilities.sources.TestDataSource;
|
|||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Ignore;
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
|
public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
|
||||||
|
|
||||||
@@ -64,37 +64,31 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
|
|||||||
@Test
|
@Test
|
||||||
public void testInvalidHiveSyncProps() throws IOException {
|
public void testInvalidHiveSyncProps() throws IOException {
|
||||||
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
|
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
|
||||||
try {
|
Exception e = assertThrows(HoodieException.class, () -> {
|
||||||
new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
||||||
fail("Should fail when hive sync table not provided with enableHiveSync flag");
|
}, "Should fail when hive sync table not provided with enableHiveSync flag");
|
||||||
} catch (HoodieException he) {
|
log.debug("Expected error when creating table execution objects", e);
|
||||||
log.error("Expected error when creating table execution objects", he);
|
assertTrue(e.getMessage().contains("Hive sync table field not provided!"));
|
||||||
assertTrue(he.getMessage().contains("Hive sync table field not provided!"));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInvalidPropsFilePath() throws IOException {
|
public void testInvalidPropsFilePath() throws IOException {
|
||||||
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
|
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
|
||||||
try {
|
Exception e = assertThrows(IllegalArgumentException.class, () -> {
|
||||||
new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
||||||
fail("Should fail when invalid props file is provided");
|
}, "Should fail when invalid props file is provided");
|
||||||
} catch (IllegalArgumentException iae) {
|
log.debug("Expected error when creating table execution objects", e);
|
||||||
log.error("Expected error when creating table execution objects", iae);
|
assertTrue(e.getMessage().contains("Please provide valid common config file path!"));
|
||||||
assertTrue(iae.getMessage().contains("Please provide valid common config file path!"));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInvalidTableConfigFilePath() throws IOException {
|
public void testInvalidTableConfigFilePath() throws IOException {
|
||||||
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
|
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
|
||||||
try {
|
Exception e = assertThrows(IllegalArgumentException.class, () -> {
|
||||||
new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
||||||
fail("Should fail when invalid table config props file path is provided");
|
}, "Should fail when invalid table config props file path is provided");
|
||||||
} catch (IllegalArgumentException iae) {
|
log.debug("Expected error when creating table execution objects", e);
|
||||||
log.error("Expected error when creating table execution objects", iae);
|
assertTrue(e.getMessage().contains("Please provide valid table config file path!"));
|
||||||
assertTrue(iae.getMessage().contains("Please provide valid table config file path!"));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -102,27 +96,25 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
|
|||||||
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
|
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
|
||||||
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
||||||
TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
|
TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
|
||||||
assertEquals(streamer.getTableExecutionContexts().size(), 2);
|
assertEquals(2, streamer.getTableExecutionContexts().size());
|
||||||
assertEquals(executionContext.getConfig().targetBasePath, dfsBasePath + "/multi_table_dataset/uber_db/dummy_table_uber");
|
assertEquals(dfsBasePath + "/multi_table_dataset/uber_db/dummy_table_uber", executionContext.getConfig().targetBasePath);
|
||||||
assertEquals(executionContext.getConfig().targetTableName, "uber_db.dummy_table_uber");
|
assertEquals("uber_db.dummy_table_uber", executionContext.getConfig().targetTableName);
|
||||||
assertEquals(executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP), "topic1");
|
assertEquals("topic1", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP));
|
||||||
assertEquals(executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()), "_row_key");
|
assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()));
|
||||||
assertEquals(executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()), TestHoodieDeltaStreamer.TestGenerator.class.getName());
|
assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()));
|
||||||
assertEquals(executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP), "uber_hive_dummy_table");
|
assertEquals("uber_hive_dummy_table", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore
|
@Disabled
|
||||||
public void testInvalidIngestionProps() {
|
public void testInvalidIngestionProps() {
|
||||||
try {
|
Exception e = assertThrows(Exception.class, () -> {
|
||||||
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
|
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
|
||||||
new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
||||||
fail("Creation of execution object should fail without kafka topic");
|
}, "Creation of execution object should fail without kafka topic");
|
||||||
} catch (Exception e) {
|
log.debug("Creation of execution object failed with error: " + e.getMessage(), e);
|
||||||
log.error("Creation of execution object failed with error: " + e.getMessage(), e);
|
|
||||||
assertTrue(e.getMessage().contains("Please provide valid table config arguments!"));
|
assertTrue(e.getMessage().contains("Please provide valid table config arguments!"));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Test //0 corresponds to fg
|
@Test //0 corresponds to fg
|
||||||
public void testMultiTableExecution() throws IOException {
|
public void testMultiTableExecution() throws IOException {
|
||||||
@@ -156,7 +148,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
|
|||||||
testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
|
testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
|
||||||
testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
|
testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
|
||||||
streamer.sync();
|
streamer.sync();
|
||||||
assertEquals(streamer.getSuccessTables().size(), 2);
|
assertEquals(2, streamer.getSuccessTables().size());
|
||||||
assertTrue(streamer.getFailedTables().isEmpty());
|
assertTrue(streamer.getFailedTables().isEmpty());
|
||||||
|
|
||||||
//assert the record count matches now
|
//assert the record count matches now
|
||||||
|
|||||||
@@ -26,9 +26,9 @@ import org.apache.avro.Schema;
|
|||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
@@ -36,7 +36,7 @@ import java.sql.DriverManager;
|
|||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
public class TestJdbcbasedSchemaProvider {
|
public class TestJdbcbasedSchemaProvider {
|
||||||
|
|
||||||
@@ -44,7 +44,7 @@ public class TestJdbcbasedSchemaProvider {
|
|||||||
private static final TypedProperties PROPS = new TypedProperties();
|
private static final TypedProperties PROPS = new TypedProperties();
|
||||||
protected transient JavaSparkContext jsc = null;
|
protected transient JavaSparkContext jsc = null;
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void init() {
|
public void init() {
|
||||||
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
|
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
|
||||||
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url", "jdbc:h2:mem:test_mem");
|
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url", "jdbc:h2:mem:test_mem");
|
||||||
@@ -56,7 +56,7 @@ public class TestJdbcbasedSchemaProvider {
|
|||||||
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable", "false");
|
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable", "false");
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void teardown() throws Exception {
|
public void teardown() throws Exception {
|
||||||
if (jsc != null) {
|
if (jsc != null) {
|
||||||
jsc.stop();
|
jsc.stop();
|
||||||
|
|||||||
@@ -22,12 +22,12 @@ import org.apache.hudi.common.model.HoodieTableType;
|
|||||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
|
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
|
||||||
import org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator;
|
import org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
|
||||||
public class TestSchedulerConfGenerator {
|
public class TestSchedulerConfGenerator {
|
||||||
|
|
||||||
@@ -35,21 +35,21 @@ public class TestSchedulerConfGenerator {
|
|||||||
public void testGenerateSparkSchedulingConf() throws Exception {
|
public void testGenerateSparkSchedulingConf() throws Exception {
|
||||||
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
|
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
|
||||||
Map<String, String> configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
|
Map<String, String> configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
|
||||||
assertNull("spark.scheduler.mode not set", configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
|
assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "spark.scheduler.mode not set");
|
||||||
|
|
||||||
System.setProperty(SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY, "FAIR");
|
System.setProperty(SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY, "FAIR");
|
||||||
cfg.continuousMode = false;
|
cfg.continuousMode = false;
|
||||||
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
|
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
|
||||||
assertNull("continuousMode is false", configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
|
assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "continuousMode is false");
|
||||||
|
|
||||||
cfg.continuousMode = true;
|
cfg.continuousMode = true;
|
||||||
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
|
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
|
||||||
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
|
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
|
||||||
assertNull("table type is not MERGE_ON_READ",
|
assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY),
|
||||||
configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
|
"table type is not MERGE_ON_READ");
|
||||||
|
|
||||||
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
|
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
|
||||||
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
|
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
|
||||||
assertNotNull("all satisfies", configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
|
assertNotNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "all satisfies");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,18 +26,18 @@ import org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator;
|
|||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
public class TestTimestampBasedKeyGenerator {
|
public class TestTimestampBasedKeyGenerator {
|
||||||
private GenericRecord baseRecord;
|
private GenericRecord baseRecord;
|
||||||
private TypedProperties properties = new TypedProperties();
|
private TypedProperties properties = new TypedProperties();
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void initialize() throws IOException {
|
public void initialize() throws IOException {
|
||||||
Schema schema = SchemaTestUtil.getTimestampEvolvedSchema();
|
Schema schema = SchemaTestUtil.getTimestampEvolvedSchema();
|
||||||
baseRecord = SchemaTestUtil
|
baseRecord = SchemaTestUtil
|
||||||
@@ -61,23 +61,23 @@ public class TestTimestampBasedKeyGenerator {
|
|||||||
baseRecord.put("createTime", 1578283932000L);
|
baseRecord.put("createTime", 1578283932000L);
|
||||||
properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00");
|
properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00");
|
||||||
HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
|
HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
|
||||||
assertEquals(hk1.getPartitionPath(), "2020-01-06 12");
|
assertEquals("2020-01-06 12", hk1.getPartitionPath());
|
||||||
|
|
||||||
// timezone is GMT
|
// timezone is GMT
|
||||||
properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT");
|
properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT");
|
||||||
HoodieKey hk2 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
|
HoodieKey hk2 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
|
||||||
assertEquals(hk2.getPartitionPath(), "2020-01-06 04");
|
assertEquals("2020-01-06 04", hk2.getPartitionPath());
|
||||||
|
|
||||||
// timestamp is DATE_STRING, timezone is GMT+8:00
|
// timestamp is DATE_STRING, timezone is GMT+8:00
|
||||||
baseRecord.put("createTime", "2020-01-06 12:12:12");
|
baseRecord.put("createTime", "2020-01-06 12:12:12");
|
||||||
properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT+8:00");
|
properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT+8:00");
|
||||||
properties.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd hh:mm:ss");
|
properties.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd hh:mm:ss");
|
||||||
HoodieKey hk3 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
|
HoodieKey hk3 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
|
||||||
assertEquals(hk3.getPartitionPath(), "2020-01-06 12");
|
assertEquals("2020-01-06 12", hk3.getPartitionPath());
|
||||||
|
|
||||||
// timezone is GMT
|
// timezone is GMT
|
||||||
properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT");
|
properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT");
|
||||||
HoodieKey hk4 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
|
HoodieKey hk4 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
|
||||||
assertEquals(hk4.getPartitionPath(), "2020-01-06 12");
|
assertEquals("2020-01-06 12", hk4.getPartitionPath());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,26 +27,21 @@ import org.apache.spark.api.java.JavaSparkContext;
|
|||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.junit.Rule;
|
import org.junit.jupiter.api.Nested;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.experimental.runners.Enclosed;
|
|
||||||
import org.junit.rules.ExpectedException;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@RunWith(Enclosed.class)
|
|
||||||
public class TestUtilHelpers {
|
public class TestUtilHelpers {
|
||||||
|
|
||||||
public static class TestCreateTransformer {
|
|
||||||
|
|
||||||
public static class TransformerFoo implements Transformer {
|
public static class TransformerFoo implements Transformer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -63,8 +58,8 @@ public class TestUtilHelpers {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Rule
|
@Nested
|
||||||
public ExpectedException exceptionRule = ExpectedException.none();
|
public class TestCreateTransformer {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateTransformerNotPresent() throws IOException {
|
public void testCreateTransformerNotPresent() throws IOException {
|
||||||
@@ -93,9 +88,10 @@ public class TestUtilHelpers {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateTransformerThrowsException() throws IOException {
|
public void testCreateTransformerThrowsException() throws IOException {
|
||||||
exceptionRule.expect(IOException.class);
|
Exception e = assertThrows(IOException.class, () -> {
|
||||||
exceptionRule.expectMessage("Could not load transformer class(es) [foo, bar]");
|
|
||||||
UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
|
UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
|
||||||
|
});
|
||||||
|
assertEquals("Could not load transformer class(es) [foo, bar]", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,10 +56,10 @@ import org.apache.parquet.hadoop.ParquetWriter;
|
|||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.sql.SQLContext;
|
import org.apache.spark.sql.SQLContext;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -86,7 +86,7 @@ public class UtilitiesTestBase {
|
|||||||
protected static HiveTestService hiveTestService;
|
protected static HiveTestService hiveTestService;
|
||||||
private static ObjectMapper mapper = new ObjectMapper();
|
private static ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void initClass() throws Exception {
|
public static void initClass() throws Exception {
|
||||||
initClass(false);
|
initClass(false);
|
||||||
}
|
}
|
||||||
@@ -104,7 +104,7 @@ public class UtilitiesTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void cleanupClass() {
|
public static void cleanupClass() {
|
||||||
if (hdfsTestService != null) {
|
if (hdfsTestService != null) {
|
||||||
hdfsTestService.stop();
|
hdfsTestService.stop();
|
||||||
@@ -117,7 +117,7 @@ public class UtilitiesTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
TestDataSource.initDataGen();
|
TestDataSource.initDataGen();
|
||||||
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
|
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
|
||||||
@@ -125,7 +125,7 @@ public class UtilitiesTestBase {
|
|||||||
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void teardown() throws Exception {
|
public void teardown() throws Exception {
|
||||||
TestDataSource.resetDataGen();
|
TestDataSource.resetDataGen();
|
||||||
if (jsc != null) {
|
if (jsc != null) {
|
||||||
|
|||||||
@@ -33,9 +33,8 @@ import org.apache.parquet.avro.AvroParquetWriter;
|
|||||||
import org.apache.parquet.hadoop.ParquetReader;
|
import org.apache.parquet.hadoop.ParquetReader;
|
||||||
import org.apache.parquet.hadoop.ParquetWriter;
|
import org.apache.parquet.hadoop.ParquetWriter;
|
||||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -46,6 +45,7 @@ import java.util.UUID;
|
|||||||
import static org.apache.hudi.common.fs.inline.FileSystemTestUtils.FILE_SCHEME;
|
import static org.apache.hudi.common.fs.inline.FileSystemTestUtils.FILE_SCHEME;
|
||||||
import static org.apache.hudi.common.fs.inline.FileSystemTestUtils.getPhantomFile;
|
import static org.apache.hudi.common.fs.inline.FileSystemTestUtils.getPhantomFile;
|
||||||
import static org.apache.hudi.common.fs.inline.FileSystemTestUtils.getRandomOuterInMemPath;
|
import static org.apache.hudi.common.fs.inline.FileSystemTestUtils.getRandomOuterInMemPath;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests {@link InLineFileSystem} with Parquet writer and reader. hudi-common can't access HoodieTestDataGenerator.
|
* Tests {@link InLineFileSystem} with Parquet writer and reader. hudi-common can't access HoodieTestDataGenerator.
|
||||||
@@ -64,7 +64,7 @@ public class TestParquetInLining {
|
|||||||
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
|
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void teardown() throws IOException {
|
public void teardown() throws IOException {
|
||||||
if (generatedPath != null) {
|
if (generatedPath != null) {
|
||||||
File filePath = new File(generatedPath.toString().substring(generatedPath.toString().indexOf(':') + 1));
|
File filePath = new File(generatedPath.toString().substring(generatedPath.toString().indexOf(':') + 1));
|
||||||
@@ -98,7 +98,7 @@ public class TestParquetInLining {
|
|||||||
// instantiate Parquet reader
|
// instantiate Parquet reader
|
||||||
ParquetReader inLineReader = AvroParquetReader.builder(inlinePath).withConf(inlineConf).build();
|
ParquetReader inLineReader = AvroParquetReader.builder(inlinePath).withConf(inlineConf).build();
|
||||||
List<GenericRecord> records = readParquetGenericRecords(inLineReader);
|
List<GenericRecord> records = readParquetGenericRecords(inLineReader);
|
||||||
Assert.assertArrayEquals(recordsToWrite.toArray(), records.toArray());
|
assertArrayEquals(recordsToWrite.toArray(), records.toArray());
|
||||||
inLineReader.close();
|
inLineReader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -35,17 +35,17 @@ import org.apache.spark.api.java.JavaRDD;
|
|||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An abstract test base for {@link Source} using DFS as the file system.
|
* An abstract test base for {@link Source} using DFS as the file system.
|
||||||
@@ -58,23 +58,23 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
|
|||||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||||
boolean useFlattenedSchema = false;
|
boolean useFlattenedSchema = false;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void initClass() throws Exception {
|
public static void initClass() throws Exception {
|
||||||
UtilitiesTestBase.initClass();
|
UtilitiesTestBase.initClass();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void cleanupClass() {
|
public static void cleanupClass() {
|
||||||
UtilitiesTestBase.cleanupClass();
|
UtilitiesTestBase.cleanupClass();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
super.setup();
|
super.setup();
|
||||||
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
|
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void teardown() throws Exception {
|
public void teardown() throws Exception {
|
||||||
super.teardown();
|
super.teardown();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ import org.apache.hudi.utilities.UtilitiesTestBase;
|
|||||||
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
|
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -34,7 +34,7 @@ import java.util.List;
|
|||||||
*/
|
*/
|
||||||
public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
|
public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
super.setup();
|
super.setup();
|
||||||
this.dfsRoot = dfsBasePath + "/jsonFiles";
|
this.dfsRoot = dfsBasePath + "/jsonFiles";
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
|||||||
import org.apache.hudi.utilities.UtilitiesTestBase;
|
import org.apache.hudi.utilities.UtilitiesTestBase;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -33,7 +33,7 @@ import java.util.List;
|
|||||||
*/
|
*/
|
||||||
public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
|
public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
super.setup();
|
super.setup();
|
||||||
this.dfsRoot = dfsBasePath + "/jsonFiles";
|
this.dfsRoot = dfsBasePath + "/jsonFiles";
|
||||||
|
|||||||
@@ -36,16 +36,16 @@ import org.apache.spark.sql.Dataset;
|
|||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
|
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
|
||||||
import org.apache.spark.streaming.kafka010.OffsetRange;
|
import org.apache.spark.streaming.kafka010.OffsetRange;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests against {@link AvroKafkaSource}.
|
* Tests against {@link AvroKafkaSource}.
|
||||||
@@ -57,17 +57,17 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
|||||||
private FilebasedSchemaProvider schemaProvider;
|
private FilebasedSchemaProvider schemaProvider;
|
||||||
private KafkaTestUtils testUtils;
|
private KafkaTestUtils testUtils;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void initClass() throws Exception {
|
public static void initClass() throws Exception {
|
||||||
UtilitiesTestBase.initClass();
|
UtilitiesTestBase.initClass();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void cleanupClass() {
|
public static void cleanupClass() {
|
||||||
UtilitiesTestBase.cleanupClass();
|
UtilitiesTestBase.cleanupClass();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
super.setup();
|
super.setup();
|
||||||
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
|
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
|
||||||
@@ -75,7 +75,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
|||||||
testUtils.setup();
|
testUtils.setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void teardown() throws Exception {
|
public void teardown() throws Exception {
|
||||||
super.teardown();
|
super.teardown();
|
||||||
testUtils.teardown();
|
testUtils.teardown();
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
|
|||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -32,7 +32,7 @@ import java.util.List;
|
|||||||
*/
|
*/
|
||||||
public class TestParquetDFSSource extends AbstractDFSSourceTestBase {
|
public class TestParquetDFSSource extends AbstractDFSSourceTestBase {
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
super.setup();
|
super.setup();
|
||||||
this.dfsRoot = dfsBasePath + "/parquetFiles";
|
this.dfsRoot = dfsBasePath + "/parquetFiles";
|
||||||
|
|||||||
@@ -22,9 +22,9 @@ import org.apache.spark.sql.types.DataTypes;
|
|||||||
import org.apache.spark.sql.types.Metadata;
|
import org.apache.spark.sql.types.Metadata;
|
||||||
import org.apache.spark.sql.types.StructField;
|
import org.apache.spark.sql.types.StructField;
|
||||||
import org.apache.spark.sql.types.StructType;
|
import org.apache.spark.sql.types.StructType;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
public class TestFlatteningTransformer {
|
public class TestFlatteningTransformer {
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user