From b9b9b24993993051ba9378f4e8ed1b103d811795 Mon Sep 17 00:00:00 2001
From: Sunil Ramaiah
Date: Wed, 25 Apr 2018 12:40:24 -0700
Subject: [PATCH] Added more comments and removed the extra new lines

---
 ...TestHoodieMergeHandleDuplicateRecords.java | 55 +++++++++----------
 1 file changed, 27 insertions(+), 28 deletions(-)

diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandleDuplicateRecords.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandleDuplicateRecords.java
index d199be72e..bbb826c10 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandleDuplicateRecords.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandleDuplicateRecords.java
@@ -18,6 +18,7 @@ package com.uber.hoodie.io;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
@@ -87,47 +88,40 @@ public class TestHoodieMergeHandleDuplicateRecords {
   @Test
   public void testUpsertsForMultipleRecordsInSameFile() throws Exception {
-
     // Create records in a single partition
     String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
     dataGen = new HoodieTestDataGenerator(new String[]{partitionPath});
 
     // Build a write config with bulkinsertparallelism set
     HoodieWriteConfig cfg = getConfigBuilder().build();
-
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
     FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
 
     /**
      * Write 1 (only inserts)
+     * This will do a bulk insert of 44 records of which there are 2 records repeated 21 times each.
+     * id1 (21 records), id2 (21 records), id3, id4
      */
     String newCommitTime = "001";
     client.startCommitWithTime(newCommitTime);
-
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 4);
-
     HoodieRecord record1 = records.get(0);
     HoodieRecord record2 = records.get(1);
-
     for (int i = 0; i < 20; i++) {
       HoodieRecord dup = dataGen.generateUpdateRecord(record1.getKey(), newCommitTime);
       records.add(dup);
     }
-
     for (int i = 0; i < 20; i++) {
       HoodieRecord dup = dataGen.generateUpdateRecord(record2.getKey(), newCommitTime);
       records.add(dup);
     }
-
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
     List<WriteStatus> statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
     assertNoWriteErrors(statuses);
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
     assertEquals("Expecting a single commit.", 1,
         timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
     assertEquals("Latest commit should be 001", newCommitTime, timeline.lastInstant().get().getTimestamp());
     assertEquals("Must contain 44 records",
@@ -136,6 +130,10 @@ public class TestHoodieMergeHandleDuplicateRecords {
 
     /**
      * Write 2 (insert)
+     * This will do a bulk insert of 1 record with the same row_key as record1 in the previous insert - id1.
+     * At this point, we will have 2 files with the row_keys as shown here -
+     * File 1 - id1 (21 records), id2 (21 records), id3, id4
+     * File 2 - id1
      */
     newCommitTime = "002";
     client.startCommitWithTime(newCommitTime);
@@ -144,16 +142,13 @@ public class TestHoodieMergeHandleDuplicateRecords {
     List<HoodieRecord> newRecords = new ArrayList<>();
     HoodieRecord sameAsRecord1 = dataGen.generateUpdateRecord(record1.getKey(), newCommitTime);
     newRecords.add(sameAsRecord1);
-
     writeRecords = jsc.parallelize(newRecords, 1);
-
     statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
     assertNoWriteErrors(statuses);
 
     // verify that there are 2 commits
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
     assertEquals("Expecting two commits.", 2, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
         .countInstants());
     assertEquals("Latest commit should be 002", newCommitTime, timeline.lastInstant().get().getTimestamp());
@@ -162,21 +157,22 @@ public class TestHoodieMergeHandleDuplicateRecords {
 
     /**
      * Write 3 (insert)
+     * This will bulk insert 2 completely new records.
+     * At this point, we will have 3 files with the row_keys as shown here -
+     * File 1 - id1 (21 records), id2 (21 records), id3, id4
+     * File 2 - id1
+     * File 3 - id5, id6
      */
     newCommitTime = "003";
     client.startCommitWithTime(newCommitTime);
-
     newRecords = dataGen.generateInserts(newCommitTime, 2);
-
     writeRecords = jsc.parallelize(newRecords, 1);
-
     statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
     assertNoWriteErrors(statuses);
 
-    // verify that there are not 3 commits
+    // verify that there are now 3 commits
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
     assertEquals("Expecting three commits.", 3, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
         .countInstants());
     assertEquals("Latest commit should be 003", newCommitTime, timeline.lastInstant().get().getTimestamp());
@@ -185,10 +181,14 @@ public class TestHoodieMergeHandleDuplicateRecords {
 
     /**
      * Write 4 (updates)
+     * This will generate 2 upsert records with id1 and id2. The rider and driver names in the update records
+     * will be rider-004 and driver-004.
+     * After the upsert is complete, all the records with id1 in File 1 and File 2 must be updated, and all the
+     * records with id2 in File 1 must also be updated.
+     * Also, none of the other records in File 1, File 2 and File 3 should be updated.
      */
     newCommitTime = "004";
     client.startCommitWithTime(newCommitTime);
-
     List<HoodieRecord> updateRecords = new ArrayList<>();
 
     // This exists in 001 and 002 and should be updated in both
@@ -198,10 +198,9 @@ public class TestHoodieMergeHandleDuplicateRecords {
     // This exists in 001 and should be updated
     HoodieRecord sameAsRecord2 = dataGen.generateUpdateRecord(record2.getKey(), newCommitTime);
     updateRecords.add(sameAsRecord2);
-
     JavaRDD<HoodieRecord> updateRecordsRDD = jsc.parallelize(updateRecords, 1);
-
     statuses = client.upsert(updateRecordsRDD, newCommitTime).collect();
+    // Verify there are no errors
     assertNoWriteErrors(statuses);
@@ -211,32 +210,34 @@ public class TestHoodieMergeHandleDuplicateRecords {
         .countInstants());
     assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(), newCommitTime);
-
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-
     // Check the entire dataset has 47 records still
     dataSet = getRecords();
     assertEquals("Must contain 47 records", 47, dataSet.count());
-
     Row[] rows = (Row[]) dataSet.collect();
     int record1Count = 0;
     int record2Count = 0;
     for (Row row : rows) {
       if (row.getAs("_hoodie_record_key").equals(record1.getKey().getRecordKey())) {
         record1Count++;
-
         // assert each duplicate record is updated
         assertEquals(row.getAs("rider"), "rider-004");
         assertEquals(row.getAs("driver"), "driver-004");
       } else if (row.getAs("_hoodie_record_key").equals(record2.getKey().getRecordKey())) {
         record2Count++;
-
         // assert each duplicate record is updated
         assertEquals(row.getAs("rider"), "rider-004");
         assertEquals(row.getAs("driver"), "driver-004");
+      } else {
+        assertNotEquals(row.getAs("rider"), "rider-004");
+        assertNotEquals(row.getAs("driver"), "driver-004");
       }
     }
-
+    // Assert that the record count for id1, updated to rider-004 and driver-004, is 22: the total
+    // number of records with row_key id1
     assertEquals(22, record1Count);
+
+    // Assert that the record count for id2, updated to rider-004 and driver-004, is 21: the total
+    // number of records with row_key id2
     assertEquals(21, record2Count);
   }
@@ -246,9 +247,7 @@ public class TestHoodieMergeHandleDuplicateRecords {
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
    }
-
     Dataset<Row> dataSet = HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths);
-
     return dataSet;
   }
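
assertNoWriteErrors(statuses) guards every write above, but its body lies outside the
hunks shown. A minimal sketch of its likely shape, assuming com.uber.hoodie.WriteStatus
exposes hasErrors() and getFileId() (assumptions here, not confirmed by this diff); it
would reuse the assertFalse already imported by this test class:

    // Sketch only, not part of this patch: fail the test if any WriteStatus
    // in the collected batch reported error records.
    private void assertNoWriteErrors(List<WriteStatus> statuses) {
      for (WriteStatus status : statuses) {
        // hasErrors()/getFileId() assumed from com.uber.hoodie.WriteStatus
        assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
      }
    }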