1
0

[HUDI-667] Fixing delete tests for DeltaStreamer (#1395)

This commit is contained in:
Sivabalan Narayanan
2020-03-11 16:19:23 -07:00
committed by GitHub
parent dd7cf38a13
commit 1ca912af09
2 changed files with 23 additions and 23 deletions

View File

@@ -399,7 +399,6 @@ public class HoodieTestDataGenerator {
*/ */
public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) { public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) {
final Set<KeyPartition> used = new HashSet<>(); final Set<KeyPartition> used = new HashSet<>();
if (n > numExistingKeys) { if (n > numExistingKeys) {
throw new IllegalArgumentException("Requested unique updates is greater than number of available keys"); throw new IllegalArgumentException("Requested unique updates is greater than number of available keys");
} }
@@ -429,24 +428,24 @@ public class HoodieTestDataGenerator {
*/ */
public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) { public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
final Set<KeyPartition> used = new HashSet<>(); final Set<KeyPartition> used = new HashSet<>();
if (n > numExistingKeys) { if (n > numExistingKeys) {
throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys"); throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
} }
return IntStream.range(0, n).boxed().map(i -> { List<HoodieKey> result = new ArrayList<>();
int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1); for (int i = 0; i < n; i++) {
KeyPartition kp = existingKeys.get(index); int index = RAND.nextInt(numExistingKeys);
// Find the available keyPartition starting from randomly chosen one. while (!existingKeys.containsKey(index)) {
while (used.contains(kp)) {
index = (index + 1) % numExistingKeys; index = (index + 1) % numExistingKeys;
kp = existingKeys.get(index);
} }
existingKeys.remove(kp); KeyPartition kp = existingKeys.remove(index);
existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
existingKeys.remove(numExistingKeys - 1);
numExistingKeys--; numExistingKeys--;
used.add(kp); used.add(kp);
return kp.key; result.add(kp.key);
}); }
return result.stream();
} }
/** /**
@@ -458,28 +457,29 @@ public class HoodieTestDataGenerator {
*/ */
public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) { public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) {
final Set<KeyPartition> used = new HashSet<>(); final Set<KeyPartition> used = new HashSet<>();
if (n > numExistingKeys) { if (n > numExistingKeys) {
throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys"); throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
} }
return IntStream.range(0, n).boxed().map(i -> { List<HoodieRecord> result = new ArrayList<>();
int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1); for (int i = 0; i < n; i++) {
KeyPartition kp = existingKeys.get(index); int index = RAND.nextInt(numExistingKeys);
// Find the available keyPartition starting from randomly chosen one. while (!existingKeys.containsKey(index)) {
while (used.contains(kp)) {
index = (index + 1) % numExistingKeys; index = (index + 1) % numExistingKeys;
kp = existingKeys.get(index);
} }
existingKeys.remove(kp); // swap chosen index with last index and remove last entry.
KeyPartition kp = existingKeys.remove(index);
existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
existingKeys.remove(numExistingKeys - 1);
numExistingKeys--; numExistingKeys--;
used.add(kp); used.add(kp);
try { try {
return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime)); result.add(new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime)));
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
} }
}); }
return result.stream();
} }
public String[] getPartitionPaths() { public String[] getPartitionPaths() {

View File

@@ -422,8 +422,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} else { } else {
TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs); TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
} }
TestHelpers.assertRecordCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true; return true;
}, 180); }, 180);
ds.shutdownGracefully(); ds.shutdownGracefully();