Fixing UpsertPartitioner to ensure that input records are deterministically assigned to output partitions
commit ec40d04d51 (parent e2d13c6305)
committed by vinoth chandar
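For readers outside the codebase, the idea behind the fix can be sketched with a plain Spark Partitioner that buckets records by their key alone, so the same key always maps to the same output partition regardless of input ordering. This is an illustrative sketch under assumed names (DeterministicKeyPartitioner is hypothetical), not the actual Hudi UpsertPartitioner:

import org.apache.spark.Partitioner;

// Illustrative sketch only, NOT the Hudi UpsertPartitioner implementation:
// assigns a record key to a stable bucket so the same key always lands in the
// same output partition, independent of the order records arrive in.
public class DeterministicKeyPartitioner extends Partitioner {

  private final int numBuckets;

  public DeterministicKeyPartitioner(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Override
  public int numPartitions() {
    return numBuckets;
  }

  @Override
  public int getPartition(Object key) {
    // Hash only the record key; Math.floorMod keeps the result non-negative,
    // so identical keys deterministically map to the same bucket.
    return Math.floorMod(key == null ? 0 : key.hashCode(), numBuckets);
  }
}

The test changes below simply scale up the record counts (10 → 100, 4 → 40, 20 → 200) and the matching assertions, presumably to exercise the small-file handling path with more data per commit.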
@@ -898,16 +898,16 @@ public class TestHoodieClient implements Serializable {
 FileSystem fs = FSUtils.getFs();
 final String TEST_PARTITION_PATH = "2016/09/26";
-final int INSERT_SPLIT_LIMIT = 10;
+final int INSERT_SPLIT_LIMIT = 100;
 // setup the small file handling params
-HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
+HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 200 records max
 dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});

 HoodieWriteClient client = new HoodieWriteClient(jsc, config);

 // Inserts => will write file1
 String commitTime1 = "001";
-List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb
+List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~5000kb
 Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);

 JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
@@ -917,13 +917,13 @@ public class TestHoodieClient implements Serializable {

 assertEquals("Just 1 file needs to be added.", 1, statuses.size());
 String file1 = statuses.get(0).getFileId();
-assertEquals("file should contain 10 records",
+assertEquals("file should contain 100 records",
 ParquetUtils.readRowKeysFromParquet(new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(),
-10);
+100);

 // Update + Inserts such that they just expand file1
 String commitTime2 = "002";
-List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 4);
+List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
 Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2);
 List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
 insertsAndUpdates2.addAll(inserts2);
@@ -937,7 +937,7 @@ public class TestHoodieClient implements Serializable {
 assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
 assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
 Path newFile = new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
-assertEquals("file should contain 14 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 14);
+assertEquals("file should contain 140 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 140);

 List<GenericRecord> records = ParquetUtils.readAvroRecords(newFile);
 for (GenericRecord record: records) {
@@ -948,7 +948,7 @@ public class TestHoodieClient implements Serializable {

 // update + inserts such that file1 is updated and expanded, a new file2 is created.
 String commitTime3 = "003";
-List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 20);
+List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 200);
 Set<String> keys3 = HoodieClientTestUtils.getRecordKeys(insertsAndUpdates3);
 List<HoodieRecord> updates3 = dataGen.generateUpdates(commitTime3, inserts2);
 insertsAndUpdates3.addAll(updates3);
@@ -999,15 +999,15 @@ public class TestHoodieClient implements Serializable {
 public void testSmallInsertHandlingForInserts() throws Exception {

 final String TEST_PARTITION_PATH = "2016/09/26";
-final int INSERT_SPLIT_LIMIT = 10;
+final int INSERT_SPLIT_LIMIT = 100;
 // setup the small file handling params
-HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
+HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 200 records max
 dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});
 HoodieWriteClient client = new HoodieWriteClient(jsc, config);

 // Inserts => will write file1
 String commitTime1 = "001";
-List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb
+List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~5000kb
 Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);
 JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
 List<WriteStatus> statuses= client.insert(insertRecordsRDD1, commitTime1).collect();
@@ -1017,13 +1017,13 @@ public class TestHoodieClient implements Serializable {

 assertEquals("Just 1 file needs to be added.", 1, statuses.size());
 String file1 = statuses.get(0).getFileId();
-assertEquals("file should contain 10 records",
+assertEquals("file should contain 100 records",
 ParquetUtils.readRowKeysFromParquet(new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(),
-10);
+100);

 // Second, set of Inserts should just expand file1
 String commitTime2 = "002";
-List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 4);
+List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
 Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2);
 JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
 statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
@@ -1033,7 +1033,7 @@ public class TestHoodieClient implements Serializable {
 assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
 assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
 Path newFile = new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
-assertEquals("file should contain 14 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 14);
+assertEquals("file should contain 140 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 140);

 List<GenericRecord> records = ParquetUtils.readAvroRecords(newFile);
 for (GenericRecord record: records) {
@@ -1045,7 +1045,7 @@ public class TestHoodieClient implements Serializable {

 // Lots of inserts such that file1 is updated and expanded, a new file2 is created.
 String commitTime3 = "003";
-List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 20);
+List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 200);
 JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1);
 statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
 assertNoWriteErrors(statuses);