diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index e0d2a5397..6d86e9350 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -399,7 +399,6 @@ public class HoodieTestDataGenerator {
    */
   public Stream generateUniqueUpdatesStream(String commitTime, Integer n) {
     final Set used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique updates is greater than number of available keys");
     }
@@ -429,24 +428,24 @@ public class HoodieTestDataGenerator {
    */
   public Stream generateUniqueDeleteStream(Integer n) {
     final Set used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
     }
 
-    return IntStream.range(0, n).boxed().map(i -> {
-      int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
-      KeyPartition kp = existingKeys.get(index);
-      // Find the available keyPartition starting from randomly chosen one.
-      while (used.contains(kp)) {
+    List result = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      int index = RAND.nextInt(numExistingKeys);
+      while (!existingKeys.containsKey(index)) {
         index = (index + 1) % numExistingKeys;
-        kp = existingKeys.get(index);
       }
-      existingKeys.remove(kp);
+      KeyPartition kp = existingKeys.remove(index);
+      existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
+      existingKeys.remove(numExistingKeys - 1);
       numExistingKeys--;
       used.add(kp);
-      return kp.key;
-    });
+      result.add(kp.key);
+    }
+    return result.stream();
   }
 
   /**
@@ -458,28 +457,29 @@ public class HoodieTestDataGenerator {
    */
   public Stream generateUniqueDeleteRecordStream(String commitTime, Integer n) {
     final Set used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
     }
 
-    return IntStream.range(0, n).boxed().map(i -> {
-      int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
-      KeyPartition kp = existingKeys.get(index);
-      // Find the available keyPartition starting from randomly chosen one.
-      while (used.contains(kp)) {
+    List result = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      int index = RAND.nextInt(numExistingKeys);
+      while (!existingKeys.containsKey(index)) {
         index = (index + 1) % numExistingKeys;
-        kp = existingKeys.get(index);
       }
-      existingKeys.remove(kp);
+      // swap chosen index with last index and remove last entry.
+      KeyPartition kp = existingKeys.remove(index);
+      existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
+      existingKeys.remove(numExistingKeys - 1);
       numExistingKeys--;
       used.add(kp);
       try {
-        return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime));
+        result.add(new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime)));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
-    });
+    }
+    return result.stream();
   }
 
   public String[] getPartitionPaths() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 9d324dce2..100faa210 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -422,8 +422,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       } else {
         TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
       }
-      TestHelpers.assertRecordCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext);
-      TestHelpers.assertDistanceCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
       return true;
     }, 180);
     ds.shutdownGracefully();