[HUDI-780] Migrate test cases to Junit 5 (#1504)
This commit is contained in:
@@ -248,5 +248,36 @@
|
|||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-hdfs</artifactId>
|
<artifactId>hadoop-hdfs</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Test -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-all</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|||||||
@@ -239,6 +239,30 @@
|
|||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-all</artifactId>
|
||||||
|
|||||||
@@ -20,10 +20,10 @@ package org.apache.hudi.metrics;
|
|||||||
|
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import static org.apache.hudi.metrics.Metrics.registerGauge;
|
import static org.apache.hudi.metrics.Metrics.registerGauge;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
|||||||
@@ -22,22 +22,23 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.apache.hudi.metrics.Metrics.registerGauge;
|
import static org.apache.hudi.metrics.Metrics.registerGauge;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestHoodieMetrics {
|
public class TestHoodieMetrics {
|
||||||
|
|
||||||
private HoodieMetrics metrics;
|
private HoodieMetrics metrics;
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void start() {
|
public void start() {
|
||||||
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
|
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
|
||||||
when(config.isMetricsOn()).thenReturn(true);
|
when(config.isMetricsOn()).thenReturn(true);
|
||||||
@@ -97,7 +98,7 @@ public class TestHoodieMetrics {
|
|||||||
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesFinalized);
|
assertEquals((long)Metrics.getInstance().getRegistry().getGauges().get(metricName).getValue(), numFilesFinalized);
|
||||||
|
|
||||||
// Commit / deltacommit / compaction metrics
|
// Commit / deltacommit / compaction metrics
|
||||||
Arrays.asList("commit", "deltacommit", "compaction").stream().forEach(action -> {
|
Stream.of("commit", "deltacommit", "compaction").forEach(action -> {
|
||||||
Timer.Context commitTimer = action.equals("commit") ? metrics.getCommitCtx() :
|
Timer.Context commitTimer = action.equals("commit") ? metrics.getCommitCtx() :
|
||||||
action.equals("deltacommit") ? metrics.getDeltaCommitCtx() : metrics.getCompactionCtx();
|
action.equals("deltacommit") ? metrics.getDeltaCommitCtx() : metrics.getCompactionCtx();
|
||||||
|
|
||||||
|
|||||||
@@ -153,6 +153,30 @@
|
|||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-all</artifactId>
|
||||||
|
|||||||
@@ -21,11 +21,15 @@ package org.apache.hudi.avro;
|
|||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests hoodie avro utilities.
|
* Tests hoodie avro utilities.
|
||||||
*/
|
*/
|
||||||
@@ -59,18 +63,18 @@ public class TestHoodieAvroUtils {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertNotNull("field name is null", field.name());
|
assertNotNull(field.name(), "field name is null");
|
||||||
Map<String, Object> props = field.getObjectProps();
|
Map<String, Object> props = field.getObjectProps();
|
||||||
Assert.assertNotNull("The property is null", props);
|
assertNotNull(props, "The property is null");
|
||||||
|
|
||||||
if (field.name().equals("pii_col")) {
|
if (field.name().equals("pii_col")) {
|
||||||
piiPresent = true;
|
piiPresent = true;
|
||||||
Assert.assertTrue("sensitivity_level is removed in field 'pii_col'", props.containsKey("column_category"));
|
assertTrue(props.containsKey("column_category"), "sensitivity_level is removed in field 'pii_col'");
|
||||||
} else {
|
} else {
|
||||||
Assert.assertEquals("The property shows up but not set", 0, props.size());
|
assertEquals(0, props.size(), "The property shows up but not set");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Assert.assertTrue("column pii_col doesn't show up", piiPresent);
|
assertTrue(piiPresent, "column pii_col doesn't show up");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -81,8 +85,8 @@ public class TestHoodieAvroUtils {
|
|||||||
rec.put("pii_col", "val2");
|
rec.put("pii_col", "val2");
|
||||||
rec.put("timestamp", 3.5);
|
rec.put("timestamp", 3.5);
|
||||||
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
|
||||||
Assert.assertEquals(rec1.get("new_col1"), "dummy_val");
|
assertEquals(rec1.get("new_col1"), "dummy_val");
|
||||||
Assert.assertNull(rec1.get("new_col2"));
|
assertNull(rec1.get("new_col2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -93,8 +97,8 @@ public class TestHoodieAvroUtils {
|
|||||||
rec.put("pii_col", "val2");
|
rec.put("pii_col", "val2");
|
||||||
rec.put("timestamp", 3.5);
|
rec.put("timestamp", 3.5);
|
||||||
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
|
||||||
Assert.assertEquals(rec1.get("new_col1"), "dummy_val");
|
assertEquals(rec1.get("new_col1"), "dummy_val");
|
||||||
Assert.assertNull(rec1.get("new_col2"));
|
assertNull(rec1.get("new_col2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -105,6 +109,6 @@ public class TestHoodieAvroUtils {
|
|||||||
rec.put("pii_col", "val2");
|
rec.put("pii_col", "val2");
|
||||||
rec.put("timestamp", 3.5);
|
rec.put("timestamp", 3.5);
|
||||||
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_METADATA_FIELD));
|
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_METADATA_FIELD));
|
||||||
Assert.assertNull(rec1.get("_hoodie_commit_time"));
|
assertNull(rec1.get("_hoodie_commit_time"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,20 +18,20 @@
|
|||||||
|
|
||||||
package org.apache.hudi.avro;
|
package org.apache.hudi.avro;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.generic.GenericData;
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hudi.common.bloom.BloomFilter;
|
import org.apache.hudi.common.bloom.BloomFilter;
|
||||||
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||||
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.avro.generic.GenericData;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||||
import org.apache.parquet.hadoop.ParquetWriter;
|
import org.apache.parquet.hadoop.ParquetWriter;
|
||||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||||
import org.junit.Rule;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -40,16 +40,13 @@ import java.util.UUID;
|
|||||||
|
|
||||||
public class TestHoodieAvroWriteSupport {
|
public class TestHoodieAvroWriteSupport {
|
||||||
|
|
||||||
@Rule
|
|
||||||
public TemporaryFolder folder = new TemporaryFolder();
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAddKey() throws IOException {
|
public void testAddKey(@TempDir java.nio.file.Path tempDir) throws IOException {
|
||||||
List<String> rowKeys = new ArrayList<>();
|
List<String> rowKeys = new ArrayList<>();
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
rowKeys.add(UUID.randomUUID().toString());
|
rowKeys.add(UUID.randomUUID().toString());
|
||||||
}
|
}
|
||||||
String filePath = folder.getRoot() + "/test.parquet";
|
String filePath = tempDir.resolve("test.parquet").toAbsolutePath().toString();
|
||||||
Schema schema = HoodieAvroUtils.getRecordKeySchema();
|
Schema schema = HoodieAvroUtils.getRecordKeySchema();
|
||||||
BloomFilter filter = BloomFilterFactory.createBloomFilter(
|
BloomFilter filter = BloomFilterFactory.createBloomFilter(
|
||||||
1000, 0.0001, 10000,
|
1000, 0.0001, 10000,
|
||||||
|
|||||||
@@ -102,6 +102,30 @@
|
|||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-all</artifactId>
|
||||||
|
|||||||
@@ -29,10 +29,10 @@ import org.apache.hadoop.fs.FileSystem;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -40,7 +40,7 @@ import java.util.List;
|
|||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class InputPathHandlerTest {
|
public class InputPathHandlerTest {
|
||||||
|
|
||||||
@@ -54,6 +54,9 @@ public class InputPathHandlerTest {
|
|||||||
// non Hoodie table
|
// non Hoodie table
|
||||||
public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
|
public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
|
||||||
|
|
||||||
|
@TempDir
|
||||||
|
static java.nio.file.Path parentPath;
|
||||||
|
|
||||||
private static MiniDFSCluster dfsCluster;
|
private static MiniDFSCluster dfsCluster;
|
||||||
private static DistributedFileSystem dfs;
|
private static DistributedFileSystem dfs;
|
||||||
private static HdfsTestService hdfsTestService;
|
private static HdfsTestService hdfsTestService;
|
||||||
@@ -68,7 +71,7 @@ public class InputPathHandlerTest {
|
|||||||
private static List<Path> nonHoodiePaths;
|
private static List<Path> nonHoodiePaths;
|
||||||
private static List<Path> inputPaths;
|
private static List<Path> inputPaths;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeAll
|
||||||
public static void setUpDFS() throws IOException {
|
public static void setUpDFS() throws IOException {
|
||||||
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
|
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
|
||||||
// same JVM
|
// same JVM
|
||||||
@@ -86,7 +89,7 @@ public class InputPathHandlerTest {
|
|||||||
initTables();
|
initTables();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void cleanUp() throws Exception {
|
public static void cleanUp() throws Exception {
|
||||||
if (hdfsTestService != null) {
|
if (hdfsTestService != null) {
|
||||||
hdfsTestService.stop();
|
hdfsTestService.stop();
|
||||||
@@ -101,13 +104,10 @@ public class InputPathHandlerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void initTables() throws IOException {
|
static void initTables() throws IOException {
|
||||||
// Create a temp folder as the base path
|
basePathTable1 = parentPath.resolve(RAW_TRIPS_TEST_NAME).toAbsolutePath().toString();
|
||||||
TemporaryFolder parentFolder = new TemporaryFolder();
|
basePathTable2 = parentPath.resolve(MODEL_TRIPS_TEST_NAME).toAbsolutePath().toString();
|
||||||
parentFolder.create();
|
basePathTable3 = parentPath.resolve(ETL_TRIPS_TEST_NAME).toAbsolutePath().toString();
|
||||||
basePathTable1 = parentFolder.newFolder(RAW_TRIPS_TEST_NAME).getAbsolutePath();
|
basePathTable4 = parentPath.resolve(TRIPS_STATS_TEST_NAME).toAbsolutePath().toString();
|
||||||
basePathTable2 = parentFolder.newFolder(MODEL_TRIPS_TEST_NAME).getAbsolutePath();
|
|
||||||
basePathTable3 = parentFolder.newFolder(ETL_TRIPS_TEST_NAME).getAbsolutePath();
|
|
||||||
basePathTable4 = parentFolder.newFolder(TRIPS_STATS_TEST_NAME).getAbsolutePath();
|
|
||||||
|
|
||||||
dfs.mkdirs(new Path(basePathTable1));
|
dfs.mkdirs(new Path(basePathTable1));
|
||||||
initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
|
initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
|
||||||
|
|||||||
@@ -18,11 +18,11 @@
|
|||||||
|
|
||||||
package org.apache.hudi.hadoop;
|
package org.apache.hudi.hadoop;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.lang.annotation.Annotation;
|
import java.lang.annotation.Annotation;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestAnnotation {
|
public class TestAnnotation {
|
||||||
|
|
||||||
|
|||||||
@@ -23,13 +23,16 @@ import org.apache.hudi.common.util.collection.Pair;
|
|||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.RecordReader;
|
import org.apache.hadoop.mapred.RecordReader;
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestRecordReaderValueIterator {
|
public class TestRecordReaderValueIterator {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -40,11 +43,11 @@ public class TestRecordReaderValueIterator {
|
|||||||
TestRecordReader reader = new TestRecordReader(entries);
|
TestRecordReader reader = new TestRecordReader(entries);
|
||||||
RecordReaderValueIterator<IntWritable, Text> itr = new RecordReaderValueIterator<IntWritable, Text>(reader);
|
RecordReaderValueIterator<IntWritable, Text> itr = new RecordReaderValueIterator<IntWritable, Text>(reader);
|
||||||
for (int i = 0; i < values.length; i++) {
|
for (int i = 0; i < values.length; i++) {
|
||||||
Assert.assertTrue(itr.hasNext());
|
assertTrue(itr.hasNext());
|
||||||
Text val = itr.next();
|
Text val = itr.next();
|
||||||
Assert.assertEquals(values[i], val.toString());
|
assertEquals(values[i], val.toString());
|
||||||
}
|
}
|
||||||
Assert.assertFalse(itr.hasNext());
|
assertFalse(itr.hasNext());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -142,14 +142,32 @@
|
|||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.junit.jupiter</groupId>
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>org.junit.jupiter</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-all</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
|||||||
@@ -30,55 +30,47 @@ import org.apache.parquet.schema.OriginalType;
|
|||||||
import org.apache.parquet.schema.PrimitiveType;
|
import org.apache.parquet.schema.PrimitiveType;
|
||||||
import org.apache.parquet.schema.Types;
|
import org.apache.parquet.schema.Types;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
|
||||||
public class TestHiveSyncTool {
|
public class TestHiveSyncTool {
|
||||||
|
|
||||||
// Test sync tool using both jdbc and metastore client
|
private static Stream<Boolean> useJdbc() {
|
||||||
private boolean useJdbc;
|
return Stream.of(false, true);
|
||||||
|
|
||||||
public TestHiveSyncTool(Boolean useJdbc) {
|
|
||||||
this.useJdbc = useJdbc;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "UseJdbc")
|
@BeforeEach
|
||||||
public static Collection<Boolean[]> data() {
|
|
||||||
return Arrays.asList(new Boolean[][] {{false}, {true}});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws IOException, InterruptedException {
|
public void setUp() throws IOException, InterruptedException {
|
||||||
TestUtil.setUp();
|
TestUtil.setUp();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void teardown() throws IOException {
|
public void teardown() throws IOException {
|
||||||
TestUtil.clear();
|
TestUtil.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void cleanUpClass() {
|
public static void cleanUpClass() {
|
||||||
TestUtil.shutdown();
|
TestUtil.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Testing converting array types to Hive field declaration strings. According to the Parquet-113 spec:
|
* Testing converting array types to Hive field declaration strings.
|
||||||
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
|
* <p>
|
||||||
|
* Refer to the Parquet-113 spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSchemaConvertArray() throws IOException {
|
public void testSchemaConvertArray() throws IOException {
|
||||||
@@ -153,44 +145,45 @@ public class TestHiveSyncTool {
|
|||||||
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
|
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testBasicSync() throws Exception {
|
@MethodSource("useJdbc")
|
||||||
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
public void testBasicSync(boolean useJdbc) throws Exception {
|
||||||
|
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||||
String instantTime = "100";
|
String instantTime = "100";
|
||||||
TestUtil.createCOWTable(instantTime, 5);
|
TestUtil.createCOWTable(instantTime, 5);
|
||||||
HoodieHiveClient hiveClient =
|
HoodieHiveClient hiveClient =
|
||||||
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially",
|
assertFalse(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
|
||||||
hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName));
|
"Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||||
// Lets do the sync
|
// Lets do the sync
|
||||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + " should exist after sync completes",
|
assertTrue(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
|
||||||
hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName));
|
"Table " + TestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
|
||||||
assertEquals("Hive Schema should match the table schema + partition field",
|
assertEquals(hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size(),
|
hiveClient.getDataSchema().getColumns().size() + 1,
|
||||||
hiveClient.getDataSchema().getColumns().size() + 1);
|
"Hive Schema should match the table schema + partition field");
|
||||||
assertEquals("Table partitions should match the number of partitions we wrote", 5,
|
assertEquals(5, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
|
"Table partitions should match the number of partitions we wrote");
|
||||||
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime,
|
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||||
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
|
"The last commit that was sycned should be updated in the TBLPROPERTIES");
|
||||||
|
|
||||||
// Adding of new partitions
|
// Adding of new partitions
|
||||||
List<String> newPartition = Arrays.asList("2050/01/01");
|
List<String> newPartition = Arrays.asList("2050/01/01");
|
||||||
hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
|
hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
|
||||||
assertEquals("No new partition should be added", 5,
|
assertEquals(5, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
|
"No new partition should be added");
|
||||||
hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
|
hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
|
||||||
assertEquals("New partition should be added", 6,
|
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
|
"New partition should be added");
|
||||||
|
|
||||||
// Update partitions
|
// Update partitions
|
||||||
hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
|
hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
|
||||||
assertEquals("Partition count should remain the same", 6,
|
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
|
"Partition count should remain the same");
|
||||||
hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
|
hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
|
||||||
assertEquals("Partition count should remain the same", 6,
|
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
|
"Partition count should remain the same");
|
||||||
|
|
||||||
// Alter partitions
|
// Alter partitions
|
||||||
// Manually change a hive partition location to check if the sync will detect
|
// Manually change a hive partition location to check if the sync will detect
|
||||||
@@ -203,23 +196,23 @@ public class TestHiveSyncTool {
|
|||||||
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
|
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
|
||||||
writtenPartitionsSince.add(newPartition.get(0));
|
writtenPartitionsSince.add(newPartition.get(0));
|
||||||
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
|
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
|
||||||
assertEquals("There should be only one paritition event", 1, partitionEvents.size());
|
assertEquals(1, partitionEvents.size(), "There should be only one paritition event");
|
||||||
assertEquals("The one partition event must of type UPDATE", PartitionEventType.UPDATE,
|
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
|
||||||
partitionEvents.iterator().next().eventType);
|
"The one partition event must of type UPDATE");
|
||||||
|
|
||||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
// Sync should update the changed partition to correct path
|
// Sync should update the changed partition to correct path
|
||||||
List<Partition> tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
|
List<Partition> tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
|
||||||
assertEquals("The one partition we wrote should be added to hive", 6, tablePartitions.size());
|
assertEquals(6, tablePartitions.size(), "The one partition we wrote should be added to hive");
|
||||||
assertEquals("The last commit that was sycned should be 100", instantTime,
|
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||||
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
|
"The last commit that was sycned should be 100");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testSyncIncremental() throws Exception {
|
@MethodSource("useJdbc")
|
||||||
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
public void testSyncIncremental(boolean useJdbc) throws Exception {
|
||||||
|
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||||
String commitTime1 = "100";
|
String commitTime1 = "100";
|
||||||
TestUtil.createCOWTable(commitTime1, 5);
|
TestUtil.createCOWTable(commitTime1, 5);
|
||||||
HoodieHiveClient hiveClient =
|
HoodieHiveClient hiveClient =
|
||||||
@@ -227,10 +220,10 @@ public class TestHiveSyncTool {
|
|||||||
// Lets do the sync
|
// Lets do the sync
|
||||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
assertEquals("Table partitions should match the number of partitions we wrote", 5,
|
assertEquals(5, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
|
"Table partitions should match the number of partitions we wrote");
|
||||||
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime1,
|
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||||
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
|
"The last commit that was sycned should be updated in the TBLPROPERTIES");
|
||||||
|
|
||||||
// Now lets create more parititions and these are the only ones which needs to be synced
|
// Now lets create more parititions and these are the only ones which needs to be synced
|
||||||
DateTime dateTime = DateTime.now().plusDays(6);
|
DateTime dateTime = DateTime.now().plusDays(6);
|
||||||
@@ -240,25 +233,25 @@ public class TestHiveSyncTool {
|
|||||||
// Lets do the sync
|
// Lets do the sync
|
||||||
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
|
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
|
||||||
assertEquals("We should have one partition written after 100 commit", 1, writtenPartitionsSince.size());
|
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
|
||||||
List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
|
List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
|
||||||
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
|
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
|
||||||
assertEquals("There should be only one paritition event", 1, partitionEvents.size());
|
assertEquals(1, partitionEvents.size(), "There should be only one paritition event");
|
||||||
assertEquals("The one partition event must of type ADD", PartitionEventType.ADD,
|
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
|
||||||
partitionEvents.iterator().next().eventType);
|
|
||||||
|
|
||||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
// Sync should add the one partition
|
// Sync should add the one partition
|
||||||
assertEquals("The one partition we wrote should be added to hive", 6,
|
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
|
"The one partition we wrote should be added to hive");
|
||||||
assertEquals("The last commit that was sycned should be 101", commitTime2,
|
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||||
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
|
"The last commit that was sycned should be 101");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testSyncIncrementalWithSchemaEvolution() throws Exception {
|
@MethodSource("useJdbc")
|
||||||
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
|
||||||
|
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||||
String commitTime1 = "100";
|
String commitTime1 = "100";
|
||||||
TestUtil.createCOWTable(commitTime1, 5);
|
TestUtil.createCOWTable(commitTime1, 5);
|
||||||
HoodieHiveClient hiveClient =
|
HoodieHiveClient hiveClient =
|
||||||
@@ -278,42 +271,42 @@ public class TestHiveSyncTool {
|
|||||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
|
|
||||||
assertEquals("Hive Schema has evolved and should not be 3 more field", fields + 3,
|
assertEquals(fields + 3, hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size());
|
"Hive Schema has evolved and should not be 3 more field");
|
||||||
assertEquals("Hive Schema has evolved - Field favorite_number has evolved from int to long", "BIGINT",
|
assertEquals("BIGINT", hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).get("favorite_number"),
|
||||||
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).get("favorite_number"));
|
"Hive Schema has evolved - Field favorite_number has evolved from int to long");
|
||||||
assertTrue("Hive Schema has evolved - Field favorite_movie was added",
|
assertTrue(hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).containsKey("favorite_movie"),
|
||||||
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).containsKey("favorite_movie"));
|
"Hive Schema has evolved - Field favorite_movie was added");
|
||||||
|
|
||||||
// Sync should add the one partition
|
// Sync should add the one partition
|
||||||
assertEquals("The one partition we wrote should be added to hive", 6,
|
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
|
"The one partition we wrote should be added to hive");
|
||||||
assertEquals("The last commit that was sycned should be 101", commitTime2,
|
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||||
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
|
"The last commit that was sycned should be 101");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testSyncMergeOnRead() throws Exception {
|
@MethodSource("useJdbc")
|
||||||
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
public void testSyncMergeOnRead(boolean useJdbc) throws Exception {
|
||||||
|
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||||
String instantTime = "100";
|
String instantTime = "100";
|
||||||
String deltaCommitTime = "101";
|
String deltaCommitTime = "101";
|
||||||
TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
|
TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
|
||||||
|
|
||||||
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
|
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
|
||||||
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially", hiveClient.doesTableExist(roTableName));
|
assertFalse(hiveClient.doesTableExist(roTableName), "Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||||
// Lets do the sync
|
// Lets do the sync
|
||||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
|
|
||||||
assertTrue("Table " + roTableName + " should exist after sync completes",
|
assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes");
|
||||||
hiveClient.doesTableExist(roTableName));
|
assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
|
||||||
assertEquals("Hive Schema should match the table schema + partition field", hiveClient.getTableSchema(roTableName).size(),
|
"Hive Schema should match the table schema + partition field");
|
||||||
SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
|
assertEquals(5, hiveClient.scanTablePartitions(roTableName).size(),
|
||||||
assertEquals("Table partitions should match the number of partitions we wrote", 5,
|
"Table partitions should match the number of partitions we wrote");
|
||||||
hiveClient.scanTablePartitions(roTableName).size());
|
assertEquals(deltaCommitTime, hiveClient.getLastCommitTimeSynced(roTableName).get(),
|
||||||
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", deltaCommitTime,
|
"The last commit that was sycned should be updated in the TBLPROPERTIES");
|
||||||
hiveClient.getLastCommitTimeSynced(roTableName).get());
|
|
||||||
|
|
||||||
// Now lets create more partitions and these are the only ones which needs to be synced
|
// Now lets create more partitions and these are the only ones which needs to be synced
|
||||||
DateTime dateTime = DateTime.now().plusDays(6);
|
DateTime dateTime = DateTime.now().plusDays(6);
|
||||||
@@ -327,17 +320,19 @@ public class TestHiveSyncTool {
|
|||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
|
|
||||||
assertEquals("Hive Schema should match the evolved table schema + partition field",
|
assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
|
||||||
hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
|
"Hive Schema should match the evolved table schema + partition field");
|
||||||
// Sync should add the one partition
|
// Sync should add the one partition
|
||||||
assertEquals("The 2 partitions we wrote should be added to hive", 6, hiveClient.scanTablePartitions(roTableName).size());
|
assertEquals(6, hiveClient.scanTablePartitions(roTableName).size(),
|
||||||
assertEquals("The last commit that was synced should be 103", deltaCommitTime2,
|
"The 2 partitions we wrote should be added to hive");
|
||||||
hiveClient.getLastCommitTimeSynced(roTableName).get());
|
assertEquals(deltaCommitTime2, hiveClient.getLastCommitTimeSynced(roTableName).get(),
|
||||||
|
"The last commit that was synced should be 103");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testSyncMergeOnReadRT() throws Exception {
|
@MethodSource("useJdbc")
|
||||||
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
public void testSyncMergeOnReadRT(boolean useJdbc) throws Exception {
|
||||||
|
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||||
String instantTime = "100";
|
String instantTime = "100";
|
||||||
String deltaCommitTime = "101";
|
String deltaCommitTime = "101";
|
||||||
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||||
@@ -345,22 +340,24 @@ public class TestHiveSyncTool {
|
|||||||
HoodieHiveClient hiveClientRT =
|
HoodieHiveClient hiveClientRT =
|
||||||
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
|
|
||||||
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
assertFalse(hiveClientRT.doesTableExist(snapshotTableName),
|
||||||
+ " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName));
|
"Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||||
|
+ " should not exist initially");
|
||||||
|
|
||||||
// Lets do the sync
|
// Lets do the sync
|
||||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
|
|
||||||
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
assertTrue(hiveClientRT.doesTableExist(snapshotTableName),
|
||||||
+ " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName));
|
"Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||||
|
+ " should exist after sync completes");
|
||||||
|
|
||||||
assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(),
|
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
|
||||||
SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
|
"Hive Schema should match the table schema + partition field");
|
||||||
assertEquals("Table partitions should match the number of partitions we wrote", 5,
|
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
|
||||||
hiveClientRT.scanTablePartitions(snapshotTableName).size());
|
"Table partitions should match the number of partitions we wrote");
|
||||||
assertEquals("The last commit that was synced should be updated in the TBLPROPERTIES", deltaCommitTime,
|
assertEquals(deltaCommitTime, hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(),
|
||||||
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
|
"The last commit that was synced should be updated in the TBLPROPERTIES");
|
||||||
|
|
||||||
// Now lets create more parititions and these are the only ones which needs to be synced
|
// Now lets create more parititions and these are the only ones which needs to be synced
|
||||||
DateTime dateTime = DateTime.now().plusDays(6);
|
DateTime dateTime = DateTime.now().plusDays(6);
|
||||||
@@ -374,17 +371,19 @@ public class TestHiveSyncTool {
|
|||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
|
|
||||||
assertEquals("Hive Schema should match the evolved table schema + partition field",
|
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
|
||||||
hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
|
"Hive Schema should match the evolved table schema + partition field");
|
||||||
// Sync should add the one partition
|
// Sync should add the one partition
|
||||||
assertEquals("The 2 partitions we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size());
|
assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
|
||||||
assertEquals("The last commit that was sycned should be 103", deltaCommitTime2,
|
"The 2 partitions we wrote should be added to hive");
|
||||||
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
|
assertEquals(deltaCommitTime2, hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(),
|
||||||
|
"The last commit that was sycned should be 103");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testMultiPartitionKeySync() throws Exception {
|
@MethodSource("useJdbc")
|
||||||
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
|
||||||
|
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||||
String instantTime = "100";
|
String instantTime = "100";
|
||||||
TestUtil.createCOWTable(instantTime, 5);
|
TestUtil.createCOWTable(instantTime, 5);
|
||||||
|
|
||||||
@@ -395,46 +394,46 @@ public class TestHiveSyncTool {
|
|||||||
TestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
|
TestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
|
||||||
|
|
||||||
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
assertFalse("Table " + hiveSyncConfig.tableName + " should not exist initially",
|
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
|
||||||
hiveClient.doesTableExist(hiveSyncConfig.tableName));
|
"Table " + hiveSyncConfig.tableName + " should not exist initially");
|
||||||
// Lets do the sync
|
// Lets do the sync
|
||||||
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
assertTrue("Table " + hiveSyncConfig.tableName + " should exist after sync completes",
|
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
|
||||||
hiveClient.doesTableExist(hiveSyncConfig.tableName));
|
"Table " + hiveSyncConfig.tableName + " should exist after sync completes");
|
||||||
assertEquals("Hive Schema should match the table schema + partition fields",
|
assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
|
hiveClient.getDataSchema().getColumns().size() + 3,
|
||||||
hiveClient.getDataSchema().getColumns().size() + 3);
|
"Hive Schema should match the table schema + partition fields");
|
||||||
assertEquals("Table partitions should match the number of partitions we wrote", 5,
|
assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
|
||||||
hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size());
|
"Table partitions should match the number of partitions we wrote");
|
||||||
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime,
|
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
|
||||||
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
|
"The last commit that was sycned should be updated in the TBLPROPERTIES");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testReadSchemaForMOR() throws Exception {
|
@MethodSource("useJdbc")
|
||||||
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
|
public void testReadSchemaForMOR(boolean useJdbc) throws Exception {
|
||||||
|
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||||
String commitTime = "100";
|
String commitTime = "100";
|
||||||
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||||
TestUtil.createMORTable(commitTime, "", 5, false);
|
TestUtil.createMORTable(commitTime, "", 5, false);
|
||||||
HoodieHiveClient hiveClientRT =
|
HoodieHiveClient hiveClientRT =
|
||||||
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
|
|
||||||
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
assertFalse(hiveClientRT.doesTableExist(snapshotTableName), "Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||||
+ " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName));
|
+ " should not exist initially");
|
||||||
|
|
||||||
// Lets do the sync
|
// Lets do the sync
|
||||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
|
|
||||||
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
assertTrue(hiveClientRT.doesTableExist(snapshotTableName), "Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||||
+ " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName));
|
+ " should exist after sync completes");
|
||||||
|
|
||||||
// Schema being read from compacted base files
|
// Schema being read from compacted base files
|
||||||
assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(),
|
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
|
||||||
SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
|
"Hive Schema should match the table schema + partition field");
|
||||||
assertEquals("Table partitions should match the number of partitions we wrote", 5,
|
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote");
|
||||||
hiveClientRT.scanTablePartitions(snapshotTableName).size());
|
|
||||||
|
|
||||||
// Now lets create more partitions and these are the only ones which needs to be synced
|
// Now lets create more partitions and these are the only ones which needs to be synced
|
||||||
DateTime dateTime = DateTime.now().plusDays(6);
|
DateTime dateTime = DateTime.now().plusDays(6);
|
||||||
@@ -448,12 +447,12 @@ public class TestHiveSyncTool {
|
|||||||
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||||
|
|
||||||
// Schema being read from the log files
|
// Schema being read from the log files
|
||||||
assertEquals("Hive Schema should match the evolved table schema + partition field",
|
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
|
||||||
hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
|
"Hive Schema should match the evolved table schema + partition field");
|
||||||
// Sync should add the one partition
|
// Sync should add the one partition
|
||||||
assertEquals("The 1 partition we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size());
|
assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 1 partition we wrote should be added to hive");
|
||||||
assertEquals("The last commit that was sycned should be 103", deltaCommitTime2,
|
assertEquals(deltaCommitTime2, hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(),
|
||||||
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
|
"The last commit that was sycned should be 103");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ import java.util.Map.Entry;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
@SuppressWarnings("SameParameterValue")
|
@SuppressWarnings("SameParameterValue")
|
||||||
public class TestUtil {
|
public class TestUtil {
|
||||||
|
|||||||
@@ -115,11 +115,29 @@
|
|||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>org.junit.jupiter</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
<version>${junit.version}</version>
|
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
|||||||
@@ -33,8 +33,7 @@ import com.github.dockerjava.core.command.ExecStartResultCallback;
|
|||||||
import com.github.dockerjava.jaxrs.JerseyDockerCmdExecFactory;
|
import com.github.dockerjava.jaxrs.JerseyDockerCmdExecFactory;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Before;
|
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -46,6 +45,8 @@ import java.util.stream.Collectors;
|
|||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
import static org.awaitility.Awaitility.await;
|
import static org.awaitility.Awaitility.await;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
|
|
||||||
public abstract class ITTestBase {
|
public abstract class ITTestBase {
|
||||||
|
|
||||||
@@ -113,12 +114,12 @@ public abstract class ITTestBase {
|
|||||||
static String getPrestoConsoleCommand(String commandFile) {
|
static String getPrestoConsoleCommand(String commandFile) {
|
||||||
StringBuilder builder = new StringBuilder().append("presto --server " + PRESTO_COORDINATOR_URL)
|
StringBuilder builder = new StringBuilder().append("presto --server " + PRESTO_COORDINATOR_URL)
|
||||||
.append(" --catalog hive --schema default")
|
.append(" --catalog hive --schema default")
|
||||||
.append(" -f " + commandFile );
|
.append(" -f " + commandFile);
|
||||||
System.out.println("Presto comamnd " + builder.toString());
|
System.out.println("Presto comamnd " + builder.toString());
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void init() {
|
public void init() {
|
||||||
String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST;
|
String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST;
|
||||||
// Assuming insecure docker engine
|
// Assuming insecure docker engine
|
||||||
@@ -165,10 +166,9 @@ public abstract class ITTestBase {
|
|||||||
LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());
|
LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());
|
||||||
|
|
||||||
if (expectedToSucceed) {
|
if (expectedToSucceed) {
|
||||||
Assert.assertEquals("Command (" + Arrays.toString(command) + ") expected to succeed. Exit (" + exitCode + ")", 0, exitCode);
|
assertEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to succeed. Exit (" + exitCode + ")");
|
||||||
} else {
|
} else {
|
||||||
Assert.assertTrue("Command (" + Arrays.toString(command) + ") expected to fail. Exit (" + exitCode + ")",
|
assertNotEquals(0, exitCode, "Command (" + Arrays.toString(command) + ") expected to fail. Exit (" + exitCode + ")");
|
||||||
exitCode != 0);
|
|
||||||
}
|
}
|
||||||
cmd.close();
|
cmd.close();
|
||||||
return callback;
|
return callback;
|
||||||
@@ -224,7 +224,7 @@ public abstract class ITTestBase {
|
|||||||
return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim());
|
return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim());
|
||||||
}
|
}
|
||||||
|
|
||||||
void executePrestoCopyCommand(String fromFile, String remotePath){
|
void executePrestoCopyCommand(String fromFile, String remotePath) {
|
||||||
Container sparkWorkerContainer = runningContainers.get(PRESTO_COORDINATOR);
|
Container sparkWorkerContainer = runningContainers.get(PRESTO_COORDINATOR);
|
||||||
dockerClient.copyArchiveToContainerCmd(sparkWorkerContainer.getId())
|
dockerClient.copyArchiveToContainerCmd(sparkWorkerContainer.getId())
|
||||||
.withHostResource(fromFile)
|
.withHostResource(fromFile)
|
||||||
@@ -268,7 +268,7 @@ public abstract class ITTestBase {
|
|||||||
saveUpLogs();
|
saveUpLogs();
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals("Did not find output the expected number of times", times, count);
|
assertEquals(times, count, "Did not find output the expected number of times");
|
||||||
}
|
}
|
||||||
|
|
||||||
public class TestExecStartResultCallback extends ExecStartResultCallback {
|
public class TestExecStartResultCallback extends ExecStartResultCallback {
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ package org.apache.hudi.integ;
|
|||||||
import org.apache.hudi.common.util.CollectionUtils;
|
import org.apache.hudi.common.util.CollectionUtils;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -113,9 +113,9 @@ public class ITTestHoodieDemo extends ITTestBase {
|
|||||||
executeCommandStringsInDocker(PRESTO_COORDINATOR, cmds);
|
executeCommandStringsInDocker(PRESTO_COORDINATOR, cmds);
|
||||||
|
|
||||||
// copy presto sql files to presto coordinator
|
// copy presto sql files to presto coordinator
|
||||||
executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR);
|
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||||
executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR);
|
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||||
executePrestoCopyCommand( System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR);
|
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ingestFirstBatchAndHiveSync() throws Exception {
|
private void ingestFirstBatchAndHiveSync() throws Exception {
|
||||||
|
|||||||
@@ -18,12 +18,15 @@
|
|||||||
|
|
||||||
package org.apache.hudi.integ;
|
package org.apache.hudi.integ;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.Test;
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Smoke tests to run as part of verification.
|
* Smoke tests to run as part of verification.
|
||||||
@@ -129,7 +132,7 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
|
|
||||||
// Ensure table does not exist
|
// Ensure table does not exist
|
||||||
Pair<String, String> stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
|
Pair<String, String> stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
|
||||||
Assert.assertTrue("Dropped table " + hiveTableName + " exists!", stdOutErr.getLeft().isEmpty());
|
assertTrue(stdOutErr.getLeft().isEmpty(), "Dropped table " + hiveTableName + " exists!");
|
||||||
|
|
||||||
// Run Hoodie Java App
|
// Run Hoodie Java App
|
||||||
String cmd;
|
String cmd;
|
||||||
@@ -145,24 +148,24 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
}
|
}
|
||||||
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
|
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
|
||||||
|
|
||||||
String snapshotTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name()) ?
|
String snapshotTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name())
|
||||||
hiveTableName + "_rt" : hiveTableName;
|
? hiveTableName + "_rt" : hiveTableName;
|
||||||
Option<String> roTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name()) ?
|
Option<String> roTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name())
|
||||||
Option.of(hiveTableName +"_ro") : Option.empty();
|
? Option.of(hiveTableName + "_ro") : Option.empty();
|
||||||
|
|
||||||
// Ensure table does exist
|
// Ensure table does exist
|
||||||
stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName + "'");
|
stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName + "'");
|
||||||
Assert.assertEquals("Table exists", snapshotTableName, stdOutErr.getLeft());
|
assertEquals(snapshotTableName, stdOutErr.getLeft(), "Table exists");
|
||||||
|
|
||||||
// Ensure row count is 80 (without duplicates) (100 - 20 deleted)
|
// Ensure row count is 80 (without duplicates) (100 - 20 deleted)
|
||||||
stdOutErr = executeHiveCommand("select count(1) from " + snapshotTableName);
|
stdOutErr = executeHiveCommand("select count(1) from " + snapshotTableName);
|
||||||
Assert.assertEquals("Expecting 80 rows to be present in the snapshot table", 80,
|
assertEquals(80, Integer.parseInt(stdOutErr.getLeft().trim()),
|
||||||
Integer.parseInt(stdOutErr.getLeft().trim()));
|
"Expecting 80 rows to be present in the snapshot table");
|
||||||
|
|
||||||
if (roTableName.isPresent()) {
|
if (roTableName.isPresent()) {
|
||||||
stdOutErr = executeHiveCommand("select count(1) from " + roTableName.get());
|
stdOutErr = executeHiveCommand("select count(1) from " + roTableName.get());
|
||||||
Assert.assertEquals("Expecting 80 rows to be present in the snapshot table", 80,
|
assertEquals(80, Integer.parseInt(stdOutErr.getLeft().trim()),
|
||||||
Integer.parseInt(stdOutErr.getLeft().trim()));
|
"Expecting 80 rows to be present in the snapshot table");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make the HDFS dataset non-hoodie and run the same query; Checks for interoperability with non-hoodie tables
|
// Make the HDFS dataset non-hoodie and run the same query; Checks for interoperability with non-hoodie tables
|
||||||
@@ -175,8 +178,8 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
} else {
|
} else {
|
||||||
stdOutErr = executeHiveCommand("select count(1) from " + snapshotTableName);
|
stdOutErr = executeHiveCommand("select count(1) from " + snapshotTableName);
|
||||||
}
|
}
|
||||||
Assert.assertEquals("Expecting 280 rows to be present in the new table", 280,
|
assertEquals(280, Integer.parseInt(stdOutErr.getLeft().trim()),
|
||||||
Integer.parseInt(stdOutErr.getLeft().trim()));
|
"Expecting 280 rows to be present in the new table");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void dropHiveTables(String hiveTableName, String tableType) throws Exception {
|
private void dropHiveTables(String hiveTableName, String tableType) throws Exception {
|
||||||
|
|||||||
@@ -315,9 +315,34 @@
|
|||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>org.junit.jupiter</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-all</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|||||||
@@ -21,11 +21,11 @@ import org.apache.hudi.DataSourceUtils;
|
|||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
public class DataSourceUtilsTest {
|
public class DataSourceUtilsTest {
|
||||||
|
|
||||||
|
|||||||
@@ -21,12 +21,11 @@ import org.apache.hudi.common.fs.FSUtils
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
|
|
||||||
import org.apache.spark.sql.functions.col
|
import org.apache.spark.sql.functions.col
|
||||||
import org.junit.Assert._
|
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
|
||||||
import org.junit.rules.TemporaryFolder
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
||||||
import org.junit.{Before, Test}
|
import org.junit.jupiter.api.io.TempDir
|
||||||
import org.scalatest.junit.AssertionsForJUnit
|
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||||
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.concurrent.ExecutionContext.Implicits.global
|
import scala.concurrent.ExecutionContext.Implicits.global
|
||||||
@@ -36,7 +35,7 @@ import scala.concurrent.{Await, Future}
|
|||||||
/**
|
/**
|
||||||
* Basic tests on the spark datasource
|
* Basic tests on the spark datasource
|
||||||
*/
|
*/
|
||||||
class TestDataSource extends AssertionsForJUnit {
|
class TestDataSource {
|
||||||
|
|
||||||
var spark: SparkSession = null
|
var spark: SparkSession = null
|
||||||
var dataGen: HoodieTestDataGenerator = null
|
var dataGen: HoodieTestDataGenerator = null
|
||||||
@@ -51,16 +50,14 @@ class TestDataSource extends AssertionsForJUnit {
|
|||||||
var basePath: String = null
|
var basePath: String = null
|
||||||
var fs: FileSystem = null
|
var fs: FileSystem = null
|
||||||
|
|
||||||
@Before def initialize() {
|
@BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
|
||||||
spark = SparkSession.builder
|
spark = SparkSession.builder
|
||||||
.appName("Hoodie Datasource test")
|
.appName("Hoodie Datasource test")
|
||||||
.master("local[2]")
|
.master("local[2]")
|
||||||
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||||
.getOrCreate
|
.getOrCreate
|
||||||
dataGen = new HoodieTestDataGenerator()
|
dataGen = new HoodieTestDataGenerator()
|
||||||
val folder = new TemporaryFolder
|
basePath = tempDir.toAbsolutePath.toString
|
||||||
folder.create
|
|
||||||
basePath = folder.getRoot.getAbsolutePath
|
|
||||||
fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
|
fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,25 +16,25 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import org.apache.avro.generic.GenericRecord
|
import org.apache.avro.generic.GenericRecord
|
||||||
|
import org.apache.hudi.DataSourceWriteOptions
|
||||||
|
import org.apache.hudi.common.config.TypedProperties
|
||||||
import org.apache.hudi.common.model.{EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload}
|
import org.apache.hudi.common.model.{EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload}
|
||||||
import org.apache.hudi.common.util.{Option, SchemaTestUtil}
|
import org.apache.hudi.common.util.{Option, SchemaTestUtil}
|
||||||
import org.apache.hudi.exception.{HoodieException, HoodieKeyException}
|
import org.apache.hudi.exception.{HoodieException, HoodieKeyException}
|
||||||
import org.apache.hudi.keygen.{ComplexKeyGenerator, GlobalDeleteKeyGenerator, SimpleKeyGenerator}
|
import org.apache.hudi.keygen.{ComplexKeyGenerator, GlobalDeleteKeyGenerator, SimpleKeyGenerator}
|
||||||
import org.apache.hudi.DataSourceWriteOptions
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
import org.apache.hudi.common.config.TypedProperties
|
import org.junit.jupiter.api.{BeforeEach, Test}
|
||||||
import org.junit.Assert._
|
import org.scalatest.Assertions.fail
|
||||||
import org.junit.{Before, Test}
|
|
||||||
import org.scalatest.junit.AssertionsForJUnit
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests on the default key generator, payload classes.
|
* Tests on the default key generator, payload classes.
|
||||||
*/
|
*/
|
||||||
class TestDataSourceDefaults extends AssertionsForJUnit {
|
class TestDataSourceDefaults {
|
||||||
|
|
||||||
val schema = SchemaTestUtil.getComplexEvolvedSchema
|
val schema = SchemaTestUtil.getComplexEvolvedSchema
|
||||||
var baseRecord: GenericRecord = _
|
var baseRecord: GenericRecord = _
|
||||||
|
|
||||||
@Before def initialize(): Unit = {
|
@BeforeEach def initialize(): Unit = {
|
||||||
baseRecord = SchemaTestUtil
|
baseRecord = SchemaTestUtil
|
||||||
.generateAvroRecordFromJson(schema, 1, "001", "f1")
|
.generateAvroRecordFromJson(schema, 1, "001", "f1")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -184,8 +184,26 @@
|
|||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>org.junit.jupiter</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
|||||||
@@ -363,6 +363,30 @@
|
|||||||
<classifier>${hive.exec.classifier}</classifier>
|
<classifier>${hive.exec.classifier}</classifier>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-all</artifactId>
|
||||||
|
|||||||
@@ -29,9 +29,9 @@ import org.apache.spark.sql.SparkSession;
|
|||||||
import org.apache.spark.sql.types.DataTypes;
|
import org.apache.spark.sql.types.DataTypes;
|
||||||
import org.apache.spark.sql.types.StructField;
|
import org.apache.spark.sql.types.StructField;
|
||||||
import org.apache.spark.sql.types.StructType;
|
import org.apache.spark.sql.types.StructType;
|
||||||
import org.junit.After;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -39,21 +39,21 @@ import java.util.List;
|
|||||||
import static org.apache.spark.sql.types.DataTypes.IntegerType;
|
import static org.apache.spark.sql.types.DataTypes.IntegerType;
|
||||||
import static org.apache.spark.sql.types.DataTypes.StringType;
|
import static org.apache.spark.sql.types.DataTypes.StringType;
|
||||||
import static org.apache.spark.sql.types.DataTypes.createStructField;
|
import static org.apache.spark.sql.types.DataTypes.createStructField;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
public class TestChainedTransformer {
|
public class TestChainedTransformer {
|
||||||
|
|
||||||
private JavaSparkContext jsc;
|
private JavaSparkContext jsc;
|
||||||
private SparkSession sparkSession;
|
private SparkSession sparkSession;
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
|
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
|
||||||
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterEach
|
||||||
public void tearDown() {
|
public void tearDown() {
|
||||||
jsc.stop();
|
jsc.stop();
|
||||||
}
|
}
|
||||||
|
|||||||
34
pom.xml
34
pom.xml
@@ -68,7 +68,7 @@
|
|||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<maven-jar-plugin.version>2.6</maven-jar-plugin.version>
|
<maven-jar-plugin.version>2.6</maven-jar-plugin.version>
|
||||||
<maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
|
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
|
||||||
<maven-shade-plugin.version>3.1.1</maven-shade-plugin.version>
|
<maven-shade-plugin.version>3.1.1</maven-shade-plugin.version>
|
||||||
<maven-javadoc-plugin.version>3.1.1</maven-javadoc-plugin.version>
|
<maven-javadoc-plugin.version>3.1.1</maven-javadoc-plugin.version>
|
||||||
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
|
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
|
||||||
@@ -82,7 +82,9 @@
|
|||||||
<kafka.version>2.0.0</kafka.version>
|
<kafka.version>2.0.0</kafka.version>
|
||||||
<glassfish.version>2.17</glassfish.version>
|
<glassfish.version>2.17</glassfish.version>
|
||||||
<parquet.version>1.10.1</parquet.version>
|
<parquet.version>1.10.1</parquet.version>
|
||||||
<junit.version>4.11</junit.version>
|
<junit.version>4.12</junit.version>
|
||||||
|
<junit.jupiter.version>5.6.1</junit.jupiter.version>
|
||||||
|
<junit.vintage.version>5.6.1</junit.vintage.version>
|
||||||
<mockito.version>1.10.19</mockito.version>
|
<mockito.version>1.10.19</mockito.version>
|
||||||
<log4j.version>1.2.17</log4j.version>
|
<log4j.version>1.2.17</log4j.version>
|
||||||
<slf4j.version>1.7.5</slf4j.version>
|
<slf4j.version>1.7.5</slf4j.version>
|
||||||
@@ -820,6 +822,34 @@
|
|||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-api</artifactId>
|
||||||
|
<version>${junit.jupiter.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-engine</artifactId>
|
||||||
|
<version>${junit.jupiter.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.vintage</groupId>
|
||||||
|
<artifactId>junit-vintage-engine</artifactId>
|
||||||
|
<version>${junit.vintage.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.junit.jupiter</groupId>
|
||||||
|
<artifactId>junit-jupiter-params</artifactId>
|
||||||
|
<version>${junit.jupiter.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-all</artifactId>
|
||||||
|
|||||||
Reference in New Issue
Block a user