1
0

[MINOR] Fix flaky testUpsertsUpdatePartitionPath* tests (#1863)

This commit is contained in:
vinoth chandar
2020-07-22 19:52:34 -07:00
committed by GitHub
parent 5b6026ba43
commit 9bd37ef291

View File

@@ -445,8 +445,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
@ParameterizedTest @ParameterizedTest
@EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"}) @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception { public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception {
testUpsertsUpdatePartitionPath(indexType, getConfig(), testUpsertsUpdatePartitionPath(indexType, getConfig(), HoodieWriteClient::upsert);
HoodieWriteClient::upsert);
} }
/** /**
@@ -494,14 +493,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
} }
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime); JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
List<WriteStatus> statuses = result.collect(); result.collect();
// Check the entire dataset has all records // Check the entire dataset has all records
String[] fullPartitionPaths = getFullPartitionPaths(); String[] fullPartitionPaths = getFullPartitionPaths();
assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths); assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
// verify one basefile per partition // verify one basefile per partition
Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths); String[] fullExpectedPartitionPaths = getFullPartitionPaths(expectedPartitionPathRecKeyPairs.stream().map(Pair::getLeft).toArray(String[]::new));
Map<String, Integer> baseFileCounts = getBaseFileCounts(fullExpectedPartitionPaths);
for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) { for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
assertEquals(1, entry.getValue()); assertEquals(1, entry.getValue());
} }
@@ -560,7 +560,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
writeRecords = jsc.parallelize(recordsToUpsert, 1); writeRecords = jsc.parallelize(recordsToUpsert, 1);
result = writeFn.apply(client, writeRecords, newCommitTime); result = writeFn.apply(client, writeRecords, newCommitTime);
statuses = result.collect(); result.collect();
// Check the entire dataset has all records // Check the entire dataset has all records
fullPartitionPaths = getFullPartitionPaths(); fullPartitionPaths = getFullPartitionPaths();
@@ -589,9 +589,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
} }
private String[] getFullPartitionPaths() { private String[] getFullPartitionPaths() {
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; return getFullPartitionPaths(dataGen.getPartitionPaths());
}
private String[] getFullPartitionPaths(String[] relativePartitionPaths) {
String[] fullPartitionPaths = new String[relativePartitionPaths.length];
for (int i = 0; i < fullPartitionPaths.length; i++) { for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); fullPartitionPaths[i] = String.format("%s/%s/*", basePath, relativePartitionPaths[i]);
} }
return fullPartitionPaths; return fullPartitionPaths;
} }