Fixing number of insert buckets to be generated by rounding off to the closest greater integer
This commit is contained in:
committed by
vinoth chandar
parent
1362942aa3
commit
d0fde47458
@@ -726,7 +726,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
|
insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
int insertBuckets = (int) Math.max(totalUnassignedInserts / insertRecordsPerBucket, 1L);
|
int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
|
||||||
logger.info(
|
logger.info(
|
||||||
"After small file assignment: unassignedInserts => " + totalUnassignedInserts
|
"After small file assignment: unassignedInserts => " + totalUnassignedInserts
|
||||||
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => "
|
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => "
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ import com.uber.hoodie.config.HoodieCompactionConfig;
|
|||||||
import com.uber.hoodie.config.HoodieStorageConfig;
|
import com.uber.hoodie.config.HoodieStorageConfig;
|
||||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
import com.uber.hoodie.io.HoodieCreateHandle;
|
import com.uber.hoodie.io.HoodieCreateHandle;
|
||||||
|
import com.uber.hoodie.table.HoodieCopyOnWriteTable.UpsertPartitioner;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@@ -401,9 +402,8 @@ public class TestCopyOnWriteTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private List<HoodieCopyOnWriteTable.InsertBucket> testUpsertPartitioner(int smallFileSize, int numInserts,
|
private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts,
|
||||||
int numUpdates, int fileSize, boolean autoSplitInserts) throws Exception {
|
int numUpdates, int fileSize, String testPartitionPath, boolean autoSplitInserts) throws Exception {
|
||||||
final String testPartitionPath = "2016/09/26";
|
|
||||||
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withCompactionConfig(
|
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withCompactionConfig(
|
||||||
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize).insertSplitSize(100)
|
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize).insertSplitSize(100)
|
||||||
.autoTuneInsertSplits(autoSplitInserts).build()).withStorageConfig(
|
.autoTuneInsertSplits(autoSplitInserts).build()).withStorageConfig(
|
||||||
@@ -427,6 +427,31 @@ public class TestCopyOnWriteTable {
|
|||||||
WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
|
WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
|
||||||
HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
|
HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
|
||||||
(HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile);
|
(HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile);
|
||||||
|
assertEquals("Update record should have gone to the 1 update partiton", 0, partitioner.getPartition(
|
||||||
|
new Tuple2<>(updateRecords.get(0).getKey(), Option.apply(updateRecords.get(0).getCurrentLocation()))));
|
||||||
|
return partitioner;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpsertPartitioner() throws Exception {
|
||||||
|
final String testPartitionPath = "2016/09/26";
|
||||||
|
// Inserts + Updates... Check all updates go together & inserts subsplit
|
||||||
|
UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024,
|
||||||
|
testPartitionPath, false);
|
||||||
|
List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
|
||||||
|
assertEquals("Total of 2 insert buckets", 2, insertBuckets.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
|
||||||
|
final String testPartitionPath = "2016/09/26";
|
||||||
|
// Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
|
||||||
|
// smallest file
|
||||||
|
UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024,testPartitionPath,
|
||||||
|
false);
|
||||||
|
List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
|
||||||
|
|
||||||
assertEquals("Should have 3 partitions", 3, partitioner.numPartitions());
|
assertEquals("Should have 3 partitions", 3, partitioner.numPartitions());
|
||||||
assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE,
|
assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE,
|
||||||
@@ -435,33 +460,24 @@ public class TestCopyOnWriteTable {
|
|||||||
partitioner.getBucketInfo(1).bucketType);
|
partitioner.getBucketInfo(1).bucketType);
|
||||||
assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
|
assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
|
||||||
partitioner.getBucketInfo(2).bucketType);
|
partitioner.getBucketInfo(2).bucketType);
|
||||||
assertEquals("Update record should have gone to the 1 update partiton", 0, partitioner.getPartition(
|
|
||||||
new Tuple2<>(updateRecords.get(0).getKey(), Option.apply(updateRecords.get(0).getCurrentLocation()))));
|
|
||||||
return partitioner.getInsertBuckets(testPartitionPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpsertPartitioner() throws Exception {
|
|
||||||
// Inserts + Updates... Check all updates go together & inserts subsplit
|
|
||||||
List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = testUpsertPartitioner(0, 200, 100, 1024, false);
|
|
||||||
assertEquals("Total of 2 insert buckets", 2, insertBuckets.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
|
|
||||||
// Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
|
|
||||||
// smallest file
|
|
||||||
List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = testUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024,
|
|
||||||
false);
|
|
||||||
assertEquals("Total of 3 insert buckets", 3, insertBuckets.size());
|
assertEquals("Total of 3 insert buckets", 3, insertBuckets.size());
|
||||||
assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
|
assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
|
||||||
assertEquals("First insert bucket should have weight 0.5", 0.5, insertBuckets.get(0).weight, 0.01);
|
assertEquals("First insert bucket should have weight 0.5", 0.5, insertBuckets.get(0).weight, 0.01);
|
||||||
|
|
||||||
// Now with insert split size auto tuned
|
// Now with insert split size auto tuned
|
||||||
insertBuckets = testUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, true);
|
partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
|
||||||
assertEquals("Total of 3 insert buckets", 3, insertBuckets.size());
|
insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
|
||||||
|
|
||||||
|
assertEquals("Should have 4 partitions", 4, partitioner.numPartitions());
|
||||||
|
assertEquals("Bucket 0 is UPDATE", HoodieCopyOnWriteTable.BucketType.UPDATE,
|
||||||
|
partitioner.getBucketInfo(0).bucketType);
|
||||||
|
assertEquals("Bucket 1 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
|
||||||
|
partitioner.getBucketInfo(1).bucketType);
|
||||||
|
assertEquals("Bucket 2 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
|
||||||
|
partitioner.getBucketInfo(2).bucketType);
|
||||||
|
assertEquals("Bucket 3 is INSERT", HoodieCopyOnWriteTable.BucketType.INSERT,
|
||||||
|
partitioner.getBucketInfo(3).bucketType);
|
||||||
|
assertEquals("Total of 4 insert buckets", 4, insertBuckets.size());
|
||||||
assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
|
assertEquals("First insert bucket must be same as update bucket", 0, insertBuckets.get(0).bucketNumber);
|
||||||
assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01);
|
assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user