From 6b47ef6ed223e10a05b91acc3331cff2fa069d87 Mon Sep 17 00:00:00 2001 From: xicm <36392121+xicm@users.noreply.github.com> Date: Mon, 9 May 2022 16:35:50 +0800 Subject: [PATCH] =?UTF-8?q?[HUDI-4053]=20Flaky=20ITTestHoodieDataSource.te?= =?UTF-8?q?stStreamWriteBatchReadOpti=E2=80=A6=20(#5526)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [HUDI-4053] Flaky ITTestHoodieDataSource.testStreamWriteBatchReadOptimized Co-authored-by: xicm --- .../apache/hudi/table/ITTestHoodieDataSource.java | 10 +++++++++- .../test/java/org/apache/hudi/utils/TestData.java | 12 ++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 088ddb260..0c423df6b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -240,7 +240,15 @@ public class ITTestHoodieDataSource extends AbstractTestBase { List rows = CollectionUtil.iterableToList( () -> streamTableEnv.sqlQuery("select * from t1").execute().collect()); - assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); + + // the test is flaky based on whether the first compaction is pending when + // scheduling the 2nd compaction. + // see details in CompactionPlanOperator#scheduleCompaction. + if (rows.size() < TestData.DATA_SET_SOURCE_INSERT.size()) { + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_FIRST_COMMIT); + } else { + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); + } } @Test diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index c1e924056..61f1657c2 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -164,6 +164,18 @@ public class TestData { TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) ); + // data set of test_source.data first commit. + public static List DATA_SET_SOURCE_INSERT_FIRST_COMMIT = Arrays.asList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), + insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, + TimestampData.fromEpochMillis(2000), StringData.fromString("par1")), + insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, + TimestampData.fromEpochMillis(3000), StringData.fromString("par2")), + insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, + TimestampData.fromEpochMillis(4000), StringData.fromString("par2")) + ); + // data set of test_source.data latest commit. public static List DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList( insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,