Added more comments and removed the extra new lines

committed by vinoth chandar
parent 4d1fba24c9
commit b9b9b24993
@@ -18,6 +18,7 @@ package com.uber.hoodie.io;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
@@ -87,47 +88,40 @@ public class TestHoodieMergeHandleDuplicateRecords {
 
   @Test
   public void testUpsertsForMultipleRecordsInSameFile() throws Exception {
 
     // Create records in a single partition
     String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
     dataGen = new HoodieTestDataGenerator(new String[]{partitionPath});
 
     // Build a write config with bulkinsertparallelism set
     HoodieWriteConfig cfg = getConfigBuilder().build();
 
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
     FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
 
     /**
      * Write 1 (only inserts)
+     * This will do a bulk insert of 44 records of which there are 2 records repeated 21 times each.
+     * id1 (21 records), id2 (21 records), id3, id4
      */
     String newCommitTime = "001";
     client.startCommitWithTime(newCommitTime);
 
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 4);
 
     HoodieRecord record1 = records.get(0);
     HoodieRecord record2 = records.get(1);
 
     for (int i = 0; i < 20; i++) {
       HoodieRecord dup = dataGen.generateUpdateRecord(record1.getKey(), newCommitTime);
       records.add(dup);
     }
 
     for (int i = 0; i < 20; i++) {
       HoodieRecord dup = dataGen.generateUpdateRecord(record2.getKey(), newCommitTime);
       records.add(dup);
     }
 
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
     List<WriteStatus> statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
     assertNoWriteErrors(statuses);
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
 
     assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
     assertEquals("Latest commit should be 001", newCommitTime, timeline.lastInstant().get().getTimestamp());
     assertEquals("Must contain 44 records",
@@ -136,6 +130,10 @@ public class TestHoodieMergeHandleDuplicateRecords {
 
     /**
      * Write 2 (insert)
+     * This will do a bulk insert of 1 record with the same row_key as record1 in the previous insert - id1.
+     * At this point, we will have 2 files with the row_keys as shown here -
+     * File 1 - id1 (21 records), id2 (21 records), id3, id4
+     * File 2 - id1
      */
     newCommitTime = "002";
     client.startCommitWithTime(newCommitTime);
@@ -144,16 +142,13 @@ public class TestHoodieMergeHandleDuplicateRecords {
     List<HoodieRecord> newRecords = new ArrayList<>();
     HoodieRecord sameAsRecord1 = dataGen.generateUpdateRecord(record1.getKey(), newCommitTime);
     newRecords.add(sameAsRecord1);
 
     writeRecords = jsc.parallelize(newRecords, 1);
 
     statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
     assertNoWriteErrors(statuses);
 
     // verify that there are 2 commits
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
 
     assertEquals("Expecting two commits.", 2, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
         .countInstants());
     assertEquals("Latest commit should be 002", newCommitTime, timeline.lastInstant().get().getTimestamp());
@@ -162,21 +157,22 @@ public class TestHoodieMergeHandleDuplicateRecords {
 
     /**
      * Write 3 (insert)
+     * This will bulk insert 2 new completely new records.
+     * At this point, we will have 2 files with the row_keys as shown here -
+     * File 1 - id1 (21 records), id2 (21 records), id3, id4
+     * File 2 - id1
+     * File 3 - id5, id6
      */
     newCommitTime = "003";
     client.startCommitWithTime(newCommitTime);
 
     newRecords = dataGen.generateInserts(newCommitTime, 2);
 
     writeRecords = jsc.parallelize(newRecords, 1);
 
     statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
     assertNoWriteErrors(statuses);
 
-    // verify that there are not 3 commits
+    // verify that there are now 3 commits
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
 
     assertEquals("Expecting three commits.", 3, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
         .countInstants());
     assertEquals("Latest commit should be 003", newCommitTime, timeline.lastInstant().get().getTimestamp());
@@ -185,10 +181,14 @@ public class TestHoodieMergeHandleDuplicateRecords {
 
     /**
      * Write 4 (updates)
+     * This will generate 2 upsert records with id1 and id2. The rider and driver names in the update records
+     * will be rider-004 and driver-004.
+     * After the upsert is complete, all the records with id1 in File 1 and File 2 must be updated, all the records
+     * with id2 in File 2 must also be updated.
+     * Also, none of the other records in File 1, File 2 and File 3 must be updated.
      */
     newCommitTime = "004";
     client.startCommitWithTime(newCommitTime);
 
     List<HoodieRecord> updateRecords = new ArrayList<>();
 
     // This exists in 001 and 002 and should be updated in both
@@ -198,10 +198,9 @@ public class TestHoodieMergeHandleDuplicateRecords {
     // This exists in 001 and should be updated
     HoodieRecord sameAsRecord2 = dataGen.generateUpdateRecord(record2.getKey(), newCommitTime);
     updateRecords.add(sameAsRecord2);
 
     JavaRDD<HoodieRecord> updateRecordsRDD = jsc.parallelize(updateRecords, 1);
 
     statuses = client.upsert(updateRecordsRDD, newCommitTime).collect();
 
     // Verify there are no errors
     assertNoWriteErrors(statuses);
 
@@ -211,32 +210,34 @@ public class TestHoodieMergeHandleDuplicateRecords {
         .countInstants());
     assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(), newCommitTime);
 
-    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
 
     // Check the entire dataset has 47 records still
     dataSet = getRecords();
     assertEquals("Must contain 47 records", 47, dataSet.count());
 
     Row[] rows = (Row[]) dataSet.collect();
     int record1Count = 0;
     int record2Count = 0;
     for (Row row : rows) {
       if (row.getAs("_hoodie_record_key").equals(record1.getKey().getRecordKey())) {
         record1Count++;
 
         // assert each duplicate record is updated
         assertEquals(row.getAs("rider"), "rider-004");
         assertEquals(row.getAs("driver"), "driver-004");
       } else if (row.getAs("_hoodie_record_key").equals(record2.getKey().getRecordKey())) {
         record2Count++;
 
         // assert each duplicate record is updated
         assertEquals(row.getAs("rider"), "rider-004");
         assertEquals(row.getAs("driver"), "driver-004");
+      } else {
+        assertNotEquals(row.getAs("rider"), "rider-004");
+        assertNotEquals(row.getAs("driver"), "rider-004");
       }
     }
+    // Assert that id1 record count which has been updated to rider-004 and driver-004 is 22, which is the total
+    // number of records with row_key id1
     assertEquals(22, record1Count);
 
+    // Assert that id2 record count which has been updated to rider-004 and driver-004 is 21, which is the total
+    // number of records with row_key id2
     assertEquals(21, record2Count);
   }
 
@@ -246,9 +247,7 @@ public class TestHoodieMergeHandleDuplicateRecords {
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
 
     Dataset<Row> dataSet = HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths);
 
     return dataSet;
   }