diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index 23fec955d..cc0a3ff79 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -16,37 +16,26 @@
 package com.uber.hoodie.table;
 
-import com.uber.hoodie.common.model.HoodieCompactionMetadata;
-import com.uber.hoodie.common.model.HoodieDataFile;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
-import com.uber.hoodie.common.table.HoodieTimeline;
-import com.uber.hoodie.common.table.timeline.HoodieInstant;
-import com.uber.hoodie.config.HoodieWriteConfig;
+import com.google.common.hash.Hashing;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
+import com.uber.hoodie.common.model.HoodieCompactionMetadata;
+import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.exception.HoodieUpsertException;
 import com.uber.hoodie.func.LazyInsertIterable;
 import com.uber.hoodie.io.HoodieCleanHelper;
 import com.uber.hoodie.io.HoodieMergeHandle;
-
-import java.util.Optional;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.spark.Partitioner;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -56,13 +45,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -73,8 +60,8 @@ import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import scala.Tuple2;
 import scala.Option;
+import scala.Tuple2;
 
 /**
  * Implementation of a very heavily read-optimized Hoodie Table where
@@ -90,12 +77,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     super(config, metaClient);
   }
 
-
-
-  // seed for random number generator. No particular significance, just makes testing deterministic
-  private static final long RANDOM_NUMBER_SEED = 356374L;
-
-
   private static Logger logger = LogManager.getLogger(HoodieCopyOnWriteTable.class);
 
   enum BucketType {
@@ -169,6 +150,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
      */
     private int totalBuckets = 0;
 
+    /**
+     * Stat for the current workload. Helps in determining total inserts, upserts etc.
+     */
+    private WorkloadStat globalStat;
+
     /**
     * Helps decide which bucket an incoming update should go to.
     */
@@ -187,17 +173,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     */
    private HashMap<Integer, BucketInfo> bucketInfoMap;
 
-
-    /**
-     * Random number generator to use for splitting inserts into buckets by weight
-     */
-    private Random rand = new Random(RANDOM_NUMBER_SEED);
-
-
    UpsertPartitioner(WorkloadProfile profile) {
      updateLocationToBucket = new HashMap<>();
      partitionPathToInsertBuckets = new HashMap<>();
      bucketInfoMap = new HashMap<>();
+      globalStat = profile.getGlobalStat();
 
      assignUpdates(profile);
      assignInserts(profile);
@@ -379,7 +359,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
       // pick the target bucket to use based on the weights.
       double totalWeight = 0.0;
-      double r = rand.nextDouble();
+      final long totalInserts = Math.max(1, globalStat.getNumInserts());
+      final double r = 1.0 * Math.floorMod(Hashing.md5().hashString(keyLocation._1().getRecordKey()).asLong(),
+          totalInserts) / totalInserts;
       for (InsertBucket insertBucket: targetBuckets) {
         totalWeight += insertBucket.weight;
         if (r <= totalWeight) {
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java
index fadb9255f..45ebbd2d0 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java
@@ -898,16 +898,16 @@ public class TestHoodieClient implements Serializable {
     FileSystem fs = FSUtils.getFs();
     final String TEST_PARTITION_PATH = "2016/09/26";
-    final int INSERT_SPLIT_LIMIT = 10;
+    final int INSERT_SPLIT_LIMIT = 100;
     // setup the small file handling params
-    HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
+    HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});
     HoodieWriteClient client = new HoodieWriteClient(jsc, config);
 
     // Inserts => will write file1
     String commitTime1 = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~5000kb
     Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
@@ -917,13 +917,13 @@ public class TestHoodieClient implements Serializable {
     assertEquals("Just 1 file needs to be added.", 1, statuses.size());
     String file1 = statuses.get(0).getFileId();
-    assertEquals("file should contain 10 records",
+    assertEquals("file should contain 100 records",
         ParquetUtils.readRowKeysFromParquet(new Path(basePath, TEST_PARTITION_PATH + "/"
             + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(),
-        10);
+        100);
 
     // Update + Inserts such that they just expand file1
     String commitTime2 = "002";
-    List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 4);
+    List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
     Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2);
     List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
     insertsAndUpdates2.addAll(inserts2);
@@ -937,7 +937,7 @@ public class TestHoodieClient implements Serializable {
     assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
     assertEquals("Existing file should be expanded", commitTime1,
         statuses.get(0).getStat().getPrevCommit());
     Path newFile = new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
-    assertEquals("file should contain 14 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 14);
+    assertEquals("file should contain 140 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 140);
     List<GenericRecord> records = ParquetUtils.readAvroRecords(newFile);
     for (GenericRecord record: records) {
@@ -948,7 +948,7 @@ public class TestHoodieClient implements Serializable {
 
     // update + inserts such that file1 is updated and expanded, a new file2 is created.
     String commitTime3 = "003";
-    List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 20);
+    List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 200);
     Set<String> keys3 = HoodieClientTestUtils.getRecordKeys(insertsAndUpdates3);
     List<HoodieRecord> updates3 = dataGen.generateUpdates(commitTime3, inserts2);
     insertsAndUpdates3.addAll(updates3);
@@ -999,15 +999,15 @@ public class TestHoodieClient implements Serializable {
   public void testSmallInsertHandlingForInserts() throws Exception {
 
     final String TEST_PARTITION_PATH = "2016/09/26";
-    final int INSERT_SPLIT_LIMIT = 10;
+    final int INSERT_SPLIT_LIMIT = 100;
     // setup the small file handling params
-    HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
+    HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});
     HoodieWriteClient client = new HoodieWriteClient(jsc, config);
 
     // Inserts => will write file1
     String commitTime1 = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~5000kb
     Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
     List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
@@ -1017,13 +1017,13 @@ public class TestHoodieClient implements Serializable {
     assertEquals("Just 1 file needs to be added.", 1, statuses.size());
     String file1 = statuses.get(0).getFileId();
-    assertEquals("file should contain 10 records",
+    assertEquals("file should contain 100 records",
         ParquetUtils.readRowKeysFromParquet(new Path(basePath, TEST_PARTITION_PATH + "/"
             + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(),
-        10);
+        100);
 
     // Second, set of Inserts should just expand file1
     String commitTime2 = "002";
-    List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 4);
+    List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
     Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2);
     JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
     statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
@@ -1033,7 +1033,7 @@ public class TestHoodieClient implements Serializable {
     assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
     assertEquals("Existing file should be expanded", commitTime1,
         statuses.get(0).getStat().getPrevCommit());
     Path newFile = new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
-    assertEquals("file should contain 14 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 14);
+    assertEquals("file should contain 140 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 140);
     List<GenericRecord> records = ParquetUtils.readAvroRecords(newFile);
     for (GenericRecord record: records) {
@@ -1045,7 +1045,7 @@ public class TestHoodieClient implements Serializable {
 
     // Lots of inserts such that file1 is updated and expanded, a new file2 is created.
     String commitTime3 = "003";
-    List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 20);
+    List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 200);
     JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1);
     statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
     assertNoWriteErrors(statuses);
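
Note on the change: UpsertPartitioner previously drew rand.nextDouble() (from a Random seeded with RANDOM_NUMBER_SEED) to spread inserts across buckets, so a record's bucket depended on the order in which records were processed; under Spark task retries and re-executions that order can change, routing the same record to different buckets. The new code derives the value in [0, 1) from an MD5 hash of the record key modulo the workload's total insert count, making the assignment a pure function of the key and the workload stats. The test sizes appear to be scaled up tenfold so that the hash-based split tracks the configured bucket weights closely enough for the exact record-count assertions to hold.

Below is a minimal, self-contained sketch of the new selection logic, not the patch itself: InsertBucket is a simplified stand-in for the partitioner's inner class, and the two-argument hashString(CharSequence, Charset) overload is used because the one-argument form from older Guava releases was later removed.

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class BucketAssignmentSketch {

  /** Simplified stand-in for UpsertPartitioner's InsertBucket. */
  static class InsertBucket {
    final int bucketNumber;
    final double weight; // fraction of incoming inserts this bucket should absorb

    InsertBucket(int bucketNumber, double weight) {
      this.bucketNumber = bucketNumber;
      this.weight = weight;
    }
  }

  /**
   * Deterministically maps a record key to a point in [0, 1) and walks the
   * cumulative bucket weights, mirroring the getPartition() change above.
   */
  static int assignBucket(String recordKey, long numInserts, List<InsertBucket> targetBuckets) {
    final long totalInserts = Math.max(1, numInserts);
    final long hash = Hashing.md5().hashString(recordKey, StandardCharsets.UTF_8).asLong();
    // Math.floorMod keeps the remainder non-negative even when the hash is negative.
    final double r = 1.0 * Math.floorMod(hash, totalInserts) / totalInserts;
    double totalWeight = 0.0;
    for (InsertBucket insertBucket : targetBuckets) {
      totalWeight += insertBucket.weight;
      if (r <= totalWeight) {
        return insertBucket.bucketNumber;
      }
    }
    // Guard against weights summing to slightly less than 1.0 due to round-off.
    return targetBuckets.get(targetBuckets.size() - 1).bucketNumber;
  }

  public static void main(String[] args) {
    List<InsertBucket> buckets = Arrays.asList(new InsertBucket(0, 0.4), new InsertBucket(1, 0.6));
    System.out.println(assignBucket("key-0001", 100, buckets)); // same key, same workload size ...
    System.out.println(assignBucket("key-0001", 100, buckets)); // ... always the same bucket
  }
}

Running main twice prints the same bucket both times; with the old Random-based pick, the outcome depended on how many records had been assigned before this one. The trade-off is that r is quantized to multiples of 1/totalInserts rather than uniformly random, which is presumably why the tests now use larger batches.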