1
0

[HUDI-2936] Add data count checks in async clustering tests (#4236)

This commit is contained in:
Sagar Sumit
2021-12-10 19:55:37 +05:30
committed by GitHub
parent 456d74ce4e
commit c7473a7b0c

View File

@@ -257,6 +257,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       assertEquals(expected, recordCount);
     }

+    static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
+      sqlContext.clearCache();
+      long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
+      assertEquals(expected, recordCount);
+    }
+
     static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
       sqlContext.clearCache();
       List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tablePath)
@@ -358,12 +364,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
     }

-    static void assertNoReplaceCommits(int expected, String tablePath, FileSystem fs) {
+    static void assertNoReplaceCommits(String tablePath, FileSystem fs) {
       HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
       int numDeltaCommits = (int) timeline.getInstants().count();
-      assertEquals(expected, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + expected);
+      assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 0);
     }

     static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) {
@@ -881,7 +887,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
-    String tableBasePath = dfsBasePath + "/asyncClustering";
+    String tableBasePath = dfsBasePath + "/asyncClusteringJob";
     HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
@@ -916,19 +922,22 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   public void testAsyncClusteringService() throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClustering";
     // Keep it higher than batch-size to test continuous mode
-    int totalRecords = 3000;
+    int totalRecords = 2000;
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
-      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
       return true;
     });
+    // There should be 4 commits, one of which should be a replace commit
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+    TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
   }

   /**
/** /**
@@ -941,40 +950,45 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   public void testAsyncClusteringServiceWithConflicts() throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClusteringWithConflicts";
     // Keep it higher than batch-size to test continuous mode
-    int totalRecords = 3000;
+    int totalRecords = 2000;
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
-      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
       return true;
     });
+    // There should be 4 commits, one of which should be a replace commit
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+    TestHelpers.assertDistinctRecordCount(1900, tableBasePath + "/*/*.parquet", sqlContext);
   }

-  @ParameterizedTest
-  @ValueSource(strings = {"true", "false"})
-  public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception {
+  @Test
+  public void testAsyncClusteringServiceWithCompaction() throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
     // Keep it higher than batch-size to test continuous mode
-    int totalRecords = 3000;
+    int totalRecords = 2000;
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2", preserveCommitMetadata));
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
       TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
-      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
       return true;
     });
+    // There should be 4 commits, one of which should be a replace commit
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+    TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
   }

   @ParameterizedTest
@ParameterizedTest @ParameterizedTest
@@ -1057,11 +1071,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         }
         case HoodieClusteringJob.SCHEDULE: {
           TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
-          TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
+          TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
          return true;
        }
        case HoodieClusteringJob.EXECUTE: {
-          TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
+          TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
          return true;
        }
        default: