Fix Integration test flakiness in HoodieJavaStreamingApp (#1967)
This commit is contained in:
committed by
GitHub
parent
9bde6d616c
commit
b8f4a30efd
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.integ;
|
package org.apache.hudi.integ;
|
||||||
|
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
@@ -34,20 +35,23 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||||||
*/
|
*/
|
||||||
public class ITTestHoodieSanity extends ITTestBase {
|
public class ITTestHoodieSanity extends ITTestBase {
|
||||||
|
|
||||||
|
private static final String HDFS_BASE_URL = "hdfs://namenode";
|
||||||
|
private static final String HDFS_STREAMING_SOURCE = HDFS_BASE_URL + "/streaming/source/";
|
||||||
|
private static final String HDFS_STREAMING_CKPT = HDFS_BASE_URL + "/streaming/ckpt/";
|
||||||
|
|
||||||
enum PartitionType {
|
enum PartitionType {
|
||||||
SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
|
SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
|
|
||||||
/**
|
/**
|
||||||
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with single partition key data-set
|
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with single partition key data-set
|
||||||
* and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
|
* and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
|
||||||
* console.
|
* console.
|
||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) throws Exception {
|
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_single_partition_key_cow_test";
|
String hiveTableName = "docker_hoodie_single_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
|
||||||
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
|
||||||
PartitionType.SINGLE_KEY_PARTITIONED);
|
PartitionType.SINGLE_KEY_PARTITIONED);
|
||||||
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
||||||
}
|
}
|
||||||
@@ -59,9 +63,9 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
* data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
|
* data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
|
||||||
* in hive console.
|
* in hive console.
|
||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) throws Exception {
|
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test";
|
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
|
||||||
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
|
testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
|
||||||
PartitionType.MULTI_KEYS_PARTITIONED);
|
PartitionType.MULTI_KEYS_PARTITIONED);
|
||||||
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
||||||
}
|
}
|
||||||
@@ -73,21 +77,20 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
* console.
|
* console.
|
||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
|
public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_non_partition_key_cow_test";
|
String hiveTableName = "docker_hoodie_non_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
|
||||||
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.NON_PARTITIONED);
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.NON_PARTITIONED);
|
||||||
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
|
|
||||||
/**
|
/**
|
||||||
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set
|
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set
|
||||||
* and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
|
* and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
|
||||||
* console.
|
* console.
|
||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) throws Exception {
|
public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_single_partition_key_mor_test";
|
String hiveTableName = "docker_hoodie_single_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
|
||||||
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
|
||||||
PartitionType.SINGLE_KEY_PARTITIONED);
|
PartitionType.SINGLE_KEY_PARTITIONED);
|
||||||
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
||||||
}
|
}
|
||||||
@@ -100,7 +103,7 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
* in hive console.
|
* in hive console.
|
||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) throws Exception {
|
public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_multi_partition_key_mor_test";
|
String hiveTableName = "docker_hoodie_multi_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
|
||||||
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
|
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
|
||||||
PartitionType.MULTI_KEYS_PARTITIONED);
|
PartitionType.MULTI_KEYS_PARTITIONED);
|
||||||
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
||||||
@@ -113,7 +116,7 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
* console.
|
* console.
|
||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception {
|
public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_non_partition_key_mor_test";
|
String hiveTableName = "docker_hoodie_non_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
|
||||||
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.NON_PARTITIONED);
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.NON_PARTITIONED);
|
||||||
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
||||||
}
|
}
|
||||||
@@ -127,7 +130,7 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
String hdfsPath = "/" + hiveTableName;
|
String hdfsPath = "/" + hiveTableName;
|
||||||
String hdfsUrl = "hdfs://namenode" + hdfsPath;
|
String hdfsUrl = HDFS_BASE_URL + hdfsPath;
|
||||||
|
|
||||||
// Drop Table if it exists
|
// Drop Table if it exists
|
||||||
try {
|
try {
|
||||||
@@ -155,6 +158,13 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
|
cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
|
||||||
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
|
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (command.equals(HOODIE_JAVA_STREAMING_APP)) {
|
||||||
|
String streamingSourcePath = HDFS_STREAMING_SOURCE + hiveTableName;
|
||||||
|
String streamingCkptPath = HDFS_STREAMING_CKPT + hiveTableName;
|
||||||
|
cmd = cmd + " --streaming-source-path " + streamingSourcePath
|
||||||
|
+ " --streaming-checkpointing-path " + streamingCkptPath;
|
||||||
|
}
|
||||||
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
|
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
|
||||||
|
|
||||||
String snapshotTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name())
|
String snapshotTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name())
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ public class HoodieDataSourceHelpers {
|
|||||||
if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
|
if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
|
||||||
return metaClient.getActiveTimeline().getTimelineOfActions(
|
return metaClient.getActiveTimeline().getTimelineOfActions(
|
||||||
CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
|
CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
|
||||||
HoodieActiveTimeline.DELTA_COMMIT_ACTION));
|
HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants();
|
||||||
} else {
|
} else {
|
||||||
return metaClient.getCommitTimeline().filterCompletedInstants();
|
return metaClient.getCommitTimeline().filterCompletedInstants();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.DataSourceReadOptions;
|
import org.apache.hudi.DataSourceReadOptions;
|
||||||
import org.apache.hudi.DataSourceWriteOptions;
|
import org.apache.hudi.DataSourceWriteOptions;
|
||||||
import org.apache.hudi.HoodieDataSourceHelpers;
|
import org.apache.hudi.HoodieDataSourceHelpers;
|
||||||
@@ -120,6 +121,9 @@ public class HoodieJavaApp {
|
|||||||
dataGen = new HoodieTestDataGenerator();
|
dataGen = new HoodieTestDataGenerator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Explicitly clear up the hoodie table path if it exists.
|
||||||
|
fs.delete(new Path(tablePath), true);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit with only inserts
|
* Commit with only inserts
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -273,7 +273,9 @@ public class HoodieJavaStreamingApp {
|
|||||||
public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath,
|
public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath,
|
||||||
int initialCommits, int expRecords,
|
int initialCommits, int expRecords,
|
||||||
Dataset<Row> inputDF1, Dataset<Row> inputDF2, boolean instantTimeValidation) throws Exception {
|
Dataset<Row> inputDF1, Dataset<Row> inputDF2, boolean instantTimeValidation) throws Exception {
|
||||||
inputDF1.write().mode(SaveMode.Append).json(srcPath);
|
// Ensure, we always write only one file. This is very important to ensure a single batch is reliably read
|
||||||
|
// atomically by one iteration of spark streaming.
|
||||||
|
inputDF1.coalesce(1).write().mode(SaveMode.Append).json(srcPath);
|
||||||
|
|
||||||
int numExpCommits = initialCommits + 1;
|
int numExpCommits = initialCommits + 1;
|
||||||
// wait for spark streaming to process one microbatch
|
// wait for spark streaming to process one microbatch
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val f2 = Future {
|
val f2 = Future {
|
||||||
inputDF1.write.mode(SaveMode.Append).json(sourcePath)
|
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
|
||||||
// wait for spark streaming to process one microbatch
|
// wait for spark streaming to process one microbatch
|
||||||
val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
|
val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
|
||||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
|
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
|
||||||
@@ -112,7 +112,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
|
|||||||
.load(destPath + "/*/*/*/*")
|
.load(destPath + "/*/*/*/*")
|
||||||
assert(hoodieROViewDF1.count() == 100)
|
assert(hoodieROViewDF1.count() == 100)
|
||||||
|
|
||||||
inputDF2.write.mode(SaveMode.Append).json(sourcePath)
|
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
|
||||||
// wait for spark streaming to process one microbatch
|
// wait for spark streaming to process one microbatch
|
||||||
waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
|
waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
|
||||||
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
|
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
|
||||||
|
|||||||
Reference in New Issue
Block a user