[HUDI-4224] Fix CI issues (#5842)
- Upgrade junit to 5.7.2
- Downgrade surefire and failsafe to 2.22.2
- Fix test failures that were previously not reported
- Improve azure pipeline configs

Co-authored-by: liujinhui1994 <965147871@qq.com>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
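The recurring change in the diff below replaces UtilitiesTestBase.initClass(...) with initTestServices(needsHive, needsZookeeper), so each test suite starts only the services it actually uses instead of always booting Hive and ZooKeeper. A minimal sketch of the new pattern, assuming a hypothetical suite TestMySource (the initTestServices signature is taken from the diff below):

import org.junit.jupiter.api.BeforeAll;

// Hypothetical test suite illustrating the new setup pattern.
public class TestMySource extends UtilitiesTestBase {

  @BeforeAll
  public static void initClass() throws Exception {
    // Start only the HDFS mini-cluster; skip the Hive server and the
    // ZooKeeper test service, which this suite does not need.
    UtilitiesTestBase.initTestServices(false, false);
  }
}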
@@ -97,7 +97,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass(true);
+    UtilitiesTestBase.initTestServices(true, true);
     PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
     ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
     JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
@@ -23,13 +23,14 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 
@@ -37,7 +38,6 @@ import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -50,28 +50,27 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
-public class TestHoodieIncrSource extends HoodieClientTestHarness {
+public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
+  private HoodieTestDataGenerator dataGen;
+  private HoodieTableMetaClient metaClient;
+
   @BeforeEach
   public void setUp() throws IOException {
-    initResources();
-  }
-
-  @AfterEach
-  public void tearDown() throws IOException {
-    cleanupResources();
+    dataGen = new HoodieTestDataGenerator();
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
   }
 
   @Test
   public void testHoodieIncrSource() throws IOException {
-    HoodieWriteConfig writeConfig = getConfigBuilder(basePath)
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .archiveCommitsWith(2, 3).retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .build();
 
-    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
     Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null, "100");
     Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null, "200");
     Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null, "300");
@@ -97,15 +96,16 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
 
     // insert new batch and ensure the checkpoint moves
     readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey());
+    writeClient.close();
   }
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {
 
     Properties properties = new Properties();
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
     TypedProperties typedProperties = new TypedProperties(properties);
-    HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
+    HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(), spark(), new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // read everything until latest
     Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500);
@@ -118,27 +118,27 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
     Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
   }
 
-  public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {
+  private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {
     writeClient.startCommitWithTime(commit);
     List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
-    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), commit);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), commit);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
     return Pair.of(commit, records);
   }
 
-  public HoodieWriteConfig.Builder getConfigBuilder(String basePath) {
+  private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
-        .forTable("test-hoodie-incr-source");
+        .forTable(metaClient.getTableConfig().getTableName());
   }
 
-  class TestSchemaProvider extends SchemaProvider {
+  private static class DummySchemaProvider extends SchemaProvider {
 
     private final Schema schema;
 
-    public TestSchemaProvider(Schema schema) {
+    public DummySchemaProvider(Schema schema) {
       super(new TypedProperties());
       this.schema = schema;
     }
@@ -59,7 +59,7 @@ public class TestSqlSource extends UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass();
+    UtilitiesTestBase.initTestServices(false, false);
   }
 
   @AfterAll
@@ -64,7 +64,7 @@ public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass(false);
+    UtilitiesTestBase.initTestServices(false, false);
   }
 
   @AfterAll
@@ -104,30 +104,30 @@ public class UtilitiesTestBase {
   protected static HiveServer2 hiveServer;
   protected static HiveTestService hiveTestService;
   protected static ZookeeperTestService zookeeperTestService;
-  private static ObjectMapper mapper = new ObjectMapper();
+  private static final ObjectMapper MAPPER = new ObjectMapper();
 
   @BeforeAll
-  public static void initClass() throws Exception {
-    initClass(true);
+  // Set log level to WARN for spark logs to avoid exceeding log limit in travis
+  public static void setLogLevel() {
+    Logger rootLogger = Logger.getRootLogger();
+    rootLogger.setLevel(Level.ERROR);
+    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
   }
 
-  public static void initClass(boolean startHiveService) throws Exception {
+  public static void initTestServices(boolean needsHive, boolean needsZookeeper) throws Exception {
     hdfsTestService = new HdfsTestService();
-    zookeeperTestService = new ZookeeperTestService(hdfsTestService.getHadoopConf());
     dfsCluster = hdfsTestService.start(true);
     dfs = dfsCluster.getFileSystem();
     dfsBasePath = dfs.getWorkingDirectory().toString();
     dfs.mkdirs(new Path(dfsBasePath));
-    if (startHiveService) {
+    if (needsHive) {
       hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
       hiveServer = hiveTestService.start();
       clearHiveDb();
     }
-    zookeeperTestService.start();
+    if (needsZookeeper) {
+      zookeeperTestService = new ZookeeperTestService(hdfsTestService.getHadoopConf());
+      zookeeperTestService.start();
+    }
   }
 
   @AfterAll
@@ -288,11 +288,11 @@ public class UtilitiesTestBase {
         String[] lines, FileSystem fs, String targetPath) throws IOException {
       Builder csvSchemaBuilder = CsvSchema.builder();
 
-      ArrayNode arrayNode = mapper.createArrayNode();
+      ArrayNode arrayNode = MAPPER.createArrayNode();
       Arrays.stream(lines).forEachOrdered(
           line -> {
             try {
-              arrayNode.add(mapper.readValue(line, ObjectNode.class));
+              arrayNode.add(MAPPER.readValue(line, ObjectNode.class));
             } catch (IOException e) {
               throw new HoodieIOException(
                   "Error converting json records into CSV format: " + e.getMessage());
@@ -54,7 +54,7 @@ public abstract class AbstractCloudObjectsSourceTestBase extends UtilitiesTestBa
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass();
+    UtilitiesTestBase.initTestServices(false, false);
   }
 
   @AfterAll
@@ -62,7 +62,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass();
+    UtilitiesTestBase.initTestServices(false, false);
   }
 
   @AfterAll
@@ -49,7 +49,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass();
+    UtilitiesTestBase.initTestServices(false, false);
     UtilitiesTestBase.Helpers.copyToDFS(
         "delta-streamer-config/sql-file-transformer.sql",
         UtilitiesTestBase.dfs,