Cleanup code based on Java8 Lambdas (#84)

vinoth chandar
2017-02-27 15:52:13 -08:00
committed by prazanna
parent c4fa585b27
commit 116a78094f
3 changed files with 151 additions and 318 deletions
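Every hunk in this commit applies the same mechanical transformation: an anonymous inner class implementing one of Spark's Java functional interfaces (Function, PairFunction, FlatMapFunction, VoidFunction, ...) is collapsed into a Java 8 lambda. A minimal before/after sketch of the pattern, mirroring the filterExists change below and assuming a hypothetical JavaRDD<HoodieRecord> named records:

import com.uber.hoodie.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Before (Java 7): the functional interface is spelled out as an anonymous class
JavaRDD<HoodieRecord> unknownLocation = records.filter(
    new Function<HoodieRecord, Boolean>() {
      @Override
      public Boolean call(HoodieRecord record) throws Exception {
        return !record.isCurrentLocationKnown();
      }
    });

// After (Java 8): Function has a single abstract method (call), so it is a valid
// lambda target, and the "throws Exception" clause is absorbed by the lambda
JavaRDD<HoodieRecord> alsoUnknownLocation = records.filter(
    record -> !record.isCurrentLocationKnown());

Both forms produce an object implementing the same interface, so behavior is unchanged; only the imports of the now-unreferenced function interfaces can be dropped.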

File: HoodieReadClient.java

@@ -32,6 +32,7 @@ import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.index.HoodieBloomIndex;
import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -40,8 +41,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -73,8 +72,8 @@ public class HoodieReadClient implements Serializable {
private transient final FileSystem fs;
/**
* TODO: We need to persist the index type into hoodie.properties and be able to access the index
* just with a simple basepath pointing to the dataset. Until, then just always assume a
* TODO: We need to persist the index type into hoodie.properties and be able to access the
* index just with a simple basepath pointing to the dataset. Until then, just always assume a
* BloomIndex
*/
private transient final HoodieBloomIndex index;
@@ -91,11 +90,11 @@ public class HoodieReadClient implements Serializable {
this.fs = FSUtils.getFs();
// Create a Hoodie table which encapsulates the commits and files visible
this.hoodieTable = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null);
.getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null);
this.commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline();
this.index =
new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc);
new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc);
this.sqlContextOpt = Optional.absent();
}
@@ -138,19 +137,9 @@ public class HoodieReadClient implements Serializable {
JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD =
index.fetchRecordLocation(hoodieKeys, hoodieTable);
List<String> paths = keyToFileRDD
.filter(new Function<Tuple2<HoodieKey, Optional<String>>, Boolean>() {
@Override
public Boolean call(Tuple2<HoodieKey, Optional<String>> keyFileTuple) throws Exception {
return keyFileTuple._2().isPresent();
}
})
.map(new Function<Tuple2<HoodieKey, Optional<String>>, String>() {
@Override
public String call(Tuple2<HoodieKey, Optional<String>> keyFileTuple) throws Exception {
return keyFileTuple._2().get();
}
}).collect();
.filter(keyFileTuple -> keyFileTuple._2().isPresent())
.map(keyFileTuple -> keyFileTuple._2().get())
.collect();
// record locations might be the same for multiple keys, so we need a unique list
Set<String> uniquePaths = new HashSet<>(paths);
@@ -158,24 +147,16 @@ public class HoodieReadClient implements Serializable {
.parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
StructType schema = originalDF.schema();
JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD()
.mapToPair(new PairFunction<Row, HoodieKey, Row>() {
@Override
public Tuple2<HoodieKey, Row> call(Row row) throws Exception {
HoodieKey key = new HoodieKey(
row.<String>getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
row.<String>getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
return new Tuple2<>(key, row);
}
.mapToPair(row -> {
HoodieKey key = new HoodieKey(
row.<String>getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
row.<String>getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
return new Tuple2<>(key, row);
});
// Now, we need to further filter, keeping only rows that match the supplied hoodie keys
JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism)
.map(new Function<Tuple2<HoodieKey, Tuple2<Row, Optional<String>>>, Row>() {
@Override
public Row call(Tuple2<HoodieKey, Tuple2<Row, Optional<String>>> tuple) throws Exception {
return tuple._2()._1();
}
});
.map(tuple -> tuple._2()._1());
return sqlContextOpt.get().createDataFrame(rowRDD, schema);
}
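The filter-then-map chain above is the safe way to unwrap Guava Optionals: get() only runs on elements that already passed the isPresent() check. The same idiom in plain java.util.stream form, with made-up sample data:

import com.google.common.base.Optional;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

List<Optional<String>> locations = Arrays.asList(
    Optional.of("/tbl/2017/02/27/f1.parquet"),
    Optional.<String>absent(),       // key not found in the index
    Optional.of("/tbl/2017/02/27/f2.parquet"));
List<String> paths = locations.stream()
    .filter(Optional::isPresent)     // drop absent entries first...
    .map(Optional::get)              // ...so get() can never throw here
    .collect(Collectors.toList());   // the two present paths survive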
@@ -197,7 +178,7 @@ public class HoodieReadClient implements Serializable {
}
List<HoodieDataFile> latestFiles = fileSystemView.getLatestVersions(fs.globStatus(new Path(path))).collect(
Collectors.toList());
Collectors.toList());
for (HoodieDataFile file : latestFiles) {
filteredPaths.add(file.getPath());
}
@@ -218,16 +199,16 @@ public class HoodieReadClient implements Serializable {
public Dataset<Row> readSince(String lastCommitTimestamp) {
List<HoodieInstant> commitsToReturn =
commitTimeline.findInstantsAfter(lastCommitTimestamp, Integer.MAX_VALUE)
.getInstants().collect(Collectors.toList());
commitTimeline.findInstantsAfter(lastCommitTimestamp, Integer.MAX_VALUE)
.getInstants().collect(Collectors.toList());
//TODO: we can potentially trim this down to only affected partitions, using CommitMetadata
try {
// Go over the commit metadata, and obtain the new files that need to be read.
HashMap<String, String> fileIdToFullPath = new HashMap<>();
for (HoodieInstant commit: commitsToReturn) {
for (HoodieInstant commit : commitsToReturn) {
HoodieCommitMetadata metadata =
HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get());
HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get());
// get files from each commit, and replace any previous versions
fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths());
}
@@ -247,14 +228,14 @@ public class HoodieReadClient implements Serializable {
assertSqlContext();
String actionType = hoodieTable.getCompactedCommitActionType();
HoodieInstant commitInstant =
new HoodieInstant(false, actionType, commitTime);
new HoodieInstant(false, actionType, commitTime);
if (!commitTimeline.containsInstant(commitInstant)) {
throw new HoodieException("No commit exists at " + commitTime);
}
try {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get());
HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get());
Collection<String> paths = commitMetadata.getFileIdAndFullPaths().values();
return sqlContextOpt.get().read()
.parquet(paths.toArray(new String[paths.size()]))
@@ -271,8 +252,7 @@ public class HoodieReadClient implements Serializable {
* not found. If the FullFilePath value is present, it is the path component (without scheme) of
* the URI of the underlying file
*/
public JavaPairRDD<HoodieKey, Optional<String>> checkExists(
JavaRDD<HoodieKey> hoodieKeys) {
public JavaPairRDD<HoodieKey, Optional<String>> checkExists(JavaRDD<HoodieKey> hoodieKeys) {
return index.fetchRecordLocation(hoodieKeys, hoodieTable);
}
@@ -285,12 +265,7 @@ public class HoodieReadClient implements Serializable {
*/
public JavaRDD<HoodieRecord> filterExists(JavaRDD<HoodieRecord> hoodieRecords) {
JavaRDD<HoodieRecord> recordsWithLocation = index.tagLocation(hoodieRecords, hoodieTable);
return recordsWithLocation.filter(new Function<HoodieRecord, Boolean>() {
@Override
public Boolean call(HoodieRecord v1) throws Exception {
return !v1.isCurrentLocationKnown();
}
});
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
/**
@@ -308,7 +283,7 @@ public class HoodieReadClient implements Serializable {
*/
public List<String> listCommitsSince(String commitTimestamp) {
return commitTimeline.findInstantsAfter(commitTimestamp, Integer.MAX_VALUE).getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
}
/**

File: HoodieWriteClient.java

@@ -45,7 +45,6 @@ import com.uber.hoodie.table.WorkloadProfile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Accumulator;
@@ -53,7 +52,6 @@ import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
@@ -66,7 +64,6 @@ import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -133,12 +130,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, table);
return recordsWithLocation.filter(new Function<HoodieRecord<T>, Boolean>() {
@Override
public Boolean call(HoodieRecord<T> v1) throws Exception {
return !v1.isCurrentLocationKnown();
}
});
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
/**
@@ -220,30 +212,20 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records,
config.getInsertShuffleParallelism());
combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
// Now, sort the records and line them up nicely for loading.
JavaRDD<HoodieRecord<T>> sortedRecords =
dedupedRecords.sortBy(new Function<HoodieRecord<T>, String>() {
@Override
public String call(HoodieRecord<T> record) {
JavaRDD<HoodieRecord<T>> sortedRecords = dedupedRecords
.sortBy(record -> {
// Let's use "partitionPath + key" as the sort key. Spark will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String
.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}
}, true, config.getInsertShuffleParallelism());
}, true, config.getInsertShuffleParallelism());
JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table),
true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterator<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses.iterator();
}
});
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
.flatMap(writeStatuses -> writeStatuses.iterator());
return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
} catch (Throwable e) {
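A note on the sort key built above: concatenating partitionPath and recordKey into a single string makes plain lexicographic ordering group records by partition first, then by key within each partition, which is what lets sortBy lay records out contiguously for loading. Illustrated with made-up values:

String a = String.format("%s+%s", "2017/02/26", "key-0002");
String b = String.format("%s+%s", "2017/02/27", "key-0001");
// a < b: the partition path dominates the comparison; the record key breaks ties
assert a.compareTo(b) < 0;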
@@ -291,11 +273,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex(
new Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>>() {
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords
.mapPartitionsWithIndex((partition, recordItr) -> {
if (isUpsert) {
return hoodieTable
.handleUpsertPartition(commitTime, partition, recordItr, partitioner);
@@ -303,14 +282,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
return hoodieTable
.handleInsertPartition(commitTime, partition, recordItr, partitioner);
}
}
}, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterator<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses.iterator();
}
});
}, true)
.flatMap(writeStatuses -> writeStatuses.iterator());
return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime);
}
@@ -323,9 +296,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}
private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
HoodieTable<T> table,
String commitTime) {
private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> table, String commitTime) {
// Update the index back
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, table);
// Trigger the insert and collect statuses
@@ -335,23 +306,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
return dedupedRecords.mapToPair(
new PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>() {
@Override
public Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> call(
HoodieRecord<T> record) throws Exception {
return new Tuple2<>(new Tuple2<>(record.getKey(),
Option.apply(record.getCurrentLocation())), record);
}
}).partitionBy(partitioner).map(
new Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(
Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> tuple)
throws Exception {
return tuple._2();
}
});
return dedupedRecords
.mapToPair((PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>) record ->
new Tuple2<>(new Tuple2<>(record.getKey(), Option.apply(record.getCurrentLocation())), record))
.partitionBy(partitioner)
.map((Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>) tuple -> tuple._2());
}
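The casts in the rewritten partition() above, e.g. (PairFunction<...>) record -> ..., pin each lambda to a named Spark functional interface. mapToPair and map each have a single overload, so the casts are not needed for overload resolution; they make the otherwise long inferred generic type explicit for the reader. A minimal sketch of the idiom, assuming a hypothetical JavaRDD<String> named lines:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// the lambda is legal without the cast; the cast only names the target type
JavaPairRDD<String, Integer> lengths = lines.mapToPair(
    (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, s.length()));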
/**
@@ -365,14 +324,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
List<Tuple2<String, HoodieWriteStat>> stats =
writeStatuses.mapToPair(new PairFunction<WriteStatus, String, HoodieWriteStat>() {
@Override
public Tuple2<String, HoodieWriteStat> call(WriteStatus writeStatus)
throws Exception {
return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat());
}
}).collect();
List<Tuple2<String, HoodieWriteStat>> stats = writeStatuses
.mapToPair((PairFunction<WriteStatus, String, HoodieWriteStat>) writeStatus ->
new Tuple2<String, HoodieWriteStat>(writeStatus.getPartitionPath(), writeStatus.getStat()))
.collect();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
for (Tuple2<String, HoodieWriteStat> stat : stats) {
@@ -460,26 +415,20 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
final Accumulator<Integer> numFilesDeletedAccu = jsc.accumulator(0);
jsc.parallelize(
FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()))
.foreach(new VoidFunction<String>() {
@Override
public void call(String partitionPath) throws Exception {
// Scan all partitions files with this commit time
FileSystem fs = FSUtils.getFs();
FileStatus[] toBeDeleted =
fs.listStatus(new Path(config.getBasePath(), partitionPath),
new PathFilter() {
@Override
public boolean accept(Path path) {
return commitTime
.equals(FSUtils.getCommitTime(path.getName()));
}
.foreach((VoidFunction<String>) partitionPath -> {
// Scan all partition files with this commit time
FileSystem fs1 = FSUtils.getFs();
FileStatus[] toBeDeleted =
fs1.listStatus(new Path(config.getBasePath(), partitionPath),
path -> {
return commitTime
.equals(FSUtils.getCommitTime(path.getName()));
});
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
logger.info("Delete file " + file.getPath() + "\t" + success);
if (success) {
numFilesDeletedAccu.add(1);
}
for (FileStatus file : toBeDeleted) {
boolean success = fs1.delete(file.getPath(), false);
logger.info("Delete file " + file.getPath() + "\t" + success);
if (success) {
numFilesDeletedAccu.add(1);
}
}
});
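The rename of fs to fs1 inside the rewritten foreach follows from lambda scoping: an anonymous class body is a fresh scope and may declare a local fs shadowing a name bound outside it, but a lambda body shares the enclosing method's scope and cannot redeclare a local variable or parameter already in scope there (the rename suggests fs was already bound in the enclosing scope). A minimal illustration of the rule:

int count = 0; // enclosing local (effectively final)
Runnable anon = new Runnable() {
  @Override
  public void run() {
    int count = 1; // legal: the anonymous class body may shadow the outer local
    System.out.println(count);
  }
};
Runnable lambda = () -> {
  // int count = 1;          // would not compile: count is already defined in this scope
  System.out.println(count); // reads the enclosing, effectively final local instead
};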
@@ -530,19 +479,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
int numFilesDeleted = jsc.parallelize(partitionsToClean, cleanerParallelism)
.map(new Function<String, Integer>() {
@Override
public Integer call(String partitionPathToClean) throws Exception {
FileSystem fs = FSUtils.getFs();
.map((Function<String, Integer>) partitionPathToClean -> {
HoodieCleaner cleaner = new HoodieCleaner(table, config);
return cleaner.clean(partitionPathToClean);
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
})
.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
logger.info("Cleaned " + numFilesDeleted + " files");
// Emit metrics (duration, numFilesDeleted) if needed
if (context != null) {

File: HoodieBloomIndex.java

@@ -18,6 +18,7 @@ package com.uber.hoodie.index;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
@@ -28,21 +29,18 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -70,16 +68,10 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
// Step 1: Extract out a thinner JavaPairRDD of (partitionPath, recordKey)
JavaPairRDD<String, String> partitionRecordKeyPairRDD = recordRDD
.mapToPair(new PairFunction<HoodieRecord<T>, String, String>() {
@Override
public Tuple2<String, String> call(HoodieRecord<T> record) throws Exception {
return new Tuple2<>(record.getPartitionPath(), record.getRecordKey());
}
});
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));
// Lookup indexes for all the partition/recordkey pairs
JavaPairRDD<String, String> rowKeyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, hoodieTable);
JavaPairRDD<String, String> rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, hoodieTable);
// Cache the result, for subsequent stages.
rowKeyFilenamePairRDD.cache();
@@ -93,86 +85,63 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
}
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> hoodieTable) {
JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> hoodieTable) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
hoodieKeys.mapToPair(new PairFunction<HoodieKey, String, String>() {
@Override
public Tuple2<String, String> call(HoodieKey key) throws Exception {
return new Tuple2<>(key.getPartitionPath(), key.getRecordKey());
}
});
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
// Lookup indexes for all the partition/recordkey pairs
JavaPairRDD<String, String> rowKeyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, hoodieTable);
lookupIndex(partitionRecordKeyPairRDD, hoodieTable);
JavaPairRDD<String, HoodieKey> rowKeyHoodieKeyPairRDD =
hoodieKeys.mapToPair(new PairFunction<HoodieKey, String, HoodieKey>() {
@Override
public Tuple2<String, HoodieKey> call(HoodieKey key) throws Exception {
return new Tuple2<>(key.getRecordKey(), key);
}
});
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair(
new PairFunction<Tuple2<String, Tuple2<HoodieKey, org.apache.spark.api.java.Optional<String>>>, HoodieKey, Optional<String>>() {
@Override
public Tuple2<HoodieKey, Optional<String>> call(
Tuple2<String, Tuple2<HoodieKey, org.apache.spark.api.java.Optional<String>>> keyPathTuple)
throws Exception {
return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD)
.mapToPair(keyPathTuple -> {
Optional<String> recordLocationPath;
if (keyPathTuple._2._2.isPresent()) {
String fileName = keyPathTuple._2._2.get();
String partitionPath = keyPathTuple._2._1.getPartitionPath();
recordLocationPath = Optional.of(new Path(
new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath),
fileName).toUri().getPath());
new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath),
fileName).toUri().getPath());
} else {
recordLocationPath = Optional.absent();
}
return new Tuple2<>(keyPathTuple._2._1, recordLocationPath);
}
});
});
}
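Two different Optional types meet in fetchRecordLocation: leftOuterJoin yields org.apache.spark.api.java.Optional, while the method's public signature uses Guava's com.google.common.base.Optional, so the lambda bridges the two by hand. A hypothetical helper capturing that conversion:

import com.google.common.base.Optional;

// hypothetical bridge from Spark's Java Optional to Guava's Optional
static <T> Optional<T> toGuava(org.apache.spark.api.java.Optional<T> sparkOpt) {
  return sparkOpt.isPresent() ? Optional.of(sparkOpt.get()) : Optional.<T>absent();
}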
/**
* Lookup the location for each record key and return the pair<record_key, location> for all
* record keys already present, dropping the record keys that are not present
*
* @param partitionRecordKeyPairRDD
* @param hoodieTable
* @return
*/
private JavaPairRDD<String, String> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final HoodieTable<T> hoodieTable) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final HoodieTable<T> hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
JavaPairRDD<String, String> partitionFilePairRDD =
loadInvolvedFiles(affectedPartitionPathList, hoodieTable);
loadInvolvedFiles(affectedPartitionPathList, hoodieTable);
Map<String, Long> filesPerPartition = partitionFilePairRDD.countByKey();
// Compute total subpartitions, to split partitions into.
Map<String, Long> subpartitionCountMap =
computeSubPartitions(recordsPerPartition, filesPerPartition);
computeSubPartitions(recordsPerPartition, filesPerPartition);
// Step 3: Obtain an RDD that, for each incoming record that already exists, yields the file id that contains it.
return findMatchingFilesForRecordKeys(partitionFilePairRDD, partitionRecordKeyPairRDD,
subpartitionCountMap);
subpartitionCountMap);
}
/**
* The index lookup can be skewed in three dimensions : #files, #partitions, #records
*
* To be able to smoothly handle skews, we need to compute how to split each partitions
* into subpartitions. We do it here, in a way that keeps the amount of each Spark join
* partition to < 2GB.
*
* @param recordsPerPartition
* @param filesPerPartition
* @return
* To be able to smoothly handle skews, we need to compute how to split each partition into
* subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to
* < 2GB.
*/
private Map<String, Long> computeSubPartitions(Map<String, Long> recordsPerPartition, Map<String, Long> filesPerPartition) {
Map<String, Long> subpartitionCountMap = new HashMap<>();
@@ -180,11 +149,11 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
long totalFiles = 0;
for (String partitionPath : recordsPerPartition.keySet()) {
long numRecords = (Long) recordsPerPartition.get(partitionPath);
long numFiles = filesPerPartition.containsKey(partitionPath) ? (Long) filesPerPartition.get(partitionPath) : 1L;
long numRecords = recordsPerPartition.get(partitionPath);
long numFiles = filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 1L;
subpartitionCountMap.put(partitionPath, ((numFiles * numRecords) / MAX_ITEMS_PER_JOIN_PARTITION) + 1);
totalFiles += filesPerPartition.containsKey(partitionPath) ? (Long) filesPerPartition.get(partitionPath) : 0L;
totalFiles += filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 0L;
totalRecords += numRecords;
}
logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles + ", TotalAffectedPartitions:" + recordsPerPartition.size());
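To make the sizing formula above concrete: subpartitions = (numFiles * numRecords) / MAX_ITEMS_PER_JOIN_PARTITION + 1, where the constant's value is not visible in this hunk. A worked example with an assumed value, for illustration only:

long MAX_ITEMS_PER_JOIN_PARTITION = 10_000_000L; // assumed; the real constant lives elsewhere
long numFiles = 5L;
long numRecords = 4_000_000L;
// every record key must be checked against every file in the partition, so the
// join for this partition carries 5 * 4,000,000 = 20,000,000 items
long subpartitions = ((numFiles * numRecords) / MAX_ITEMS_PER_JOIN_PARTITION) + 1; // == 3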
@@ -198,12 +167,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
@VisibleForTesting
Map<String, Iterable<String>> getPartitionToRowKeys(JavaRDD<HoodieRecord<T>> recordRDD) {
// Have to wrap the map into a hashmap because of the need to broadcast (see: http://php.sabscape.com/blog/?p=671)
return recordRDD.mapToPair(new PairFunction<HoodieRecord<T>, String, String>() {
@Override
public Tuple2<String, String> call(HoodieRecord record) {
return new Tuple2<>(record.getPartitionPath(), record.getRecordKey());
}
}).groupByKey().collectAsMap();
return recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()))
.groupByKey().collectAsMap();
}
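The comment above (and the linked post) refers to a well-known Spark gotcha: collectAsMap() can hand back a Scala map wrapper that does not serialize cleanly, so the result should be copied into a concrete java.util.HashMap before being broadcast. A short sketch, with a hypothetical pairRDD and jsc:

import java.util.HashMap;
import java.util.Map;

Map<String, Iterable<String>> collected = pairRDD.groupByKey().collectAsMap();
// copy into a plain, serializable HashMap before broadcasting
Map<String, Iterable<String>> broadcastable = new HashMap<>(collected);
// Broadcast<Map<String, Iterable<String>>> bc = jsc.broadcast(broadcastable);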
/**
@@ -211,25 +176,22 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
*/
@VisibleForTesting
JavaPairRDD<String, String> loadInvolvedFiles(List<String> partitions,
final HoodieTable<T> hoodieTable) {
final HoodieTable<T> hoodieTable) {
return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
@Override
public Iterator<Tuple2<String, String>> call(String partitionPath) {
.flatMapToPair(partitionPath -> {
java.util.Optional<HoodieInstant> latestCommitTime =
hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant();
hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant();
List<Tuple2<String, String>> list = new ArrayList<>();
if (latestCommitTime.isPresent()) {
List<HoodieDataFile> filteredFiles =
hoodieTable.getFileSystemView().getLatestVersionInPartition(partitionPath,
latestCommitTime.get().getTimestamp()).collect(Collectors.toList());
hoodieTable.getFileSystemView().getLatestVersionInPartition(partitionPath,
latestCommitTime.get().getTimestamp()).collect(Collectors.toList());
for (HoodieDataFile file : filteredFiles) {
list.add(new Tuple2<>(partitionPath, file.getFileName()));
}
}
return list.iterator();
}
});
});
}
@@ -241,58 +203,38 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
/**
* When we subpartition records going into a partition, we still need to check them against
* all the files within the partition. Thus, we need to explode the (partition, file) pairs
* to (partition_subpartnum, file), so we can later join.
*
*
* @param partitionFilePairRDD
* @param subpartitionCountMap
* @return
* When we subpartition records going into a partition, we still need to check them against all
* the files within the partition. Thus, we need to explode the (partition, file) pairs to
* (partition_subpartnum, file), so we can later join.
*/
private JavaPairRDD<String, String> explodePartitionFilePairRDD(JavaPairRDD<String, String> partitionFilePairRDD,
final Map<String, Long> subpartitionCountMap) {
return partitionFilePairRDD
.map(new Function<Tuple2<String, String>, List<Tuple2<String, String>>>() {
@Override
public List<Tuple2<String, String>> call(Tuple2<String, String> partitionFilePair) throws Exception {
List<Tuple2<String, String>> explodedPartitionFilePairs = new ArrayList<>();
for (long l = 0; l < subpartitionCountMap.get(partitionFilePair._1); l++) {
explodedPartitionFilePairs.add(new Tuple2<>(
String.format("%s#%d", partitionFilePair._1, l),
partitionFilePair._2));
}
return explodedPartitionFilePairs;
.map(partitionFilePair -> {
List<Tuple2<String, String>> explodedPartitionFilePairs = new ArrayList<>();
for (long l = 0; l < subpartitionCountMap.get(partitionFilePair._1); l++) {
explodedPartitionFilePairs.add(new Tuple2<>(
String.format("%s#%d", partitionFilePair._1, l),
partitionFilePair._2));
}
return explodedPartitionFilePairs;
})
.flatMapToPair(new PairFlatMapFunction<List<Tuple2<String, String>>, String, String>() {
@Override
public Iterator<Tuple2<String, String>> call(List<Tuple2<String, String>> exploded) throws Exception {
return exploded.iterator();
}
});
.flatMapToPair(exploded -> exploded.iterator());
}
/**
* To handle tons of incoming records to a partition, we need to split them into groups or create subpartitions.
* Here, we do a simple hash mod splitting, based on computed sub partitions.
*
* @param partitionRecordKeyPairRDD
* @param subpartitionCountMap
* @return
* To handle tons of incoming records to a partition, we need to split them into groups or
* create subpartitions. Here, we do a simple hash mod splitting, based on computed sub
* partitions.
*/
private JavaPairRDD<String, String> splitPartitionRecordKeysPairRDD(JavaPairRDD<String, String> partitionRecordKeyPairRDD,
final Map<String, Long> subpartitionCountMap) {
return partitionRecordKeyPairRDD
.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(Tuple2<String, String> partitionRecordKeyPair) throws Exception {
long subpart = Math.abs(partitionRecordKeyPair._2.hashCode()) % subpartitionCountMap.get(partitionRecordKeyPair._1);
return new Tuple2<>(
String.format("%s#%d", partitionRecordKeyPair._1, subpart),
partitionRecordKeyPair._2);
}
.mapToPair(partitionRecordKeyPair -> {
long subpart = Math.abs(partitionRecordKeyPair._2.hashCode()) % subpartitionCountMap.get(partitionRecordKeyPair._1);
return new Tuple2<>(
String.format("%s#%d", partitionRecordKeyPair._1, subpart),
partitionRecordKeyPair._2);
});
}
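The hash-mod split assigns each record key a stable subpartition number, so the exploded (partition#subpart, file) pairs and these (partition#subpart, recordKey) pairs meet on matching join keys. A self-contained illustration with made-up values:

long subpartitionCount = 3L;   // e.g. looked up from subpartitionCountMap
String recordKey = "uuid-000042";
long subpart = Math.abs(recordKey.hashCode()) % subpartitionCount;
String joinKey = String.format("%s#%d", "2017/02/27", subpart); // e.g. "2017/02/27#1"
// one caveat of this idiom: Math.abs(Integer.MIN_VALUE) is still negative, so a
// key whose hashCode() is exactly Integer.MIN_VALUE yields a negative subpart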
@@ -300,17 +242,12 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
/**
* It's crucial to pick the right parallelism.
*
* totalSubPartitions : this is deemed safe limit, to be nice with Spark.
* inputParallelism : typically number of input files.
* totalSubPartitions: this is the deemed-safe limit, to be nice with Spark.
* inputParallelism: typically the number of input files.
*
* We pick the max such that, we are always safe, but go higher if say a there are
* a lot of input files. (otherwise, we will fallback to number of partitions in input and
* end up with slow performance)
*
*
* @param inputParallelism
* @param subpartitionCountMap
* @return
* We pick the max so that we are always safe, but go higher if, say, there are a lot of
* input files (otherwise we would fall back to the number of partitions in the input and
* end up with slow performance).
*/
private int determineParallelism(int inputParallelism, final Map<String, Long> subpartitionCountMap) {
// size the join parallelism to max(total number of sub partitions, total number of files).
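The body of determineParallelism is cut off at the hunk boundary; from its javadoc and the comment above, a plausible reconstruction (assumed, not verbatim from the file) is:

// assumed sketch: take the max of total subpartitions and the input parallelism
private int determineParallelism(int inputParallelism, Map<String, Long> subpartitionCountMap) {
  long totalSubpartitions = 0L;
  for (Long count : subpartitionCountMap.values()) {
    totalSubpartitions += count;
  }
  return (int) Math.max(totalSubpartitions, (long) inputParallelism);
}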
@@ -329,9 +266,10 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
/**
* Find the <RowKey, filename> pairs. All workload is grouped at the file level.
*
* // Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition such that
// each RDD partition is a file, then for each file, we do (1) load bloom filter, (2) load rowKeys, (3) Tag rowKey
// Make sure the parallelism is atleast the groupby parallelism for tagging location
* Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File), then repartition
* such that each RDD partition is a file; then, for each file, (1) load the bloom filter,
* (2) load the rowKeys, (3) tag each rowKey. Make sure the parallelism is at least the
* groupBy parallelism for tagging location.
*/
private JavaPairRDD<String, String> findMatchingFilesForRecordKeys(JavaPairRDD<String, String> partitionFilePairRDD,
JavaPairRDD<String, String> partitionRecordKeyPairRDD,
@@ -344,50 +282,35 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), subpartitionCountMap);
// Perform a join, to bring all the files in each subpartition together with the record keys to be tested against them
JavaPairRDD<String, Tuple2<String, String>> joinedTripletRDD = subpartitionFilePairRDD.join(subpartitionRecordKeyPairRDD, joinParallelism);
JavaPairRDD<String, Tuple2<String, String>> joinedTripletRDD = subpartitionFilePairRDD
.join(subpartitionRecordKeyPairRDD, joinParallelism);
// sort further based on filename, such that all checking for the file can happen within a single partition, on-the-fly
JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD = joinedTripletRDD
.mapToPair(new PairFunction<Tuple2<String, Tuple2<String, String>>, String, Tuple2<String, HoodieKey>>() {
@Override
/**
* Incoming triplet is (partitionPath_subpart) => (file, recordKey)
*/
public Tuple2<String, Tuple2<String, HoodieKey>> call(Tuple2<String, Tuple2<String, String>> joinedTriplet) throws Exception {
String partitionPath = joinedTriplet._1.split("#")[0]; // throw away the subpart
String fileName = joinedTriplet._2._1;
String recordKey = joinedTriplet._2._2;
/**
* Incoming triplet is (partitionPath_subpart) => (file, recordKey)
*/
.mapToPair(joinedTriplet -> {
String partitionPath = joinedTriplet._1.split("#")[0]; // throw away the subpart
String fileName = joinedTriplet._2._1;
String recordKey = joinedTriplet._2._2;
// make a sort key as <file>#<recordKey>, to handle skews
return new Tuple2<>(String.format("%s#%s", fileName, recordKey),
new Tuple2<>(fileName, new HoodieKey(recordKey, partitionPath)));
}
// make a sort key as <file>#<recordKey>, to handle skews
return new Tuple2<>(String.format("%s#%s", fileName, recordKey),
new Tuple2<>(fileName, new HoodieKey(recordKey, partitionPath)));
}).sortByKey(true, joinParallelism);
return fileSortedTripletRDD
.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true)
.flatMap(new FlatMapFunction<List<IndexLookupResult>, IndexLookupResult>() {
@Override
public Iterator<IndexLookupResult> call(List<IndexLookupResult> indexLookupResults)
throws Exception {
return indexLookupResults.iterator();
}
}).filter(new Function<IndexLookupResult, Boolean>() {
@Override
public Boolean call(IndexLookupResult lookupResult) throws Exception {
return lookupResult.getMatchingRecordKeys().size() > 0;
}
}).flatMapToPair(new PairFlatMapFunction<IndexLookupResult, String, String>() {
@Override
public Iterator<Tuple2<String, String>> call(IndexLookupResult lookupResult)
throws Exception {
.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true)
.flatMap(indexLookupResults -> indexLookupResults.iterator())
.filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> {
List<Tuple2<String, String>> vals = new ArrayList<>();
for (String recordKey : lookupResult.getMatchingRecordKeys()) {
vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
}
return vals.iterator();
}
});
});
}
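One further Java 8 shorthand available to the flatMap calls in this method (and the equivalent ones in HoodieWriteClient): a lambda of the shape x -> x.iterator() can be written as the method reference List::iterator, since FlatMapFunction in the Spark version used here returns an Iterator. A sketch, assuming a hypothetical JavaRDD<List<IndexLookupResult>> named nestedResults:

// equivalent to: nestedResults.flatMap(indexLookupResults -> indexLookupResults.iterator())
JavaRDD<IndexLookupResult> flattened = nestedResults.flatMap(List::iterator);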
/**
@@ -395,30 +318,23 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
*/
private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(JavaPairRDD<String, String> rowKeyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD.mapToPair(
new PairFunction<HoodieRecord<T>, String, HoodieRecord<T>>() {
@Override
public Tuple2<String, HoodieRecord<T>> call(HoodieRecord<T> record) throws Exception {
return new Tuple2<>(record.getRecordKey(), record);
}
});
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
// Since the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), we do a left outer join.
return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(
new Function<Tuple2<HoodieRecord<T>, org.apache.spark.api.java.Optional<String>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(Tuple2<HoodieRecord<T>, org.apache.spark.api.java.Optional<String>> v1) throws Exception {
HoodieRecord<T> record = v1._1();
if (v1._2().isPresent()) {
String filename = v1._2().get();
if (filename != null && !filename.isEmpty()) {
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename)));
}
v1 -> {
HoodieRecord<T> record = v1._1();
if (v1._2().isPresent()) {
String filename = v1._2().get();
if (filename != null && !filename.isEmpty()) {
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename)));
}
return record;
}
});
return record;
}
);
}
@Override