1
0

Fixing async clustering job test in TestHoodieDeltaStreamer (#5317)

This commit is contained in:
Sivabalan Narayanan
2022-04-18 08:08:33 -04:00
committed by GitHub
parent b8e465fdfc
commit 05dfc39c29

View File

@@ -104,6 +104,7 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
@@ -123,6 +124,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -380,7 +382,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
ret = false;
}
}
return true;
return ret;
});
res.get(timeoutInSecs, TimeUnit.SECONDS);
}
@@ -1028,17 +1030,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
});
}
@Disabled("HUDI-3710 to fix the ConcurrentModificationException")
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
String tableBasePath = dfsBasePath + "/asyncClusteringJob";
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
CountDownLatch countDownLatch = new CountDownLatch(1);
deltaStreamerTestRunner(ds, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
countDownLatch.countDown();
return true;
});
if (countDownLatch.await(2, TimeUnit.MINUTES)) {
Option<String> scheduleClusteringInstantTime = Option.empty();
try {
HoodieClusteringJob scheduleClusteringJob =
@@ -1046,7 +1051,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
} catch (Exception e) {
LOG.warn("Schedule clustering failed", e);
return false;
Assertions.fail("Schedule clustering failed", e);
}
if (scheduleClusteringInstantTime.isPresent()) {
LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
@@ -1054,13 +1059,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false);
HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
LOG.info("Cluster success");
} else {
LOG.warn("Schedule clustering failed");
LOG.warn("Clustering execution failed");
Assertions.fail("Clustering execution failed");
}
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
return true;
});
} else {
Assertions.fail("Deltastreamer should have completed 2 commits.");
}
}
@Test