From d0fde474589aaf17bb840dbf86e2809eddc60e1d Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Tue, 13 Nov 2018 15:42:36 -0800 Subject: [PATCH] Fixing number of insert buckets to be generated by rounding up to the nearest integer --- .../hoodie/table/HoodieCopyOnWriteTable.java | 2 +- .../hoodie/table/TestCopyOnWriteTable.java | 66 ++++++++++++------- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 3fa8a175a..67411627b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -726,7 +726,7 @@ public class HoodieCopyOnWriteTable extends Hoodi insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize; } - int insertBuckets = (int) Math.max(totalUnassignedInserts / insertRecordsPerBucket, 1L); + int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); logger.info( "After small file assignment: unassignedInserts => " + totalUnassignedInserts + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java index b9be42b71..2536c47eb 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java @@ -40,6 +40,7 @@ import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.io.HoodieCreateHandle; +import com.uber.hoodie.table.HoodieCopyOnWriteTable.UpsertPartitioner; import java.io.File; import java.util.ArrayList; 
import java.util.Arrays; @@ -401,9 +402,8 @@ public class TestCopyOnWriteTable { } - private List testUpsertPartitioner(int smallFileSize, int numInserts, - int numUpdates, int fileSize, boolean autoSplitInserts) throws Exception { - final String testPartitionPath = "2016/09/26"; + private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, + int numUpdates, int fileSize, String testPartitionPath, boolean autoSplitInserts) throws Exception { HoodieWriteConfig config = makeHoodieClientConfigBuilder().withCompactionConfig( HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize).insertSplitSize(100) .autoTuneInsertSplits(autoSplitInserts).build()).withStorageConfig( @@ -427,6 +427,31 @@ public class TestCopyOnWriteTable { WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records)); HoodieCopyOnWriteTable.UpsertPartitioner partitioner = (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile); + assertEquals("Update record should have gone to the 1 update partiton", 0, partitioner.getPartition( + new Tuple2<>(updateRecords.get(0).getKey(), Option.apply(updateRecords.get(0).getCurrentLocation())))); + return partitioner; + } + + + @Test + public void testUpsertPartitioner() throws Exception { + final String testPartitionPath = "2016/09/26"; + // Inserts + Updates... Check all updates go together & inserts subsplit + UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, + testPartitionPath, false); + List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); + assertEquals("Total of 2 insert buckets", 2, insertBuckets.size()); + } + + + @Test + public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { + final String testPartitionPath = "2016/09/26"; + // Inserts + Updates .. 
Check updates go together & inserts subsplit, after expanding + // smallest file + UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024,testPartitionPath, + false); + List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals("Should have 3 partitions", 3, partitioner.numPartitions()); assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE, @@ -435,33 +460,24 @@ public class TestCopyOnWriteTable { partitioner.getBucketInfo(1).bucketType); assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT, partitioner.getBucketInfo(2).bucketType); - assertEquals("Update record should have gone to the 1 update partiton", 0, partitioner.getPartition( - new Tuple2<>(updateRecords.get(0).getKey(), Option.apply(updateRecords.get(0).getCurrentLocation())))); - return partitioner.getInsertBuckets(testPartitionPath); - } - - - @Test - public void testUpsertPartitioner() throws Exception { - // Inserts + Updates... Check all updates go together & inserts subsplit - List insertBuckets = testUpsertPartitioner(0, 200, 100, 1024, false); - assertEquals("Total of 2 insert buckets", 2, insertBuckets.size()); - } - - - @Test - public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { - // Inserts + Updates .. 
Check updates go together & inserts subsplit, after expanding - // smallest file - List insertBuckets = testUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, - false); assertEquals("Total of 3 insert buckets", 3, insertBuckets.size()); assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber); assertEquals("First insert bucket should have weight 0.5", 0.5, insertBuckets.get(0).weight, 0.01); // Now with insert split size auto tuned - insertBuckets = testUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, true); - assertEquals("Total of 3 insert buckets", 3, insertBuckets.size()); + partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true); + insertBuckets = partitioner.getInsertBuckets(testPartitionPath); + + assertEquals("Should have 4 partitions", 4, partitioner.numPartitions()); + assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE, + partitioner.getBucketInfo(0).bucketType); + assertEquals("Bucket 1 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT, + partitioner.getBucketInfo(1).bucketType); + assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT, + partitioner.getBucketInfo(2).bucketType); + assertEquals("Bucket 3 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT, + partitioner.getBucketInfo(3).bucketType); + assertEquals("Total of 4 insert buckets", 4, insertBuckets.size()); assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber); assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01); }