From b791473a6d663cd4ac2ce535aaa62bcd89fea3af Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Tue, 21 May 2019 15:37:38 -0700 Subject: [PATCH] Introduce HoodieReadHandle abstraction into index - Generalized BloomIndex to work with file ids instead of paths - Abstracted away Bloom filter checking into HoodieLookupHandle - Abstracted away range information retrieval into HoodieRangeInfoHandle --- .../com/uber/hoodie/HoodieReadClient.java | 18 +- .../func/CopyOnWriteLazyInsertIterable.java | 5 +- .../com/uber/hoodie/index/HoodieIndex.java | 9 +- .../uber/hoodie/index/InMemoryHashIndex.java | 3 +- .../index/bloom/BloomIndexFileInfo.java | 22 +-- .../hoodie/index/bloom/HoodieBloomIndex.java | 95 +++++---- .../bloom/HoodieBloomIndexCheckFunction.java | 138 +++---------- .../index/bloom/HoodieGlobalBloomIndex.java | 5 +- ...ntervalTreeBasedGlobalIndexFileFilter.java | 4 +- .../IntervalTreeBasedIndexFileFilter.java | 4 +- .../bloom/ListBasedGlobalIndexFileFilter.java | 2 +- .../index/bloom/ListBasedIndexFileFilter.java | 2 +- .../uber/hoodie/index/hbase/HBaseIndex.java | 7 +- .../uber/hoodie/io/HoodieAppendHandle.java | 16 +- .../uber/hoodie/io/HoodieCreateHandle.java | 4 +- .../com/uber/hoodie/io/HoodieIOHandle.java | 152 +-------------- .../uber/hoodie/io/HoodieKeyLookupHandle.java | 158 +++++++++++++++ .../com/uber/hoodie/io/HoodieMergeHandle.java | 76 +++++++- .../uber/hoodie/io/HoodieRangeInfoHandle.java | 43 ++++ .../com/uber/hoodie/io/HoodieReadHandle.java | 59 ++++++ .../com/uber/hoodie/io/HoodieWriteHandle.java | 183 ++++++++++++++++++ .../hoodie/table/HoodieCopyOnWriteTable.java | 2 +- .../com/uber/hoodie/table/WorkloadStat.java | 2 +- .../com/uber/hoodie/TestHoodieClientBase.java | 2 +- .../com/uber/hoodie/index/TestHbaseIndex.java | 2 +- .../index/bloom/TestHoodieBloomIndex.java | 27 +-- .../bloom/TestHoodieGlobalBloomIndex.java | 41 ++-- .../common/model/HoodieRecordLocation.java | 16 +- .../hoodie/common/model/HoodieTestUtils.java | 4 +- 
.../collection/TestExternalSpillableMap.java | 2 +- 30 files changed, 697 insertions(+), 406 deletions(-) create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index e5e72398a..a1e7ab743 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -20,6 +20,7 @@ package com.uber.hoodie; import com.google.common.base.Optional; import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; @@ -119,15 +120,27 @@ public class HoodieReadClient extends AbstractHoo } } + private Optional convertToDataFilePath(Optional> partitionPathFileIDPair) { + if (partitionPathFileIDPair.isPresent()) { + HoodieDataFile dataFile = hoodieTable.getROFileSystemView() + .getLatestDataFile(partitionPathFileIDPair.get().getLeft(), partitionPathFileIDPair.get().getRight()).get(); + return Optional.of(dataFile.getPath()); + } else { + return Optional.absent(); + } + } + /** * Given a bunch of hoodie keys, fetches all the individual records out as a data frame * * @return a dataframe */ - public Dataset read(JavaRDD hoodieKeys, int parallelism) throws Exception { + public Dataset readROView(JavaRDD hoodieKeys, int parallelism) { assertSqlContext(); - JavaPairRDD> keyToFileRDD = index + JavaPairRDD>> lookupResultRDD = index .fetchRecordLocation(hoodieKeys, jsc, 
hoodieTable); + JavaPairRDD> keyToFileRDD = lookupResultRDD + .mapToPair(r -> new Tuple2<>(r._1, convertToDataFilePath(r._2))); List paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent()) .map(keyFileTuple -> keyFileTuple._2().get()).collect(); @@ -144,7 +157,6 @@ public class HoodieReadClient extends AbstractHoo // Now, we need to further filter out, for only rows that match the supplied hoodie keys JavaRDD rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1()); - return sqlContextOpt.get().createDataFrame(rowRDD, schema); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java index f2e98342e..f885ce286 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java @@ -26,7 +26,7 @@ import com.uber.hoodie.common.util.queue.BoundedInMemoryQueueConsumer; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.io.HoodieCreateHandle; -import com.uber.hoodie.io.HoodieIOHandle; +import com.uber.hoodie.io.HoodieWriteHandle; import com.uber.hoodie.table.HoodieTable; import java.util.ArrayList; import java.util.Iterator; @@ -131,12 +131,11 @@ public class CopyOnWriteLazyInsertIterable extend BoundedInMemoryQueueConsumer, List> { protected final List statuses = new ArrayList<>(); - protected HoodieIOHandle handle; + protected HoodieWriteHandle handle; @Override protected void consumeOneRecord(HoodieInsertValueGenResult payload) { final HoodieRecord insertPayload = payload.record; - // lazily initialize the handle, for the first time if (handle == null) { handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java index e3f357820..7f4eb6365 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java @@ -23,6 +23,7 @@ import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIndexException; import com.uber.hoodie.index.bloom.HoodieBloomIndex; @@ -63,12 +64,10 @@ public abstract class HoodieIndex implements Seri } /** - * Checks if the given [Keys] exists in the hoodie table and returns [Key, Optional[FullFilePath]] - * If the optional FullFilePath value is not present, then the key is not found. If the - * FullFilePath value is present, it is the path component (without scheme) of the URI underlying - * file + * Checks if the given [Keys] exists in the hoodie table and returns [Key, Optional[partitionPath, fileID]] + * If the optional is empty, then the key is not found. 
*/ - public abstract JavaPairRDD> fetchRecordLocation( + public abstract JavaPairRDD>> fetchRecordLocation( JavaRDD hoodieKeys, final JavaSparkContext jsc, HoodieTable hoodieTable); /** diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java index 672fa8410..61da5ab89 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java @@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; import java.util.ArrayList; @@ -55,7 +56,7 @@ public class InMemoryHashIndex extends HoodieInde } @Override - public JavaPairRDD> fetchRecordLocation(JavaRDD hoodieKeys, + public JavaPairRDD>> fetchRecordLocation(JavaRDD hoodieKeys, JavaSparkContext jsc, HoodieTable hoodieTable) { throw new UnsupportedOperationException("InMemory index does not implement check exist yet"); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java index 111426207..2a56d8a25 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java @@ -22,30 +22,30 @@ import com.google.common.base.Objects; import java.io.Serializable; /** - * Metadata about a given file, useful for index lookup + * Metadata about a given file group, useful for index lookup */ public class BloomIndexFileInfo implements Serializable { - private final String fileName; + private final 
String fileId; private final String minRecordKey; private final String maxRecordKey; - public BloomIndexFileInfo(String fileName, String minRecordKey, String maxRecordKey) { - this.fileName = fileName; + public BloomIndexFileInfo(String fileId, String minRecordKey, String maxRecordKey) { + this.fileId = fileId; this.minRecordKey = minRecordKey; this.maxRecordKey = maxRecordKey; } - public BloomIndexFileInfo(String fileName) { - this.fileName = fileName; + public BloomIndexFileInfo(String fileId) { + this.fileId = fileId; this.minRecordKey = null; this.maxRecordKey = null; } - public String getFileName() { - return fileName; + public String getFileId() { + return fileId; } public String getMinRecordKey() { @@ -77,19 +77,19 @@ public class BloomIndexFileInfo implements Serializable { } BloomIndexFileInfo that = (BloomIndexFileInfo) o; - return Objects.equal(that.fileName, fileName) && Objects.equal(that.minRecordKey, minRecordKey) + return Objects.equal(that.fileId, fileId) && Objects.equal(that.minRecordKey, minRecordKey) && Objects.equal(that.maxRecordKey, maxRecordKey); } @Override public int hashCode() { - return Objects.hashCode(fileName, minRecordKey, maxRecordKey); + return Objects.hashCode(fileId, minRecordKey, maxRecordKey); } public String toString() { final StringBuilder sb = new StringBuilder("BloomIndexFileInfo {"); - sb.append(" fileName=").append(fileName); + sb.append(" fileId=").append(fileId); sb.append(" minRecordKey=").append(minRecordKey); sb.append(" maxRecordKey=").append(maxRecordKey); sb.append('}'); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index 0b93b9c2c..195738ad8 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -25,26 +25,22 @@ import static java.util.stream.Collectors.toList; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.common.util.ParquetUtils; import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.MetadataNotFoundException; import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.io.HoodieRangeInfoHandle; import com.uber.hoodie.table.HoodieTable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -85,7 +81,8 @@ public class HoodieBloomIndex extends HoodieIndex .mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())); // Lookup indexes for all the partition/recordkey pair - JavaPairRDD keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable); + JavaPairRDD keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, + hoodieTable); // Cache the result, for subsequent stages. if (config.getBloomIndexUseCaching()) { @@ -109,27 +106,33 @@ public class HoodieBloomIndex extends HoodieIndex return taggedRecordRDD; } - public JavaPairRDD> fetchRecordLocation(JavaRDD hoodieKeys, + /** + * Returns an RDD mapping each HoodieKey with a partitionPath/fileID which contains it. Optional.Empty if the key is + * not found. 
+ * + * @param hoodieKeys keys to lookup + * @param jsc spark context + * @param hoodieTable hoodie table object + */ + @Override + public JavaPairRDD>> fetchRecordLocation(JavaRDD hoodieKeys, JavaSparkContext jsc, HoodieTable hoodieTable) { JavaPairRDD partitionRecordKeyPairRDD = hoodieKeys .mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey())); // Lookup indexes for all the partition/recordkey pair - JavaPairRDD keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable); + JavaPairRDD recordKeyLocationRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, + hoodieTable); JavaPairRDD keyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key, null)); - return keyHoodieKeyPairRDD.leftOuterJoin(keyFilenamePairRDD).mapToPair(keyPathTuple -> { - Optional recordLocationPath; - if (keyPathTuple._2._2.isPresent()) { - String fileName = keyPathTuple._2._2.get(); - String partitionPath = keyPathTuple._1.getPartitionPath(); - recordLocationPath = Optional - .of(new Path(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath), fileName) - .toUri().getPath()); + return keyHoodieKeyPairRDD.leftOuterJoin(recordKeyLocationRDD).mapToPair(keyLoc -> { + Optional> partitionPathFileidPair; + if (keyLoc._2._2.isPresent()) { + partitionPathFileidPair = Optional.of(Pair.of(keyLoc._1().getPartitionPath(), keyLoc._2._2.get().getFileId())); } else { - recordLocationPath = Optional.absent(); + partitionPathFileidPair = Optional.absent(); } - return new Tuple2<>(keyPathTuple._1, recordLocationPath); + return new Tuple2<>(keyLoc._1, partitionPathFileidPair); }); } @@ -137,9 +140,9 @@ public class HoodieBloomIndex extends HoodieIndex * Lookup the location for each record key and return the pair for all record keys already * present and drop the record keys if not present */ - private JavaPairRDD lookupIndex( - JavaPairRDD partitionRecordKeyPairRDD, final JavaSparkContext - jsc, final HoodieTable hoodieTable) { + private JavaPairRDD 
lookupIndex( + JavaPairRDD partitionRecordKeyPairRDD, final JavaSparkContext jsc, + final HoodieTable hoodieTable) { // Obtain records per partition, in the incoming records Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); @@ -157,7 +160,7 @@ public class HoodieBloomIndex extends HoodieIndex int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup); int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism); return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, - hoodieTable.getMetaClient(), comparisonsPerFileGroup); + hoodieTable, comparisonsPerFileGroup); } /** @@ -178,7 +181,7 @@ public class HoodieBloomIndex extends HoodieIndex partitionToFileInfo.entrySet().stream().forEach(e -> { for (BloomIndexFileInfo fileInfo : e.getValue()) { //each file needs to be compared against all the records coming into the partition - fileToComparisons.put(fileInfo.getFileName(), recordsPerPartition.get(e.getKey())); + fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(e.getKey())); } }); } @@ -227,35 +230,35 @@ public class HoodieBloomIndex extends HoodieIndex final HoodieTable hoodieTable) { // Obtain the latest data files from all the partitions. 
- List> dataFilesList = jsc - .parallelize(partitions, Math.max(partitions.size(), 1)).flatMapToPair(partitionPath -> { + List> partitionPathFileIDList = jsc + .parallelize(partitions, Math.max(partitions.size(), 1)) + .flatMap(partitionPath -> { java.util.Optional latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline() .filterCompletedInstants().lastInstant(); - List> filteredFiles = new ArrayList<>(); + List> filteredFiles = new ArrayList<>(); if (latestCommitTime.isPresent()) { filteredFiles = hoodieTable.getROFileSystemView() .getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()) - .map(f -> new Tuple2<>(partitionPath, f)).collect(toList()); + .map(f -> Pair.of(partitionPath, f.getFileId())).collect(toList()); } return filteredFiles.iterator(); }).collect(); if (config.getBloomIndexPruneByRanges()) { // also obtain file ranges, if range pruning is enabled - return jsc.parallelize(dataFilesList, Math.max(dataFilesList.size(), 1)).mapToPair(ft -> { + return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> { try { - String[] minMaxKeys = ParquetUtils - .readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new Path(ft._2().getPath())); - return new Tuple2<>(ft._1(), - new BloomIndexFileInfo(ft._2().getFileName(), minMaxKeys[0], minMaxKeys[1])); + HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf); + String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys(); + return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1])); } catch (MetadataNotFoundException me) { - logger.warn("Unable to find range metadata in file :" + ft._2()); - return new Tuple2<>(ft._1(), new BloomIndexFileInfo(ft._2().getFileName())); + logger.warn("Unable to find range metadata in file :" + pf); + return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue())); } }).collect(); } else { - return dataFilesList.stream() - 
.map(ft -> new Tuple2<>(ft._1(), new BloomIndexFileInfo(ft._2().getFileName()))) + return partitionPathFileIDList.stream() + .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))) .collect(toList()); } } @@ -324,9 +327,9 @@ public class HoodieBloomIndex extends HoodieIndex * parallelism for tagging location */ @VisibleForTesting - JavaPairRDD findMatchingFilesForRecordKeys( + JavaPairRDD findMatchingFilesForRecordKeys( final Map> partitionToFileIndexInfo, - JavaPairRDD partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient, + JavaPairRDD partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable, Map fileGroupToComparisons) { JavaRDD> fileComparisonsRDD = explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD); @@ -347,17 +350,18 @@ public class HoodieBloomIndex extends HoodieIndex } return fileComparisonsRDD - .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true) + .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true) .flatMap(List::iterator) .filter(lr -> lr.getMatchingRecordKeys().size() > 0) .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream() .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()), - lookupResult.getFileName())) + new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()))) .collect(Collectors.toList()) .iterator()); } - HoodieRecord getTaggedRecord(HoodieRecord inputRecord, org.apache.spark.api.java.Optional location) { + HoodieRecord getTaggedRecord(HoodieRecord inputRecord, + org.apache.spark.api.java.Optional location) { HoodieRecord record = inputRecord; if (location.isPresent()) { // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD @@ -366,11 +370,7 @@ public class HoodieBloomIndex extends HoodieIndex // currentLocation 2 times and it will 
fail the second time. So creating a new in memory // copy of the hoodie record. record = new HoodieRecord<>(inputRecord); - String filename = location.get(); - if (filename != null && !filename.isEmpty()) { - record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename), - FSUtils.getFileId(filename))); - } + record.setCurrentLocation(location.get()); } return record; } @@ -379,10 +379,9 @@ public class HoodieBloomIndex extends HoodieIndex * Tag the back to the original HoodieRecord RDD. */ protected JavaRDD> tagLocationBacktoRecords( - JavaPairRDD keyFilenamePairRDD, JavaRDD> recordRDD) { + JavaPairRDD keyFilenamePairRDD, JavaRDD> recordRDD) { JavaPairRDD> keyRecordPairRDD = recordRDD .mapToPair(record -> new Tuple2<>(record.getKey(), record)); - // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), // so we do left outer join. return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values().map(v1 -> getTaggedRecord(v1._1, v1._2)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java index 381e7456c..3dd3df5cd 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java @@ -18,23 +18,18 @@ package com.uber.hoodie.index.bloom; -import com.uber.hoodie.common.BloomFilter; import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.util.HoodieTimer; -import com.uber.hoodie.common.util.ParquetUtils; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIndexException; import com.uber.hoodie.func.LazyIterableIterator; 
+import com.uber.hoodie.io.HoodieKeyLookupHandle; +import com.uber.hoodie.io.HoodieKeyLookupHandle.KeyLookupResult; +import com.uber.hoodie.table.HoodieTable; import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Set; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.function.Function2; import scala.Tuple2; @@ -43,150 +38,69 @@ import scala.Tuple2; * actual files */ public class HoodieBloomIndexCheckFunction implements - Function2>, - Iterator>> { + Function2>, Iterator>> { - private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class); + private final HoodieTable hoodieTable; - private final String basePath; + private final HoodieWriteConfig config; - private final HoodieTableMetaClient metaClient; - - public HoodieBloomIndexCheckFunction(HoodieTableMetaClient metaClient, String basePath) { - this.metaClient = metaClient; - this.basePath = basePath; - } - - /** - * Given a list of row keys and one file, return only row keys existing in that file. - */ - public static List checkCandidatesAgainstFile(Configuration configuration, - List candidateRecordKeys, Path filePath) throws HoodieIndexException { - List foundRecordKeys = new ArrayList<>(); - try { - // Load all rowKeys from the file, to double-confirm - if (!candidateRecordKeys.isEmpty()) { - HoodieTimer timer = new HoodieTimer().startTimer(); - Set fileRowKeys = ParquetUtils.filterParquetRowKeys(configuration, filePath, - new HashSet<>(candidateRecordKeys)); - foundRecordKeys.addAll(fileRowKeys); - logger.info(String.format("Checked keys against file %s, in %d ms. 
#candidates (%d) #found (%d)", filePath, - timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size())); - if (logger.isDebugEnabled()) { - logger.debug("Keys matching for file " + filePath + " => " + foundRecordKeys); - } - } - } catch (Exception e) { - throw new HoodieIndexException("Error checking candidate keys against file.", e); - } - return foundRecordKeys; + public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) { + this.hoodieTable = hoodieTable; + this.config = config; } @Override public Iterator> call(Integer partition, - Iterator> fileParitionRecordKeyTripletItr) - throws Exception { + Iterator> fileParitionRecordKeyTripletItr) { return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr); } - class LazyKeyCheckIterator extends - LazyIterableIterator, List> { + class LazyKeyCheckIterator extends LazyIterableIterator, List> { - private List candidateRecordKeys; - - private BloomFilter bloomFilter; - - private String currentFile; - - private String currentPartitionPath; - - private long totalKeysChecked; + private HoodieKeyLookupHandle keyLookupHandle; LazyKeyCheckIterator( Iterator> filePartitionRecordKeyTripletItr) { super(filePartitionRecordKeyTripletItr); - currentFile = null; - candidateRecordKeys = new ArrayList<>(); - bloomFilter = null; - currentPartitionPath = null; - totalKeysChecked = 0; } @Override protected void start() { } - private void initState(String fileName, String partitionPath) throws HoodieIndexException { - try { - Path filePath = new Path(basePath + "/" + partitionPath + "/" + fileName); - HoodieTimer timer = new HoodieTimer().startTimer(); - bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath); - logger.info(String.format("Read bloom filter from %s/%s in %d ms", partitionPath, fileName, timer.endTimer())); - candidateRecordKeys = new ArrayList<>(); - currentFile = fileName; - currentPartitionPath = partitionPath; - totalKeysChecked = 0; 
- } catch (Exception e) { - throw new HoodieIndexException("Error checking candidate keys against file.", e); - } - } - - // check record key against bloom filter of current file & add to possible keys if needed - private void checkAndAddCandidates(String recordKey) { - if (bloomFilter.mightContain(recordKey)) { - if (logger.isDebugEnabled()) { - logger.debug("Record key " + recordKey + " matches bloom filter in file " + currentPartitionPath - + "/" + currentFile); - } - candidateRecordKeys.add(recordKey); - } - totalKeysChecked++; - } - - private List checkAgainstCurrentFile() { - Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile); - if (logger.isDebugEnabled()) { - logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys); - } - List matchingKeys = checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath); - logger.info(String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", - totalKeysChecked, candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), - matchingKeys.size())); - return matchingKeys; - } - @Override - protected List computeNext() { + protected List computeNext() { - List ret = new ArrayList<>(); + List ret = new ArrayList<>(); try { // process one file in each go. 
while (inputItr.hasNext()) { Tuple2 currentTuple = inputItr.next(); - String fileName = currentTuple._1; + String fileId = currentTuple._1; String partitionPath = currentTuple._2.getPartitionPath(); String recordKey = currentTuple._2.getRecordKey(); + Pair partitionPathFilePair = Pair.of(partitionPath, fileId); // lazily init state - if (currentFile == null) { - initState(fileName, partitionPath); + if (keyLookupHandle == null) { + keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair); } // if continue on current file - if (fileName.equals(currentFile)) { - checkAndAddCandidates(recordKey); + if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) { + keyLookupHandle.addKey(recordKey); } else { // do the actual checking of file & break out - ret.add(new KeyLookupResult(currentFile, currentPartitionPath, checkAgainstCurrentFile())); - initState(fileName, partitionPath); - checkAndAddCandidates(recordKey); + ret.add(keyLookupHandle.getLookupResult()); + keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair); + keyLookupHandle.addKey(recordKey); break; } } // handle case, where we ran out of input, close pending work, update return val if (!inputItr.hasNext()) { - ret.add(new KeyLookupResult(currentFile, currentPartitionPath, checkAgainstCurrentFile())); + ret.add(keyLookupHandle.getLookupResult()); } } catch (Throwable e) { if (e instanceof HoodieException) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java index 2331729f9..4ec51d90a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java @@ -21,6 +21,7 @@ package com.uber.hoodie.index.bloom; import com.google.common.annotations.VisibleForTesting; import 
com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; @@ -83,7 +84,7 @@ public class HoodieGlobalBloomIndex extends Hoodi JavaPairRDD partitionRecordKeyPairRDD) { Map indexToPartitionMap = new HashMap<>(); for (Entry> entry : partitionToFileIndexInfo.entrySet()) { - entry.getValue().forEach(indexFile -> indexToPartitionMap.put(indexFile.getFileName(), entry.getKey())); + entry.getValue().forEach(indexFile -> indexToPartitionMap.put(indexFile.getFileId(), entry.getKey())); } IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges() @@ -106,7 +107,7 @@ public class HoodieGlobalBloomIndex extends Hoodi */ @Override protected JavaRDD> tagLocationBacktoRecords( - JavaPairRDD keyFilenamePairRDD, JavaRDD> recordRDD) { + JavaPairRDD keyFilenamePairRDD, JavaRDD> recordRDD) { JavaPairRDD> rowKeyRecordPairRDD = recordRDD .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java index c03de0850..c675ffcec 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java @@ -50,9 +50,9 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { allIndexFiles.forEach(indexFile -> { if (indexFile.hasKeyRanges()) { indexLookUpTree.insert(new KeyRangeNode(indexFile.getMinRecordKey(), - indexFile.getMaxRecordKey(), indexFile.getFileName())); + indexFile.getMaxRecordKey(), indexFile.getFileId())); } else { - 
filesWithNoRanges.add(indexFile.getFileName()); + filesWithNoRanges.add(indexFile.getFileId()); } }); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java index 5e34d54ae..412e778f3 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java @@ -49,12 +49,12 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter { bloomIndexFiles.forEach(indexFileInfo -> { if (indexFileInfo.hasKeyRanges()) { lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), - indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileName())); + indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileId())); } else { if (!partitionToFilesWithNoRanges.containsKey(partition)) { partitionToFilesWithNoRanges.put(partition, new HashSet<>()); } - partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileName()); + partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId()); } }); partitionToFileIndexLookUpTree.put(partition, lookUpTree); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java index c5d627d01..f904126f3 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java @@ -43,7 +43,7 @@ class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter { // for each candidate file in partition, that needs to be compared. 
for (BloomIndexFileInfo indexInfo : indexInfos) { if (shouldCompareWithFile(indexInfo, recordKey)) { - toReturn.add(indexInfo.getFileName()); + toReturn.add(indexInfo.getFileId()); } } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java index 353f22dfd..b99280b98 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java @@ -48,7 +48,7 @@ class ListBasedIndexFileFilter implements IndexFileFilter { // for each candidate file in partition, that needs to be compared. for (BloomIndexFileInfo indexInfo : indexInfos) { if (shouldCompareWithFile(indexInfo, recordKey)) { - toReturn.add(indexInfo.getFileName()); + toReturn.add(indexInfo.getFileId()); } } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java index 87f3ba504..4eb944386 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java @@ -29,6 +29,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.ReflectionUtils; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException; @@ -60,7 +61,6 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; - import scala.Tuple2; /** @@ -123,9 +123,8 @@ 
public class HBaseIndex extends HoodieIndex { } @Override - public JavaPairRDD> fetchRecordLocation(JavaRDD hoodieKeys, + public JavaPairRDD>> fetchRecordLocation(JavaRDD hoodieKeys, JavaSparkContext jsc, HoodieTable hoodieTable) { - //TODO : Change/Remove filterExists in HoodieReadClient() and revisit throw new UnsupportedOperationException("HBase index does not implement check exist"); } @@ -297,7 +296,7 @@ public class HBaseIndex extends HoodieIndex { } Put put = new Put(Bytes.toBytes(rec.getRecordKey())); put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, - Bytes.toBytes(loc.get().getCommitTime())); + Bytes.toBytes(loc.get().getInstantTime())); put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId())); put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 1b4081c1c..cc98f5621 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -59,7 +59,7 @@ import org.apache.spark.util.SizeEstimator; /** * IO Operation to append data onto an existing file. 
*/ -public class HoodieAppendHandle extends HoodieIOHandle { +public class HoodieAppendHandle extends HoodieWriteHandle { private static Logger logger = LogManager.getLogger(HoodieAppendHandle.class); // This acts as the sequenceID for records written @@ -114,7 +114,7 @@ public class HoodieAppendHandle extends HoodieIOH RealtimeView rtView = hoodieTable.getRTFileSystemView(); Option fileSlice = rtView.getLatestFileSlice(partitionPath, fileId); // Set the base commit time as the current commitTime for new inserts into log files - String baseInstantTime = commitTime; + String baseInstantTime = instantTime; if (fileSlice.isPresent()) { baseInstantTime = fileSlice.get().getBaseInstantTime(); } else { @@ -134,11 +134,11 @@ public class HoodieAppendHandle extends HoodieIOH ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion()); ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize()); } catch (Exception e) { - logger.error("Error in update task at commit " + commitTime, e); + logger.error("Error in update task at commit " + instantTime, e); writeStatus.setGlobalError(e); throw new HoodieUpsertException( "Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit " - + commitTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + partitionPath, e); } Path path = new Path(partitionPath, writer.getLogFile().getFileName()); @@ -154,13 +154,13 @@ public class HoodieAppendHandle extends HoodieIOH if (avroRecord.isPresent()) { // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema avroRecord = Optional.of(rewriteRecord((GenericRecord) avroRecord.get())); - String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(), + String seqId = HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement()); 
HoodieAvroUtils .addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(), fileId); HoodieAvroUtils - .addCommitMetadataToRecord((GenericRecord) avroRecord.get(), commitTime, seqId); + .addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId); // If currentLocation is present, then this is an update if (hoodieRecord.getCurrentLocation() != null) { updatedRecordsWritten++; @@ -200,7 +200,7 @@ public class HoodieAppendHandle extends HoodieIOH private void doAppend(Map header) { try { - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString()); if (recordList.size() > 0) { writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header)); @@ -286,7 +286,7 @@ public class HoodieAppendHandle extends HoodieIOH private void writeToBuffer(HoodieRecord record) { // update the new location of the record, so we know where to find it next - record.setNewLocation(new HoodieRecordLocation(commitTime, fileId)); + record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); Optional indexedRecord = getIndexedRecord(record); if (indexedRecord.isPresent()) { recordList.add(indexedRecord.get()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index 0b754278f..cc4596e93 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -41,7 +41,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.TaskContext; -public class HoodieCreateHandle extends HoodieIOHandle { +public class HoodieCreateHandle extends HoodieWriteHandle { private static Logger logger = 
LogManager.getLogger(HoodieCreateHandle.class); @@ -101,7 +101,7 @@ public class HoodieCreateHandle extends HoodieIOH IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get()); storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record); // update the new location of record, so we know where to find it next - record.setNewLocation(new HoodieRecordLocation(commitTime, writeStatus.getFileId())); + record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId())); recordsWritten++; insertRecordsWritten++; } else { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index 8fd039190..89d0aa259 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -18,167 +18,25 @@ package com.uber.hoodie.io; -import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem; -import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.common.util.FailSafeConsistencyGuard; -import com.uber.hoodie.common.util.HoodieAvroUtils; -import com.uber.hoodie.common.util.HoodieTimer; -import com.uber.hoodie.common.util.NoOpConsistencyGuard; -import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.exception.HoodieException; -import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; -import java.io.IOException; -import java.util.Optional; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import 
org.apache.log4j.Logger; -import org.apache.spark.TaskContext; public abstract class HoodieIOHandle { - private static Logger logger = LogManager.getLogger(HoodieIOHandle.class); - protected final String commitTime; - protected final String fileId; - protected final String writeToken; + protected final String instantTime; protected final HoodieWriteConfig config; protected final FileSystem fs; protected final HoodieTable hoodieTable; - protected final Schema originalSchema; - protected final Schema writerSchema; - protected HoodieTimer timer; - protected final WriteStatus writeStatus; - public HoodieIOHandle(HoodieWriteConfig config, String commitTime, String fileId, - HoodieTable hoodieTable) { - this.commitTime = commitTime; - this.fileId = fileId; - this.writeToken = makeSparkWriteToken(); + HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable) { + this.instantTime = instantTime; this.config = config; - this.fs = getFileSystem(hoodieTable, config); this.hoodieTable = hoodieTable; - this.originalSchema = new Schema.Parser().parse(config.getSchema()); - this.writerSchema = createHoodieWriteSchema(originalSchema); - this.timer = new HoodieTimer().startTimer(); - this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(), - !hoodieTable.getIndex().isImplicitWithStorage(), - config.getWriteStatusFailureFraction()); + this.fs = getFileSystem(); } - private static FileSystem getFileSystem(HoodieTable hoodieTable, HoodieWriteConfig config) { - return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled() - ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(), - config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(), - config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard()); - } - - /** - * Generate a write token based on the currently running spark task and its place in the spark dag. 
- */ - private static String makeSparkWriteToken() { - return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(), - TaskContext.get().taskAttemptId()); - } - - public static Schema createHoodieWriteSchema(Schema originalSchema) { - return HoodieAvroUtils.addMetadataFields(originalSchema); - } - - public Path makeNewPath(String partitionPath) { - Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath); - try { - fs.mkdirs(path); // create a new partition as needed. - } catch (IOException e) { - throw new HoodieIOException("Failed to make dir " + path, e); - } - - return new Path(path.toString(), FSUtils.makeDataFileName(commitTime, writeToken, fileId)); - } - - /** - * Creates an empty marker file corresponding to storage writer path - * @param partitionPath Partition path - */ - protected void createMarkerFile(String partitionPath) { - Path markerPath = makeNewMarkerPath(partitionPath); - try { - logger.info("Creating Marker Path=" + markerPath); - fs.create(markerPath, false).close(); - } catch (IOException e) { - throw new HoodieException("Failed to create marker file " + markerPath, e); - } - } - - /** - * THe marker path will be /.hoodie/.temp//2019/04/25/filename - * @param partitionPath - * @return - */ - private Path makeNewMarkerPath(String partitionPath) { - Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime)); - Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath); - try { - fs.mkdirs(path); // create a new partition as needed. - } catch (IOException e) { - throw new HoodieIOException("Failed to make dir " + path, e); - } - return new Path(path.toString(), FSUtils.makeMarkerFile(commitTime, writeToken, fileId)); - } - - public Schema getWriterSchema() { - return writerSchema; - } - - /** - * Determines whether we can accept the incoming records, into the current file, depending on - *

- * - Whether it belongs to the same partitionPath as existing records - Whether the current file - * written bytes lt max file size - */ - public boolean canWrite(HoodieRecord record) { - return false; - } - - /** - * Perform the actual writing of the given record into the backing file. - */ - public void write(HoodieRecord record, Optional insertValue) { - // NO_OP - } - - /** - * Perform the actual writing of the given record into the backing file. - */ - public void write(HoodieRecord record, Optional avroRecord, Optional exception) { - Optional recordMetadata = record.getData().getMetadata(); - if (exception.isPresent() && exception.get() instanceof Throwable) { - // Not throwing exception from here, since we don't want to fail the entire job for a single record - writeStatus.markFailure(record, exception.get(), recordMetadata); - logger.error("Error writing record " + record, exception.get()); - } else { - write(record, avroRecord); - } - } - - /** - * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields - * @param record - * @return - */ - protected GenericRecord rewriteRecord(GenericRecord record) { - return HoodieAvroUtils.rewriteRecord(record, writerSchema); - } - - public abstract WriteStatus close(); - - public abstract WriteStatus getWriteStatus(); + protected abstract FileSystem getFileSystem(); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java new file mode 100644 index 000000000..c5bdbf6e8 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io; + +import com.uber.hoodie.common.BloomFilter; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.util.HoodieTimer; +import com.uber.hoodie.common.util.ParquetUtils; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.table.HoodieTable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Takes a bunch of keys and returns ones that are present in the file group. 
+ */ +public class HoodieKeyLookupHandle extends HoodieReadHandle { + + private static Logger logger = LogManager.getLogger(HoodieKeyLookupHandle.class); + + private final HoodieTableType tableType; + + private final BloomFilter bloomFilter; + + private final List candidateRecordKeys; + + private long totalKeysChecked; + + public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable hoodieTable, + Pair partitionPathFilePair) { + super(config, null, hoodieTable, partitionPathFilePair); + this.tableType = hoodieTable.getMetaClient().getTableType(); + this.candidateRecordKeys = new ArrayList<>(); + this.totalKeysChecked = 0; + HoodieTimer timer = new HoodieTimer().startTimer(); + this.bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(), + new Path(getLatestDataFile().getPath())); + logger.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer())); + } + + /** + * Given a list of row keys and one file, return only row keys existing in that file. + */ + public static List checkCandidatesAgainstFile(Configuration configuration, + List candidateRecordKeys, Path filePath) throws HoodieIndexException { + List foundRecordKeys = new ArrayList<>(); + try { + // Load all rowKeys from the file, to double-confirm + if (!candidateRecordKeys.isEmpty()) { + HoodieTimer timer = new HoodieTimer().startTimer(); + Set fileRowKeys = ParquetUtils.filterParquetRowKeys(configuration, filePath, + new HashSet<>(candidateRecordKeys)); + foundRecordKeys.addAll(fileRowKeys); + logger.info(String.format("Checked keys against file %s, in %d ms. 
#candidates (%d) #found (%d)", filePath, + timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size())); + if (logger.isDebugEnabled()) { + logger.debug("Keys matching for file " + filePath + " => " + foundRecordKeys); + } + } + } catch (Exception e) { + throw new HoodieIndexException("Error checking candidate keys against file.", e); + } + return foundRecordKeys; + } + + /** + * Adds the key for look up. + */ + public void addKey(String recordKey) { + // check record key against bloom filter of current file & add to possible keys if needed + if (bloomFilter.mightContain(recordKey)) { + if (logger.isDebugEnabled()) { + logger.debug("Record key " + recordKey + " matches bloom filter in " + partitionPathFilePair); + } + candidateRecordKeys.add(recordKey); + } + totalKeysChecked++; + } + + /** + * Of all the keys, that were added, return a list of keys that were actually found in the file group. + */ + public KeyLookupResult getLookupResult() { + if (logger.isDebugEnabled()) { + logger.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys); + } + + HoodieDataFile dataFile = getLatestDataFile(); + List matchingKeys = checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, + new Path(dataFile.getPath())); + logger.info(String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", + totalKeysChecked, candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), + matchingKeys.size())); + return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(), + dataFile.getCommitTime(), matchingKeys); + } + + /** + * Encapsulates the result from a key lookup + */ + public static class KeyLookupResult { + + private final String fileId; + private final String baseInstantTime; + private final List matchingRecordKeys; + private final String partitionPath; + + public KeyLookupResult(String fileId, String partitionPath, String 
baseInstantTime, + List matchingRecordKeys) { + this.fileId = fileId; + this.partitionPath = partitionPath; + this.baseInstantTime = baseInstantTime; + this.matchingRecordKeys = matchingRecordKeys; + } + + public String getFileId() { + return fileId; + } + + public String getBaseInstantTime() { + return baseInstantTime; + } + + public String getPartitionPath() { + return partitionPath; + } + + public List getMatchingRecordKeys() { + return matchingRecordKeys; + } + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index f94e29378..b22bcb39f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -28,6 +28,7 @@ import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.util.DefaultSizeEstimator; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.HoodieRecordSizeEstimator; import com.uber.hoodie.common.util.collection.ExternalSpillableMap; import com.uber.hoodie.config.HoodieWriteConfig; @@ -42,6 +43,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; @@ -50,7 +52,7 @@ import org.apache.log4j.Logger; import org.apache.spark.TaskContext; @SuppressWarnings("Duplicates") -public class HoodieMergeHandle extends HoodieIOHandle { +public class HoodieMergeHandle extends HoodieWriteHandle { private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class); @@ -85,6 +87,64 @@ public class HoodieMergeHandle extends HoodieIOHa .getPartitionPath(), dataFileToBeMerged); } + + public 
static Schema createHoodieWriteSchema(Schema originalSchema) { + return HoodieAvroUtils.addMetadataFields(originalSchema); + } + + public Path makeNewPath(String partitionPath) { + Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath); + try { + fs.mkdirs(path); // create a new partition as needed. + } catch (IOException e) { + throw new HoodieIOException("Failed to make dir " + path, e); + } + + return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId)); + } + + public Schema getWriterSchema() { + return writerSchema; + } + + /** + * Determines whether we can accept the incoming records, into the current file, depending on + *

+ * - Whether it belongs to the same partitionPath as existing records - Whether the current file written bytes lt max + * file size + */ + public boolean canWrite(HoodieRecord record) { + return false; + } + + /** + * Perform the actual writing of the given record into the backing file. + */ + public void write(HoodieRecord record, Optional insertValue) { + // NO_OP + } + + /** + * Perform the actual writing of the given record into the backing file. + */ + public void write(HoodieRecord record, Optional avroRecord, Optional exception) { + Optional recordMetadata = record.getData().getMetadata(); + if (exception.isPresent() && exception.get() instanceof Throwable) { + // Not throwing exception from here, since we don't want to fail the entire job for a single record + writeStatus.markFailure(record, exception.get(), recordMetadata); + logger.error("Error writing record " + record, exception.get()); + } else { + write(record, avroRecord); + } + } + + /** + * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields + */ + protected GenericRecord rewriteRecord(GenericRecord record) { + return HoodieAvroUtils.rewriteRecord(record, writerSchema); + } + /** * Extract old file path, initialize StorageWriter and WriteStatus */ @@ -95,14 +155,14 @@ public class HoodieMergeHandle extends HoodieIOHa String latestValidFilePath = dataFileToBeMerged.getFileName(); writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); - HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime, + HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); partitionMetadata.trySave(TaskContext.getPartitionId()); oldFilePath = new Path( config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath); - String relativePath = new Path((partitionPath.isEmpty() ? 
"" : partitionPath + "/") + FSUtils - .makeDataFileName(commitTime, writeToken, fileId)).toString(); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); logger.info(String @@ -120,13 +180,13 @@ public class HoodieMergeHandle extends HoodieIOHa // Create the writer for writing the new version file storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, newFilePath, hoodieTable, config, writerSchema); + .getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema); } catch (IOException io) { - logger.error("Error in update task at commit " + commitTime, io); + logger.error("Error in update task at commit " + instantTime, io); writeStatus.setGlobalError(io); throw new HoodieUpsertException( "Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " - + commitTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io); + + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io); } } @@ -148,7 +208,7 @@ public class HoodieMergeHandle extends HoodieIOHa partitionPath = record.getPartitionPath(); keyToNewRecords.put(record.getRecordKey(), record); // update the new location of the record, so we know where to find it next - record.setNewLocation(new HoodieRecordLocation(commitTime, fileId)); + record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); } logger.info("Number of entries in MemoryBasedMap => " + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java new file mode 100644 index 000000000..1f2369e6b --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java @@ -0,0 +1,43 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io; + +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.ParquetUtils; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.table.HoodieTable; +import org.apache.hadoop.fs.Path; + +/** + * Extract range information for a given file slice + */ +public class HoodieRangeInfoHandle extends HoodieReadHandle { + + public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable hoodieTable, + Pair partitionPathFilePair) { + super(config, null, hoodieTable, partitionPathFilePair); + } + + public String[] getMinMaxKeys() { + HoodieDataFile dataFile = getLatestDataFile(); + return ParquetUtils.readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new Path(dataFile.getPath())); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java new file mode 100644 index 000000000..e0d33238a --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java @@ -0,0 +1,59 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io; + +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.table.HoodieTable; +import org.apache.hadoop.fs.FileSystem; + +/** + * Base class for read operations done logically on the file group. 
+ */ +public abstract class HoodieReadHandle extends HoodieIOHandle { + + protected final Pair partitionPathFilePair; + + public HoodieReadHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Pair partitionPathFilePair) { + super(config, instantTime, hoodieTable); + this.partitionPathFilePair = partitionPathFilePair; + } + + @Override + protected FileSystem getFileSystem() { + return hoodieTable.getMetaClient().getFs(); + } + + + public Pair getPartitionPathFilePair() { + return partitionPathFilePair; + } + + public String getFileId() { + return partitionPathFilePair.getRight(); + } + + protected HoodieDataFile getLatestDataFile() { + return hoodieTable.getROFileSystemView() + .getLatestDataFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get(); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java new file mode 100644 index 000000000..1adeecc1b --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.io; + +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.FailSafeConsistencyGuard; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.common.util.HoodieTimer; +import com.uber.hoodie.common.util.NoOpConsistencyGuard; +import com.uber.hoodie.common.util.ReflectionUtils; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.table.HoodieTable; +import java.io.IOException; +import java.util.Optional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.TaskContext; + +/** + * Base class for all write operations logically performed at the file group level. 
+ */ +public abstract class HoodieWriteHandle extends HoodieIOHandle { + + private static Logger logger = LogManager.getLogger(HoodieWriteHandle.class); + protected final Schema originalSchema; + protected final Schema writerSchema; + protected HoodieTimer timer; + protected final WriteStatus writeStatus; + protected final String fileId; + protected final String writeToken; + + public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String fileId, HoodieTable hoodieTable) { + super(config, instantTime, hoodieTable); + this.fileId = fileId; + this.writeToken = makeSparkWriteToken(); + this.originalSchema = new Schema.Parser().parse(config.getSchema()); + this.writerSchema = createHoodieWriteSchema(originalSchema); + this.timer = new HoodieTimer().startTimer(); + this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(), + !hoodieTable.getIndex().isImplicitWithStorage(), + config.getWriteStatusFailureFraction()); + } + + private static FileSystem getFileSystem(HoodieTable hoodieTable, HoodieWriteConfig config) { + return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled() + ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(), + config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(), + config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard()); + } + + /** + * Generate a write token based on the currently running spark task and its place in the spark dag. 
+ */ + private static String makeSparkWriteToken() { + return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(), + TaskContext.get().taskAttemptId()); + } + + public static Schema createHoodieWriteSchema(Schema originalSchema) { + return HoodieAvroUtils.addMetadataFields(originalSchema); + } + + public Path makeNewPath(String partitionPath) { + Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath); + try { + fs.mkdirs(path); // create a new partition as needed. + } catch (IOException e) { + throw new HoodieIOException("Failed to make dir " + path, e); + } + + return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId)); + } + + /** + * Creates an empty marker file corresponding to storage writer path + * + * @param partitionPath Partition path + */ + protected void createMarkerFile(String partitionPath) { + Path markerPath = makeNewMarkerPath(partitionPath); + try { + logger.info("Creating Marker Path=" + markerPath); + fs.create(markerPath, false).close(); + } catch (IOException e) { + throw new HoodieException("Failed to create marker file " + markerPath, e); + } + } + + /** + * The marker path will be /.hoodie/.temp/&lt;instantTime&gt;/2019/04/25/filename + */ + private Path makeNewMarkerPath(String partitionPath) { + Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime)); + Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath); + try { + fs.mkdirs(path); // create a new partition as needed. + } catch (IOException e) { + throw new HoodieIOException("Failed to make dir " + path, e); + } + return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime, writeToken, fileId)); + } + + public Schema getWriterSchema() { + return writerSchema; + } + + /** + * Determines whether we can accept the incoming records, into the current file, depending on + *

+ * - Whether it belongs to the same partitionPath as existing records - Whether the current file written bytes are less than the max + * file size + */ + public boolean canWrite(HoodieRecord record) { + return false; + } + + /** + * Perform the actual writing of the given record into the backing file. + */ + public void write(HoodieRecord record, Optional insertValue) { + // NO_OP + } + + /** + * Perform the actual writing of the given record into the backing file. + */ + public void write(HoodieRecord record, Optional avroRecord, Optional exception) { + Optional recordMetadata = record.getData().getMetadata(); + if (exception.isPresent() && exception.get() instanceof Throwable) { + // Not throwing exception from here, since we don't want to fail the entire job for a single record + writeStatus.markFailure(record, exception.get(), recordMetadata); + logger.error("Error writing record " + record, exception.get()); + } else { + write(record, avroRecord); + } + } + + /** + * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields + */ + protected GenericRecord rewriteRecord(GenericRecord record) { + return HoodieAvroUtils.rewriteRecord(record, writerSchema); + } + + public abstract WriteStatus close(); + + public abstract WriteStatus getWriteStatus(); + + @Override + protected FileSystem getFileSystem() { + return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled() + ? 
new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(), + config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(), + config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard()); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index ac81a4350..6b9f4f6ab 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -500,7 +500,7 @@ public class HoodieCopyOnWriteTable extends Hoodi /** * Helper class for a small file's location and its actual size on disk */ - class SmallFile implements Serializable { + static class SmallFile implements Serializable { HoodieRecordLocation location; long sizeBytes; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java index 273998199..a613f3b61 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java @@ -43,7 +43,7 @@ public class WorkloadStat implements Serializable { } long addUpdates(HoodieRecordLocation location, long numUpdates) { - updateLocationToCount.put(location.getFileId(), Pair.of(location.getCommitTime(), numUpdates)); + updateLocationToCount.put(location.getFileId(), Pair.of(location.getInstantTime(), numUpdates)); return this.numUpdates += numUpdates; } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java index fb1d1eeca..ae2e57ceb 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java @@ -217,7 +217,7 @@ public class TestHoodieClientBase 
implements Serializable { for (HoodieRecord rec : taggedRecords) { assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown()); assertEquals("All records should have commit time " + commitTime + ", since updates were made", - rec.getCurrentLocation().getCommitTime(), commitTime); + rec.getCurrentLocation().getInstantTime(), commitTime); } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java index 603ef8f50..24bb4fa08 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java @@ -166,7 +166,7 @@ public class TestHbaseIndex { assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200); assertTrue(javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count() == 200); assertTrue(javaRDD.filter( - record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getCommitTime() + record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime() .equals(newCommitTime))).distinct().count() == 200); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index e62760ae1..f2e078488 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -36,8 +36,10 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; +import 
com.uber.hoodie.io.HoodieKeyLookupHandle; import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.io.IOException; @@ -200,10 +202,10 @@ public class TestHoodieBloomIndex { // no longer sorted, but should have same files. List> expected = Arrays.asList( - new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2_0_20160401010101.parquet")), - new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1_0_20150312101010.parquet")), - new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")), - new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003"))); + new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")), + new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")), + new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", "000")), + new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", "003"))); assertEquals(expected, filesList); } } @@ -279,7 +281,7 @@ public class TestHoodieBloomIndex { List uuids = Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey()); - List results = HoodieBloomIndexCheckFunction.checkCandidatesAgainstFile(jsc.hadoopConfiguration(), uuids, + List results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(jsc.hadoopConfiguration(), uuids, new Path(basePath + "/2016/01/31/" + filename)); assertEquals(results.size(), 2); assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0") || results.get(1).equals( @@ -417,10 +419,11 @@ public class TestHoodieBloomIndex { // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); - JavaPairRDD> taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table); + JavaPairRDD>> taggedRecordRDD = bloomIndex + .fetchRecordLocation(keysRDD, jsc, table); // Should not find any files - for (Tuple2> record : taggedRecordRDD.collect()) { + for (Tuple2>> record : taggedRecordRDD.collect()) { assertTrue(!record._2.isPresent()); } @@ 
-438,18 +441,16 @@ public class TestHoodieBloomIndex { taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table); // Check results - for (Tuple2> record : taggedRecordRDD.collect()) { + for (Tuple2>> record : taggedRecordRDD.collect()) { if (record._1.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) { assertTrue(record._2.isPresent()); - Path path1 = new Path(record._2.get()); - assertEquals(FSUtils.getFileId(filename1), FSUtils.getFileId(path1.getName())); + assertEquals(FSUtils.getFileId(filename1), record._2.get().getRight()); } else if (record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { assertTrue(record._2.isPresent()); - Path path2 = new Path(record._2.get()); if (record._1.getPartitionPath().equals("2015/01/31")) { - assertEquals(FSUtils.getFileId(filename3), FSUtils.getFileId(path2.getName())); + assertEquals(FSUtils.getFileId(filename3), record._2.get().getRight()); } else { - assertEquals(FSUtils.getFileId(filename2), FSUtils.getFileId(path2.getName())); + assertEquals(FSUtils.getFileId(filename2), record._2.get().getRight()); } } else if (record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) { assertTrue(!record._2.isPresent()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java index 0a812e3f7..4048a5e3f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java @@ -18,7 +18,11 @@ package com.uber.hoodie.index.bloom; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import com.google.common.collect.Lists; 
import com.uber.hoodie.common.HoodieClientTestUtils; @@ -32,16 +36,17 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; - import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.stream.Collectors; - import org.apache.avro.Schema; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; - import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -154,20 +159,20 @@ public class TestHoodieGlobalBloomIndex { Map filesMap = toFileMap(filesList); // key ranges checks - assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMaxRecordKey()); - assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMinRecordKey()); - assertFalse(filesMap.get("2015/03/12/1_0_20150312101010.parquet").hasKeyRanges()); - assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMaxRecordKey()); - assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMinRecordKey()); - assertTrue(filesMap.get("2015/03/12/3_0_20150312101010.parquet").hasKeyRanges()); + assertNull(filesMap.get("2016/04/01/2").getMaxRecordKey()); + assertNull(filesMap.get("2016/04/01/2").getMinRecordKey()); + assertFalse(filesMap.get("2015/03/12/1").hasKeyRanges()); + assertNotNull(filesMap.get("2015/03/12/3").getMaxRecordKey()); + assertNotNull(filesMap.get("2015/03/12/3").getMinRecordKey()); + assertTrue(filesMap.get("2015/03/12/3").hasKeyRanges()); Map expected = new HashMap<>(); - expected.put("2016/04/01/2_0_20160401010101.parquet", new BloomIndexFileInfo("2_0_20160401010101.parquet")); - expected.put("2015/03/12/1_0_20150312101010.parquet", new 
BloomIndexFileInfo("1_0_20150312101010.parquet")); - expected.put("2015/03/12/3_0_20150312101010.parquet", - new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")); - expected.put("2015/03/12/4_0_20150312101010.parquet", - new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003")); + expected.put("2016/04/01/2", new BloomIndexFileInfo("2")); + expected.put("2015/03/12/1", new BloomIndexFileInfo("1")); + expected.put("2015/03/12/3", + new BloomIndexFileInfo("3", "000", "000")); + expected.put("2015/03/12/4", + new BloomIndexFileInfo("4", "001", "003")); assertEquals(expected, filesMap); } @@ -300,7 +305,7 @@ public class TestHoodieGlobalBloomIndex { private Map toFileMap(List> filesList) { Map filesMap = new HashMap<>(); for (Tuple2 t : filesList) { - filesMap.put(t._1() + "/" + t._2().getFileName(), t._2()); + filesMap.put(t._1() + "/" + t._2().getFileId(), t._2()); } return filesMap; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java index ddf9777ab..75f41ef1f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java @@ -27,11 +27,11 @@ import java.io.Serializable; */ public class HoodieRecordLocation implements Serializable { - private final String commitTime; + private final String instantTime; private final String fileId; - public HoodieRecordLocation(String commitTime, String fileId) { - this.commitTime = commitTime; + public HoodieRecordLocation(String instantTime, String fileId) { + this.instantTime = instantTime; this.fileId = fileId; } @@ -44,26 +44,26 @@ public class HoodieRecordLocation implements Serializable { return false; } HoodieRecordLocation otherLoc = (HoodieRecordLocation) o; - return Objects.equal(commitTime, otherLoc.commitTime) + return Objects.equal(instantTime, 
otherLoc.instantTime) && Objects.equal(fileId, otherLoc.fileId); } @Override public int hashCode() { - return Objects.hashCode(commitTime, fileId); + return Objects.hashCode(instantTime, fileId); } @Override public String toString() { final StringBuilder sb = new StringBuilder("HoodieRecordLocation {"); - sb.append("commitTime=").append(commitTime).append(", "); + sb.append("instantTime=").append(instantTime).append(", "); sb.append("fileId=").append(fileId); sb.append('}'); return sb.toString(); } - public String getCommitTime() { - return commitTime; + public String getInstantTime() { + return instantTime; } public String getFileId() { diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index e42dbcd0b..fd821e737 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -346,10 +346,10 @@ public class HoodieTestUtils { try { logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath)) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId()) - .overBaseCommit(location.getCommitTime()).withFs(fs).build(); + .overBaseCommit(location.getInstantTime()).withFs(fs).build(); Map header = Maps.newHashMap(); - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getCommitTime()); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime()); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); logWriter.appendBlock(new HoodieAvroDataBlock(s.getValue().stream().map(r -> { try { diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java index 572ea4dcd..1cfb25471 100644 --- 
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java @@ -151,7 +151,7 @@ public class TestExternalSpillableMap { assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey()); // compare the member variables of HoodieRecord not set by the constructor assert records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID); - assert records.get(ikey).getCurrentLocation().getCommitTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME); + assert records.get(ikey).getCurrentLocation().getInstantTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME); // test contains assertTrue(records.containsKey(ikey));