From 4074c5eb234f643ed0d79efff090138b50ad99ea Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Tue, 21 May 2019 18:59:10 -0700 Subject: [PATCH] Fixed HUDI-116 : Handle duplicate record keys across partitions - Join based on HoodieKey and not RecordKey during tagging - Unit tests changed to run with duplicate keys - Special casing GlobalBloom to still join by recordkey --- .../hoodie/index/bloom/HoodieBloomIndex.java | 74 +++++++++---------- .../bloom/HoodieBloomIndexCheckFunction.java | 4 +- .../index/bloom/HoodieGlobalBloomIndex.java | 23 +++++- .../hoodie/index/bloom/KeyLookupResult.java | 8 +- .../index/bloom/TestHoodieBloomIndex.java | 63 ++++------------ 5 files changed, 82 insertions(+), 90 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index 8dc15c1c6..d8b08f739 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -44,7 +44,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; - import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -86,25 +85,25 @@ public class HoodieBloomIndex extends HoodieIndex .mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())); // Lookup indexes for all the partition/recordkey pair - JavaPairRDD rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable); + JavaPairRDD keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable); // Cache the result, for subsequent stages. 
if (config.getBloomIndexUseCaching()) { - rowKeyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER()); + keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER()); } if (logger.isDebugEnabled()) { - long totalTaggedRecords = rowKeyFilenamePairRDD.count(); + long totalTaggedRecords = keyFilenamePairRDD.count(); logger.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords); } // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys // Cost: 4 sec. - JavaRDD> taggedRecordRDD = tagLocationBacktoRecords(rowKeyFilenamePairRDD, + JavaRDD> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD); if (config.getBloomIndexUseCaching()) { recordRDD.unpersist(); // unpersist the input Record RDD - rowKeyFilenamePairRDD.unpersist(); + keyFilenamePairRDD.unpersist(); } return taggedRecordRDD; @@ -116,23 +115,21 @@ public class HoodieBloomIndex extends HoodieIndex .mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey())); // Lookup indexes for all the partition/recordkey pair - JavaPairRDD rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable); + JavaPairRDD keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable); + JavaPairRDD keyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key, null)); - JavaPairRDD rowKeyHoodieKeyPairRDD = hoodieKeys - .mapToPair(key -> new Tuple2<>(key.getRecordKey(), key)); - - return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair(keyPathTuple -> { + return keyHoodieKeyPairRDD.leftOuterJoin(keyFilenamePairRDD).mapToPair(keyPathTuple -> { Optional recordLocationPath; if (keyPathTuple._2._2.isPresent()) { String fileName = keyPathTuple._2._2.get(); - String partitionPath = keyPathTuple._2._1.getPartitionPath(); + String partitionPath = keyPathTuple._1.getPartitionPath(); recordLocationPath = Optional .of(new Path(new 
Path(hoodieTable.getMetaClient().getBasePath(), partitionPath), fileName) .toUri().getPath()); } else { recordLocationPath = Optional.absent(); } - return new Tuple2<>(keyPathTuple._2._1, recordLocationPath); + return new Tuple2<>(keyPathTuple._1, recordLocationPath); }); } @@ -140,7 +137,7 @@ public class HoodieBloomIndex extends HoodieIndex * Lookup the location for each record key and return the pair for all record keys already * present and drop the record keys if not present */ - private JavaPairRDD lookupIndex( + private JavaPairRDD lookupIndex( JavaPairRDD partitionRecordKeyPairRDD, final JavaSparkContext jsc, final HoodieTable hoodieTable) { // Obtain records per partition, in the incoming records @@ -327,7 +324,7 @@ public class HoodieBloomIndex extends HoodieIndex * parallelism for tagging location */ @VisibleForTesting - JavaPairRDD findMatchingFilesForRecordKeys( + JavaPairRDD findMatchingFilesForRecordKeys( final Map> partitionToFileIndexInfo, JavaPairRDD partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient, Map fileGroupToComparisons) { @@ -354,38 +351,41 @@ public class HoodieBloomIndex extends HoodieIndex .flatMap(List::iterator) .filter(lr -> lr.getMatchingRecordKeys().size() > 0) .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream() - .map(recordKey -> new Tuple2<>(recordKey, lookupResult.getFileName())) + .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()), + lookupResult.getFileName())) .collect(Collectors.toList()) .iterator()); } + HoodieRecord getTaggedRecord(HoodieRecord inputRecord, org.apache.spark.api.java.Optional location) { + HoodieRecord record = inputRecord; + if (location.isPresent()) { + // When you have a record in multiple files in the same partition, then the joined RDD + // will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2 + // separate filenames that the record is found in.
This will result in setting + // currentLocation 2 times and it will fail the second time. So creating a new in memory + // copy of the hoodie record. + record = new HoodieRecord<>(inputRecord); + String filename = location.get(); + if (filename != null && !filename.isEmpty()) { + record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename), + FSUtils.getFileId(filename))); + } + } + return record; + } + /** * Tag the back to the original HoodieRecord RDD. */ - private JavaRDD> tagLocationBacktoRecords( - JavaPairRDD rowKeyFilenamePairRDD, JavaRDD> recordRDD) { - JavaPairRDD> rowKeyRecordPairRDD = recordRDD - .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record)); + protected JavaRDD> tagLocationBacktoRecords( + JavaPairRDD keyFilenamePairRDD, JavaRDD> recordRDD) { + JavaPairRDD> keyRecordPairRDD = recordRDD + .mapToPair(record -> new Tuple2<>(record.getKey(), record)); // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), // so we do left outer join. - return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(v1 -> { - HoodieRecord record = v1._1(); - if (v1._2().isPresent()) { - // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD - // will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2 - // separate filenames that the record is found in. This will result in setting - // currentLocation 2 times and it will fail the second time. So creating a new in memory - // copy of the hoodie record. 
- record = new HoodieRecord<>(v1._1()); - String filename = v1._2().get(); - if (filename != null && !filename.isEmpty()) { - record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename), - FSUtils.getFileId(filename))); - } - } - return record; - }); + return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values().map(v1 -> getTaggedRecord(v1._1, v1._2)); } @Override diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java index 4595658cc..818e66b3f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java @@ -177,7 +177,7 @@ public class HoodieBloomIndexCheckFunction implements checkAndAddCandidates(recordKey); } else { // do the actual checking of file & break out - ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile())); + ret.add(new KeyLookupResult(currentFile, currentPartitionPath, checkAgainstCurrentFile())); initState(fileName, partitionPath); checkAndAddCandidates(recordKey); break; @@ -186,7 +186,7 @@ public class HoodieBloomIndexCheckFunction implements // handle case, where we ran out of input, close pending work, update return val if (!inputItr.hasNext()) { - ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile())); + ret.add(new KeyLookupResult(currentFile, currentPartitionPath, checkAgainstCurrentFile())); } } catch (Throwable e) { if (e instanceof HoodieException) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java index 7bf85cb54..bb09b0ed6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java +++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java @@ -20,6 +20,7 @@ package com.uber.hoodie.index.bloom; import com.google.common.annotations.VisibleForTesting; import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; @@ -32,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.stream.Collectors; - import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -99,4 +99,25 @@ public class HoodieGlobalBloomIndex extends Hoodi .collect(Collectors.toList()); }).flatMap(List::iterator); } + + + /** + * Tagging for global index should only consider the record key + */ + @Override + protected JavaRDD> tagLocationBacktoRecords( + JavaPairRDD keyFilenamePairRDD, JavaRDD> recordRDD) { + JavaPairRDD> rowKeyRecordPairRDD = recordRDD + .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record)); + + // recordRDD may hold record keys with no match in keyFilenamePairRDD (keys not found in any file), + // so we do a left outer join.
+ return rowKeyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), p._2))) + .values().map(value -> getTaggedRecord(value._1, value._2)); + } + + @Override + public boolean isGlobal() { + return true; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java index e713c8b6d..6fc270385 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java @@ -27,9 +27,11 @@ public class KeyLookupResult { private final String fileName; private final List matchingRecordKeys; + private final String partitionPath; - public KeyLookupResult(String fileName, List matchingRecordKeys) { + public KeyLookupResult(String fileName, String partitionPath, List matchingRecordKeys) { this.fileName = fileName; + this.partitionPath = partitionPath; this.matchingRecordKeys = matchingRecordKeys; } @@ -40,4 +42,8 @@ public class KeyLookupResult { public List getMatchingRecordKeys() { return matchingRecordKeys; } + + public String getPartitionPath() { + return partitionPath; + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index 277c9a392..6be53294c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -127,43 +127,6 @@ public class TestHoodieBloomIndex { return config; } - @Test - public void testLoadUUIDsInMemory() throws IOException { - // Create one RDD of hoodie record - String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; - String recordStr2 = 
"{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; - String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; - String recordStr4 = "{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; - - TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); - HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), - rowChange1); - TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); - HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), - rowChange2); - TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); - HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), - rowChange3); - TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); - HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), - rowChange4); - - JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); - - // Load to memory - Map> map = recordRDD.mapToPair( - record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())).groupByKey().collectAsMap(); - assertEquals(map.size(), 2); - List list1 = Lists.newArrayList(map.get("2016/01/31")); - List list2 = Lists.newArrayList(map.get("2015/01/31")); - assertEquals(list1.size(), 3); - assertEquals(list2.size(), 1); - } - @Test public void testLoadInvolvedFiles() throws IOException { HoodieWriteConfig config = makeConfig(); @@ -354,14 +317,14 @@ public class TestHoodieBloomIndex { String rowKey1 = UUID.randomUUID().toString(); String rowKey2 = UUID.randomUUID().toString(); String rowKey3 = UUID.randomUUID().toString(); - String rowKey4 = 
UUID.randomUUID().toString(); String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\"," + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\"," + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\"," + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; - String recordStr4 = "{\"_row_key\":\"" + rowKey4 + "\"," + // place same row key under a different partition. + String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\"," + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), @@ -408,13 +371,15 @@ public class TestHoodieBloomIndex { // Check results for (HoodieRecord record : taggedRecordRDD.collect()) { if (record.getRecordKey().equals(rowKey1)) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename1))); + if (record.getPartitionPath().equals("2015/01/31")) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); + } else { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename1))); + } } else if (record.getRecordKey().equals(rowKey2)) { assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2))); } else if (record.getRecordKey().equals(rowKey3)) { assertTrue(!record.isCurrentLocationKnown()); - } else if (record.getRecordKey().equals(rowKey4)) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); } } } @@ -429,7 +394,8 @@ public class TestHoodieBloomIndex { + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; - String recordStr4 = 
"{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + // record key same as recordStr2 + String recordStr4 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); HoodieKey key1 = new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()); @@ -439,7 +405,6 @@ public class TestHoodieBloomIndex { HoodieRecord record2 = new HoodieRecord(key2, rowChange2); TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); HoodieKey key3 = new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()); - HoodieRecord record3 = new HoodieRecord(key3, rowChange3); TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); HoodieKey key4 = new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()); HoodieRecord record4 = new HoodieRecord(key4, rowChange4); @@ -481,13 +446,13 @@ public class TestHoodieBloomIndex { } else if (record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { assertTrue(record._2.isPresent()); Path path2 = new Path(record._2.get()); - assertEquals(FSUtils.getFileId(filename2), FSUtils.getFileId(path2.getName())); + if (record._1.getPartitionPath().equals("2015/01/31")) { + assertEquals(FSUtils.getFileId(filename3), FSUtils.getFileId(path2.getName())); + } else { + assertEquals(FSUtils.getFileId(filename2), FSUtils.getFileId(path2.getName())); + } } else if (record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) { assertTrue(!record._2.isPresent()); - } else if (record._1.getRecordKey().equals("4eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) { - assertTrue(record._2.isPresent()); - Path path3 = new Path(record._2.get()); - assertEquals(FSUtils.getFileId(filename3), FSUtils.getFileId(path3.getName())); } } }