From 116a78094fd53fc13d3b613f422a9fb220b95f1f Mon Sep 17 00:00:00 2001 From: vinoth chandar Date: Mon, 27 Feb 2017 15:52:13 -0800 Subject: [PATCH] Cleanup code based on Java8 Lambdas (#84) --- .../com/uber/hoodie/HoodieReadClient.java | 73 ++--- .../com/uber/hoodie/HoodieWriteClient.java | 134 +++------ .../uber/hoodie/index/HoodieBloomIndex.java | 262 ++++++------------ 3 files changed, 151 insertions(+), 318 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index d248dad62..9703a2414 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -32,6 +32,7 @@ import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.index.HoodieBloomIndex; import com.uber.hoodie.table.HoodieTable; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -40,8 +41,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; @@ -73,8 +72,8 @@ public class HoodieReadClient implements Serializable { private transient final FileSystem fs; /** - * TODO: We need to persist the index type into hoodie.properties and be able to access the index - * just with a simple basepath pointing to the dataset. Until, then just always assume a + * TODO: We need to persist the index type into hoodie.properties and be able to access the + * index just with a simple basepath pointing to the dataset. Until, then just always assume a * BloomIndex */ private transient final HoodieBloomIndex index; @@ -91,11 +90,11 @@ public class HoodieReadClient implements Serializable { this.fs = FSUtils.getFs(); // Create a Hoodie table which encapsulated the commits and files visible this.hoodieTable = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); + .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); this.commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline(); this.index = - new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc); + new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc); this.sqlContextOpt = Optional.absent(); } @@ -138,19 +137,9 @@ public class HoodieReadClient implements Serializable { JavaPairRDD> keyToFileRDD = index.fetchRecordLocation(hoodieKeys, hoodieTable); List paths = keyToFileRDD - .filter(new Function>, Boolean>() { - @Override - public Boolean call(Tuple2> keyFileTuple) throws Exception { - return keyFileTuple._2().isPresent(); - } - }) - .map(new Function>, String>() { - - @Override - public String call(Tuple2> keyFileTuple) throws Exception { - return keyFileTuple._2().get(); - } - }).collect(); + .filter(keyFileTuple -> keyFileTuple._2().isPresent()) + .map(keyFileTuple -> keyFileTuple._2().get()) + .collect(); // record locations might be same for multiple keys, so need a unique list Set uniquePaths = new HashSet<>(paths); @@ -158,24 +147,16 @@ public class HoodieReadClient implements Serializable { .parquet(uniquePaths.toArray(new String[uniquePaths.size()])); StructType schema = originalDF.schema(); JavaPairRDD keyRowRDD = originalDF.javaRDD() - .mapToPair(new PairFunction() { - @Override - public Tuple2 call(Row row) throws Exception { - HoodieKey key = new HoodieKey( - row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD), - row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD)); - return new Tuple2<>(key, row); - } + .mapToPair(row -> { + HoodieKey key = new HoodieKey( + row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD), + row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD)); + return new Tuple2<>(key, row); }); // Now, we need to further filter out, for only rows that match the supplied hoodie keys JavaRDD rowRDD = keyRowRDD.join(keyToFileRDD, parallelism) - .map(new Function>>, Row>() { - @Override - public Row call(Tuple2>> tuple) throws Exception { - return tuple._2()._1(); - } - }); + .map(tuple -> tuple._2()._1()); return sqlContextOpt.get().createDataFrame(rowRDD, schema); } @@ -197,7 +178,7 @@ public class HoodieReadClient implements Serializable { } List latestFiles = fileSystemView.getLatestVersions(fs.globStatus(new Path(path))).collect( - Collectors.toList()); + Collectors.toList()); for (HoodieDataFile file : latestFiles) { filteredPaths.add(file.getPath()); } @@ -218,16 +199,16 @@ public class HoodieReadClient implements Serializable { public Dataset readSince(String lastCommitTimestamp) { List commitsToReturn = - commitTimeline.findInstantsAfter(lastCommitTimestamp, Integer.MAX_VALUE) - .getInstants().collect(Collectors.toList()); + commitTimeline.findInstantsAfter(lastCommitTimestamp, Integer.MAX_VALUE) + .getInstants().collect(Collectors.toList()); //TODO: we can potentially trim this down to only affected partitions, using CommitMetadata try { // Go over the commit metadata, and obtain the new files that need to be read. HashMap fileIdToFullPath = new HashMap<>(); - for (HoodieInstant commit: commitsToReturn) { + for (HoodieInstant commit : commitsToReturn) { HoodieCommitMetadata metadata = - HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get()); + HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get()); // get files from each commit, and replace any previous versions fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths()); } @@ -247,14 +228,14 @@ public class HoodieReadClient implements Serializable { assertSqlContext(); String actionType = hoodieTable.getCompactedCommitActionType(); HoodieInstant commitInstant = - new HoodieInstant(false, actionType, commitTime); + new HoodieInstant(false, actionType, commitTime); if (!commitTimeline.containsInstant(commitInstant)) { new HoodieException("No commit exists at " + commitTime); } try { HoodieCommitMetadata commitMetdata = - HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get()); + HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get()); Collection paths = commitMetdata.getFileIdAndFullPaths().values(); return sqlContextOpt.get().read() .parquet(paths.toArray(new String[paths.size()])) @@ -271,8 +252,7 @@ public class HoodieReadClient implements Serializable { * not found. If the FullFilePath value is present, it is the path component (without scheme) of * the URI underlying file */ - public JavaPairRDD> checkExists( - JavaRDD hoodieKeys) { + public JavaPairRDD> checkExists(JavaRDD hoodieKeys) { return index.fetchRecordLocation(hoodieKeys, hoodieTable); } @@ -285,12 +265,7 @@ public class HoodieReadClient implements Serializable { */ public JavaRDD filterExists(JavaRDD hoodieRecords) { JavaRDD recordsWithLocation = index.tagLocation(hoodieRecords, hoodieTable); - return recordsWithLocation.filter(new Function() { - @Override - public Boolean call(HoodieRecord v1) throws Exception { - return !v1.isCurrentLocationKnown(); - } - }); + return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); } /** @@ -308,7 +283,7 @@ public class HoodieReadClient implements Serializable { */ public List listCommitsSince(String commitTimestamp) { return commitTimeline.findInstantsAfter(commitTimestamp, Integer.MAX_VALUE).getInstants() - .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); } /** diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 814030dec..cd916e668 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -45,7 +45,6 @@ import com.uber.hoodie.table.WorkloadProfile; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Accumulator; @@ -53,7 +52,6 @@ import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; @@ -66,7 +64,6 @@ import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.Collections; import java.util.Date; -import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -133,12 +130,7 @@ public class HoodieWriteClient implements Seriali .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); JavaRDD> recordsWithLocation = index.tagLocation(hoodieRecords, table); - return recordsWithLocation.filter(new Function, Boolean>() { - @Override - public Boolean call(HoodieRecord v1) throws Exception { - return !v1.isCurrentLocationKnown(); - } - }); + return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); } /** @@ -220,30 +212,20 @@ public class HoodieWriteClient implements Seriali try { // De-dupe/merge if needed JavaRDD> dedupedRecords = - combineOnCondition(config.shouldCombineBeforeInsert(), records, - config.getInsertShuffleParallelism()); + combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism()); // Now, sort the records and line them up nicely for loading. - JavaRDD> sortedRecords = - dedupedRecords.sortBy(new Function, String>() { - @Override - public String call(HoodieRecord record) { + JavaRDD> sortedRecords = dedupedRecords + .sortBy(record -> { // Let's use "partitionPath + key" as the sort key. Spark, will ensure // the records split evenly across RDD partitions, such that small partitions fit // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions return String .format("%s+%s", record.getPartitionPath(), record.getRecordKey()); - } - }, true, config.getInsertShuffleParallelism()); + }, true, config.getInsertShuffleParallelism()); JavaRDD writeStatusRDD = sortedRecords - .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, table), - true).flatMap(new FlatMapFunction, WriteStatus>() { - @Override - public Iterator call(List writeStatuses) - throws Exception { - return writeStatuses.iterator(); - } - }); + .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, table), true) + .flatMap(writeStatuses -> writeStatuses.iterator()); return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime); } catch (Throwable e) { @@ -291,11 +273,8 @@ public class HoodieWriteClient implements Seriali // partition using the insert partitioner final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile); JavaRDD> partitionedRecords = partition(preppedRecords, partitioner); - JavaRDD writeStatusRDD = partitionedRecords.mapPartitionsWithIndex( - new Function2>, Iterator>>() { - @Override - public Iterator> call(Integer partition, - Iterator> recordItr) throws Exception { + JavaRDD writeStatusRDD = partitionedRecords + .mapPartitionsWithIndex((partition, recordItr) -> { if (isUpsert) { return hoodieTable .handleUpsertPartition(commitTime, partition, recordItr, partitioner); @@ -303,14 +282,8 @@ public class HoodieWriteClient implements Seriali return hoodieTable .handleInsertPartition(commitTime, partition, recordItr, partitioner); } - } - }, true).flatMap(new FlatMapFunction, WriteStatus>() { - @Override - public Iterator call(List writeStatuses) - throws Exception { - return writeStatuses.iterator(); - } - }); + }, true) + .flatMap(writeStatuses -> writeStatuses.iterator()); return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime); } @@ -323,9 +296,7 @@ public class HoodieWriteClient implements Seriali } } - private JavaRDD updateIndexAndCommitIfNeeded(JavaRDD writeStatusRDD, - HoodieTable table, - String commitTime) { + private JavaRDD updateIndexAndCommitIfNeeded(JavaRDD writeStatusRDD, HoodieTable table, String commitTime) { // Update the index back JavaRDD statuses = index.updateLocation(writeStatusRDD, table); // Trigger the insert and collect statuses @@ -335,23 +306,11 @@ public class HoodieWriteClient implements Seriali } private JavaRDD> partition(JavaRDD> dedupedRecords, Partitioner partitioner) { - return dedupedRecords.mapToPair( - new PairFunction, Tuple2>, HoodieRecord>() { - @Override - public Tuple2>, HoodieRecord> call( - HoodieRecord record) throws Exception { - return new Tuple2<>(new Tuple2<>(record.getKey(), - Option.apply(record.getCurrentLocation())), record); - } - }).partitionBy(partitioner).map( - new Function>, HoodieRecord>, HoodieRecord>() { - @Override - public HoodieRecord call( - Tuple2>, HoodieRecord> tuple) - throws Exception { - return tuple._2(); - } - }); + return dedupedRecords + .mapToPair((PairFunction, Tuple2>, HoodieRecord>) record -> + new Tuple2<>(new Tuple2<>(record.getKey(), Option.apply(record.getCurrentLocation())), record)) + .partitionBy(partitioner) + .map((Function>, HoodieRecord>, HoodieRecord>) tuple -> tuple._2()); } /** @@ -365,14 +324,10 @@ public class HoodieWriteClient implements Seriali HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - List> stats = - writeStatuses.mapToPair(new PairFunction() { - @Override - public Tuple2 call(WriteStatus writeStatus) - throws Exception { - return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()); - } - }).collect(); + List> stats = writeStatuses + .mapToPair((PairFunction) writeStatus -> + new Tuple2(writeStatus.getPartitionPath(), writeStatus.getStat())) + .collect(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); for (Tuple2 stat : stats) { @@ -460,26 +415,20 @@ public class HoodieWriteClient implements Seriali final Accumulator numFilesDeletedAccu = jsc.accumulator(0); jsc.parallelize( FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath())) - .foreach(new VoidFunction() { - @Override - public void call(String partitionPath) throws Exception { - // Scan all partitions files with this commit time - FileSystem fs = FSUtils.getFs(); - FileStatus[] toBeDeleted = - fs.listStatus(new Path(config.getBasePath(), partitionPath), - new PathFilter() { - @Override - public boolean accept(Path path) { - return commitTime - .equals(FSUtils.getCommitTime(path.getName())); - } + .foreach((VoidFunction) partitionPath -> { + // Scan all partitions files with this commit time + FileSystem fs1 = FSUtils.getFs(); + FileStatus[] toBeDeleted = + fs1.listStatus(new Path(config.getBasePath(), partitionPath), + path -> { + return commitTime + .equals(FSUtils.getCommitTime(path.getName())); }); - for (FileStatus file : toBeDeleted) { - boolean success = fs.delete(file.getPath(), false); - logger.info("Delete file " + file.getPath() + "\t" + success); - if (success) { - numFilesDeletedAccu.add(1); - } + for (FileStatus file : toBeDeleted) { + boolean success = fs1.delete(file.getPath(), false); + logger.info("Delete file " + file.getPath() + "\t" + success); + if (success) { + numFilesDeletedAccu.add(1); } } }); @@ -530,19 +479,12 @@ public class HoodieWriteClient implements Seriali int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); int numFilesDeleted = jsc.parallelize(partitionsToClean, cleanerParallelism) - .map(new Function() { - @Override - public Integer call(String partitionPathToClean) throws Exception { - FileSystem fs = FSUtils.getFs(); + .map((Function) partitionPathToClean -> { HoodieCleaner cleaner = new HoodieCleaner(table, config); return cleaner.clean(partitionPathToClean); - } - }).reduce(new Function2() { - @Override - public Integer call(Integer v1, Integer v2) throws Exception { - return v1 + v2; - } - }); + }) + .reduce((Function2) (v1, v2) -> v1 + v2); + logger.info("Cleaned " + numFilesDeleted + " files"); // Emit metrics (duration, numFilesDeleted) if needed if (context != null) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java index 96d3270fe..248979b4a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java @@ -18,6 +18,7 @@ package com.uber.hoodie.index; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; + import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; @@ -28,21 +29,18 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; + import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.api.java.function.PairFunction; + import scala.Tuple2; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -70,16 +68,10 @@ public class HoodieBloomIndex extends HoodieIndex // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey) JavaPairRDD partitionRecordKeyPairRDD = recordRDD - .mapToPair(new PairFunction, String, String>() { - @Override - public Tuple2 call(HoodieRecord record) throws Exception { - return new Tuple2<>(record.getPartitionPath(), record.getRecordKey()); - } - }); + .mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())); // Lookup indexes for all the partition/recordkey pair - JavaPairRDD rowKeyFilenamePairRDD = - lookupIndex(partitionRecordKeyPairRDD, hoodieTable); + JavaPairRDD rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, hoodieTable); // Cache the result, for subsequent stages. rowKeyFilenamePairRDD.cache(); @@ -93,86 +85,63 @@ public class HoodieBloomIndex extends HoodieIndex } public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTable hoodieTable) { + JavaRDD hoodieKeys, final HoodieTable hoodieTable) { JavaPairRDD partitionRecordKeyPairRDD = - hoodieKeys.mapToPair(new PairFunction() { - @Override - public Tuple2 call(HoodieKey key) throws Exception { - return new Tuple2<>(key.getPartitionPath(), key.getRecordKey()); - } - }); + hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey())); // Lookup indexes for all the partition/recordkey pair JavaPairRDD rowKeyFilenamePairRDD = - lookupIndex(partitionRecordKeyPairRDD, hoodieTable); + lookupIndex(partitionRecordKeyPairRDD, hoodieTable); JavaPairRDD rowKeyHoodieKeyPairRDD = - hoodieKeys.mapToPair(new PairFunction() { - @Override - public Tuple2 call(HoodieKey key) throws Exception { - return new Tuple2<>(key.getRecordKey(), key); - } - }); + hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key)); - return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair( - new PairFunction>>, HoodieKey, Optional>() { - @Override - public Tuple2> call( - Tuple2>> keyPathTuple) - throws Exception { + return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD) + .mapToPair(keyPathTuple -> { Optional recordLocationPath; if (keyPathTuple._2._2.isPresent()) { String fileName = keyPathTuple._2._2.get(); String partitionPath = keyPathTuple._2._1.getPartitionPath(); recordLocationPath = Optional.of(new Path( - new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath), - fileName).toUri().getPath()); + new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath), + fileName).toUri().getPath()); } else { recordLocationPath = Optional.absent(); } return new Tuple2<>(keyPathTuple._2._1, recordLocationPath); - } - }); + }); } /** * Lookup the location for each record key and return the pair for all * record keys already present and drop the record keys if not present - * - * @param partitionRecordKeyPairRDD - * @param hoodieTable - * @return */ private JavaPairRDD lookupIndex( - JavaPairRDD partitionRecordKeyPairRDD, final HoodieTable hoodieTable) { + JavaPairRDD partitionRecordKeyPairRDD, final HoodieTable hoodieTable) { // Obtain records per partition, in the incoming records Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); // Step 2: Load all involved files as pairs JavaPairRDD partitionFilePairRDD = - loadInvolvedFiles(affectedPartitionPathList, hoodieTable); + loadInvolvedFiles(affectedPartitionPathList, hoodieTable); Map filesPerPartition = partitionFilePairRDD.countByKey(); // Compute total subpartitions, to split partitions into. Map subpartitionCountMap = - computeSubPartitions(recordsPerPartition, filesPerPartition); + computeSubPartitions(recordsPerPartition, filesPerPartition); // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, that contains it. return findMatchingFilesForRecordKeys(partitionFilePairRDD, partitionRecordKeyPairRDD, - subpartitionCountMap); + subpartitionCountMap); } /** * The index lookup can be skewed in three dimensions : #files, #partitions, #records * - * To be able to smoothly handle skews, we need to compute how to split each partitions - * into subpartitions. We do it here, in a way that keeps the amount of each Spark join - * partition to < 2GB. - * - * @param recordsPerPartition - * @param filesPerPartition - * @return + * To be able to smoothly handle skews, we need to compute how to split each partitions into + * subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to + * < 2GB. */ private Map computeSubPartitions(Map recordsPerPartition, Map filesPerPartition) { Map subpartitionCountMap = new HashMap<>(); @@ -180,11 +149,11 @@ public class HoodieBloomIndex extends HoodieIndex long totalFiles = 0; for (String partitionPath : recordsPerPartition.keySet()) { - long numRecords = (Long) recordsPerPartition.get(partitionPath); - long numFiles = filesPerPartition.containsKey(partitionPath) ? (Long) filesPerPartition.get(partitionPath) : 1L; + long numRecords = recordsPerPartition.get(partitionPath); + long numFiles = filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 1L; subpartitionCountMap.put(partitionPath, ((numFiles * numRecords) / MAX_ITEMS_PER_JOIN_PARTITION) + 1); - totalFiles += filesPerPartition.containsKey(partitionPath) ? (Long) filesPerPartition.get(partitionPath) : 0L; + totalFiles += filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 0L; totalRecords += numRecords; } logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles + ", TotalAffectedPartitions:" + recordsPerPartition.size()); @@ -198,12 +167,8 @@ public class HoodieBloomIndex extends HoodieIndex @VisibleForTesting Map> getPartitionToRowKeys(JavaRDD> recordRDD) { // Have to wrap the map into a hashmap becuase of the need to braoadcast (see: http://php.sabscape.com/blog/?p=671) - return recordRDD.mapToPair(new PairFunction, String, String>() { - @Override - public Tuple2 call(HoodieRecord record) { - return new Tuple2<>(record.getPartitionPath(), record.getRecordKey()); - } - }).groupByKey().collectAsMap(); + return recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())) + .groupByKey().collectAsMap(); } /** @@ -211,25 +176,22 @@ public class HoodieBloomIndex extends HoodieIndex */ @VisibleForTesting JavaPairRDD loadInvolvedFiles(List partitions, - final HoodieTable hoodieTable) { + final HoodieTable hoodieTable) { return jsc.parallelize(partitions, Math.max(partitions.size(), 1)) - .flatMapToPair(new PairFlatMapFunction() { - @Override - public Iterator> call(String partitionPath) { + .flatMapToPair(partitionPath -> { java.util.Optional latestCommitTime = - hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant(); + hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant(); List> list = new ArrayList<>(); if (latestCommitTime.isPresent()) { List filteredFiles = - hoodieTable.getFileSystemView().getLatestVersionInPartition(partitionPath, - latestCommitTime.get().getTimestamp()).collect(Collectors.toList()); + hoodieTable.getFileSystemView().getLatestVersionInPartition(partitionPath, + latestCommitTime.get().getTimestamp()).collect(Collectors.toList()); for (HoodieDataFile file : filteredFiles) { list.add(new Tuple2<>(partitionPath, file.getFileName())); } } return list.iterator(); - } - }); + }); } @@ -241,58 +203,38 @@ public class HoodieBloomIndex extends HoodieIndex /** - * When we subpartition records going into a partition, we still need to check them against - * all the files within the partition. Thus, we need to explode the (partition, file) pairs - * to (partition_subpartnum, file), so we can later join. - * - * - * @param partitionFilePairRDD - * @param subpartitionCountMap - * @return + * When we subpartition records going into a partition, we still need to check them against all + * the files within the partition. Thus, we need to explode the (partition, file) pairs to + * (partition_subpartnum, file), so we can later join. */ private JavaPairRDD explodePartitionFilePairRDD(JavaPairRDD partitionFilePairRDD, final Map subpartitionCountMap) { return partitionFilePairRDD - .map(new Function, List>>() { - @Override - public List> call(Tuple2 partitionFilePair) throws Exception { - List> explodedPartitionFilePairs = new ArrayList<>(); - for (long l = 0; l < subpartitionCountMap.get(partitionFilePair._1); l++) { - explodedPartitionFilePairs.add(new Tuple2<>( - String.format("%s#%d", partitionFilePair._1, l), - partitionFilePair._2)); - } - return explodedPartitionFilePairs; + .map(partitionFilePair -> { + List> explodedPartitionFilePairs = new ArrayList<>(); + for (long l = 0; l < subpartitionCountMap.get(partitionFilePair._1); l++) { + explodedPartitionFilePairs.add(new Tuple2<>( + String.format("%s#%d", partitionFilePair._1, l), + partitionFilePair._2)); } + return explodedPartitionFilePairs; }) - .flatMapToPair(new PairFlatMapFunction>, String, String>() { - @Override - public Iterator> call(List> exploded) throws Exception { - return exploded.iterator(); - } - }); - + .flatMapToPair(exploded -> exploded.iterator()); } /** - * To handle tons of incoming records to a partition, we need to split them into groups or create subpartitions. - * Here, we do a simple hash mod splitting, based on computed sub partitions. - * - * @param partitionRecordKeyPairRDD - * @param subpartitionCountMap - * @return + * To handle tons of incoming records to a partition, we need to split them into groups or + * create subpartitions. Here, we do a simple hash mod splitting, based on computed sub + * partitions. */ private JavaPairRDD splitPartitionRecordKeysPairRDD(JavaPairRDD partitionRecordKeyPairRDD, final Map subpartitionCountMap) { return partitionRecordKeyPairRDD - .mapToPair(new PairFunction, String, String>() { - @Override - public Tuple2 call(Tuple2 partitionRecordKeyPair) throws Exception { - long subpart = Math.abs(partitionRecordKeyPair._2.hashCode()) % subpartitionCountMap.get(partitionRecordKeyPair._1); - return new Tuple2<>( - String.format("%s#%d", partitionRecordKeyPair._1, subpart), - partitionRecordKeyPair._2); - } + .mapToPair(partitionRecordKeyPair -> { + long subpart = Math.abs(partitionRecordKeyPair._2.hashCode()) % subpartitionCountMap.get(partitionRecordKeyPair._1); + return new Tuple2<>( + String.format("%s#%d", partitionRecordKeyPair._1, subpart), + partitionRecordKeyPair._2); }); } @@ -300,17 +242,12 @@ public class HoodieBloomIndex extends HoodieIndex /** * Its crucial to pick the right parallelism. * - * totalSubPartitions : this is deemed safe limit, to be nice with Spark. - * inputParallelism : typically number of input files. + * totalSubPartitions : this is deemed safe limit, to be nice with Spark. inputParallelism : + * typically number of input files. * - * We pick the max such that, we are always safe, but go higher if say a there are - * a lot of input files. (otherwise, we will fallback to number of partitions in input and - * end up with slow performance) - * - * - * @param inputParallelism - * @param subpartitionCountMap - * @return + * We pick the max such that, we are always safe, but go higher if say a there are a lot of + * input files. (otherwise, we will fallback to number of partitions in input and end up with + * slow performance) */ private int determineParallelism(int inputParallelism, final Map subpartitionCountMap) { // size the join parallelism to max(total number of sub partitions, total number of files). @@ -329,9 +266,10 @@ public class HoodieBloomIndex extends HoodieIndex /** * Find out pair. All workload grouped by file-level. * - * // Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition such that - // each RDD partition is a file, then for each file, we do (1) load bloom filter, (2) load rowKeys, (3) Tag rowKey - // Make sure the parallelism is atleast the groupby parallelism for tagging location + * // Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition + * such that // each RDD partition is a file, then for each file, we do (1) load bloom filter, + * (2) load rowKeys, (3) Tag rowKey // Make sure the parallelism is atleast the groupby + * parallelism for tagging location */ private JavaPairRDD findMatchingFilesForRecordKeys(JavaPairRDD partitionFilePairRDD, JavaPairRDD partitionRecordKeyPairRDD, @@ -344,50 +282,35 @@ public class HoodieBloomIndex extends HoodieIndex int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), subpartitionCountMap); // Perform a join, to bring all the files in each subpartition ,together with the record keys to be tested against them - JavaPairRDD> joinedTripletRDD = subpartitionFilePairRDD.join(subpartitionRecordKeyPairRDD, joinParallelism); + JavaPairRDD> joinedTripletRDD = subpartitionFilePairRDD + .join(subpartitionRecordKeyPairRDD, joinParallelism); // sort further based on filename, such that all checking for the file can happen within a single partition, on-the-fly JavaPairRDD> fileSortedTripletRDD = joinedTripletRDD - .mapToPair(new PairFunction>, String, Tuple2>() { - @Override - /** - * Incoming triplet is (partitionPath_subpart) => (file, recordKey) - */ - public Tuple2> call(Tuple2> joinedTriplet) throws Exception { - String partitionPath = joinedTriplet._1.split("#")[0]; // throw away the subpart - String fileName = joinedTriplet._2._1; - String recordKey = joinedTriplet._2._2; + /** + * Incoming triplet is (partitionPath_subpart) => (file, recordKey) + */ + .mapToPair(joinedTriplet -> { + String partitionPath = joinedTriplet._1.split("#")[0]; // throw away the subpart + String fileName = joinedTriplet._2._1; + String recordKey = joinedTriplet._2._2; - // make a sort key as #, to handle skews - return new Tuple2<>(String.format("%s#%s", fileName, recordKey), - new Tuple2<>(fileName, new HoodieKey(recordKey, partitionPath))); - } + // make a sort key as #, to handle skews + return new Tuple2<>(String.format("%s#%s", fileName, recordKey), + new Tuple2<>(fileName, new HoodieKey(recordKey, partitionPath))); }).sortByKey(true, joinParallelism); return fileSortedTripletRDD - .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true) - .flatMap(new FlatMapFunction, IndexLookupResult>() { - @Override - public Iterator call(List indexLookupResults) - throws Exception { - return indexLookupResults.iterator(); - } - }).filter(new Function() { - @Override - public Boolean call(IndexLookupResult lookupResult) throws Exception { - return lookupResult.getMatchingRecordKeys().size() > 0; - } - }).flatMapToPair(new PairFlatMapFunction() { - @Override - public Iterator> call(IndexLookupResult lookupResult) - throws Exception { + .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true) + .flatMap(indexLookupResults -> indexLookupResults.iterator()) + .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0) + .flatMapToPair(lookupResult -> { List> vals = new ArrayList<>(); for (String recordKey : lookupResult.getMatchingRecordKeys()) { vals.add(new Tuple2<>(recordKey, lookupResult.getFileName())); } return vals.iterator(); - } - }); + }); } /** @@ -395,30 +318,23 @@ public class HoodieBloomIndex extends HoodieIndex */ private JavaRDD> tagLocationBacktoRecords(JavaPairRDD rowKeyFilenamePairRDD, JavaRDD> recordRDD) { - JavaPairRDD> rowKeyRecordPairRDD = recordRDD.mapToPair( - new PairFunction, String, HoodieRecord>() { - @Override - public Tuple2> call(HoodieRecord record) throws Exception { - return new Tuple2<>(record.getRecordKey(), record); - } - }); + JavaPairRDD> rowKeyRecordPairRDD = recordRDD + .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record)); // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join. return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map( - new Function, org.apache.spark.api.java.Optional>, HoodieRecord>() { - @Override - public HoodieRecord call(Tuple2, org.apache.spark.api.java.Optional> v1) throws Exception { - HoodieRecord record = v1._1(); - if (v1._2().isPresent()) { - String filename = v1._2().get(); - if (filename != null && !filename.isEmpty()) { - record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename), - FSUtils.getFileId(filename))); - } + v1 -> { + HoodieRecord record = v1._1(); + if (v1._2().isPresent()) { + String filename = v1._2().get(); + if (filename != null && !filename.isEmpty()) { + record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename), + FSUtils.getFileId(filename))); } - return record; } - }); + return record; + } + ); } @Override