1
0

Moving to Spark 2.1.0

This commit is contained in:
Prasanna Rajaperumal
2017-02-20 16:47:52 -08:00
parent be1dd9444f
commit 0e234ac0ef
6 changed files with 35 additions and 33 deletions

View File

@@ -40,7 +40,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
@@ -122,7 +122,7 @@ public class HoodieReadClient implements Serializable {
*
* @return a dataframe
*/
public DataFrame read(JavaRDD<HoodieKey> hoodieKeys, int parallelism)
public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism)
throws Exception {
assertSqlContext();
@@ -145,7 +145,7 @@ public class HoodieReadClient implements Serializable {
// record locations might be same for multiple keys, so need a unique list
Set<String> uniquePaths = new HashSet<>(paths);
DataFrame originalDF = sqlContextOpt.get().read()
Dataset<Row> originalDF = sqlContextOpt.get().read()
.parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
StructType schema = originalDF.schema();
JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD()
@@ -174,7 +174,7 @@ public class HoodieReadClient implements Serializable {
/**
* Reads the paths under the a hoodie dataset out as a DataFrame
*/
public DataFrame read(String... paths) {
public Dataset<Row> read(String... paths) {
assertSqlContext();
List<String> filteredPaths = new ArrayList<>();
try {
@@ -203,7 +203,7 @@ public class HoodieReadClient implements Serializable {
* If you made a prior call to {@link HoodieReadClient#latestCommit()}, it gives you all data in
* the time window (commitTimestamp, latestCommit)
*/
public DataFrame readSince(String lastCommitTimestamp) {
public Dataset<Row> readSince(String lastCommitTimestamp) {
List<String> commitsToReturn = metadata.findCommitsAfter(lastCommitTimestamp, Integer.MAX_VALUE);
//TODO: we can potentially trim this down to only affected partitions, using CommitMetadata
@@ -227,7 +227,7 @@ public class HoodieReadClient implements Serializable {
/**
* Obtain
*/
public DataFrame readCommit(String commitTime) {
public Dataset<Row> readCommit(String commitTime) {
assertSqlContext();
HoodieCommits commits = metadata.getAllCommits();
if (!commits.contains(commitTime)) {

View File

@@ -283,9 +283,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
public Iterator<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses;
return writeStatuses.iterator();
}
});
@@ -332,9 +332,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, metadata),
true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
public Iterator<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses;
return writeStatuses.iterator();
}
});

View File

@@ -115,10 +115,10 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
});
return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair(
new PairFunction<Tuple2<String, Tuple2<HoodieKey, Optional<String>>>, HoodieKey, Optional<String>>() {
new PairFunction<Tuple2<String, Tuple2<HoodieKey, org.apache.spark.api.java.Optional<String>>>, HoodieKey, Optional<String>>() {
@Override
public Tuple2<HoodieKey, Optional<String>> call(
Tuple2<String, Tuple2<HoodieKey, Optional<String>>> keyPathTuple)
Tuple2<String, Tuple2<HoodieKey, org.apache.spark.api.java.Optional<String>>> keyPathTuple)
throws Exception {
Optional<String> recordLocationPath;
if (keyPathTuple._2._2.isPresent()) {
@@ -146,13 +146,13 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
private JavaPairRDD<String, String> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final HoodieTableMetadata metadata) {
// Obtain records per partition, in the incoming records
Map<String, Object> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
JavaPairRDD<String, String> partitionFilePairRDD =
loadInvolvedFiles(affectedPartitionPathList, metadata);
Map<String, Object> filesPerPartition = partitionFilePairRDD.countByKey();
Map<String, Long> filesPerPartition = partitionFilePairRDD.countByKey();
// Compute total subpartitions, to split partitions into.
Map<String, Long> subpartitionCountMap =
@@ -174,7 +174,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
* @param filesPerPartition
* @return
*/
private Map<String, Long> computeSubPartitions(Map<String, Object> recordsPerPartition, Map<String, Object> filesPerPartition) {
private Map<String, Long> computeSubPartitions(Map<String, Long> recordsPerPartition, Map<String, Long> filesPerPartition) {
Map<String, Long> subpartitionCountMap = new HashMap<>();
long totalRecords = 0;
long totalFiles = 0;
@@ -214,7 +214,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(String partitionPath) {
public Iterator<Tuple2<String, String>> call(String partitionPath) {
FileSystem fs = FSUtils.getFs();
String latestCommitTime = metadata.getAllCommits().lastCommit();
FileStatus[] filteredStatus = metadata.getLatestVersionInPartition(fs, partitionPath, latestCommitTime);
@@ -222,7 +222,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
for (FileStatus fileStatus : filteredStatus) {
list.add(new Tuple2<>(partitionPath, fileStatus.getPath().getName()));
}
return list;
return list.iterator();
}
});
}
@@ -261,8 +261,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
})
.flatMapToPair(new PairFlatMapFunction<List<Tuple2<String, String>>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(List<Tuple2<String, String>> exploded) throws Exception {
return exploded;
public Iterator<Tuple2<String, String>> call(List<Tuple2<String, String>> exploded) throws Exception {
return exploded.iterator();
}
});
@@ -362,9 +362,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true)
.flatMap(new FlatMapFunction<List<IndexLookupResult>, IndexLookupResult>() {
@Override
public Iterable<IndexLookupResult> call(List<IndexLookupResult> indexLookupResults)
public Iterator<IndexLookupResult> call(List<IndexLookupResult> indexLookupResults)
throws Exception {
return indexLookupResults;
return indexLookupResults.iterator();
}
}).filter(new Function<IndexLookupResult, Boolean>() {
@Override
@@ -373,13 +373,13 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
}
}).flatMapToPair(new PairFlatMapFunction<IndexLookupResult, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(IndexLookupResult lookupResult)
public Iterator<Tuple2<String, String>> call(IndexLookupResult lookupResult)
throws Exception {
List<Tuple2<String, String>> vals = new ArrayList<>();
for (String recordKey : lookupResult.getMatchingRecordKeys()) {
vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
}
return vals;
return vals.iterator();
}
});
}
@@ -399,9 +399,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(
new Function<Tuple2<HoodieRecord<T>, Optional<String>>, HoodieRecord<T>>() {
new Function<Tuple2<HoodieRecord<T>, org.apache.spark.api.java.Optional<String>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(Tuple2<HoodieRecord<T>, Optional<String>> v1) throws Exception {
public HoodieRecord<T> call(Tuple2<HoodieRecord<T>, org.apache.spark.api.java.Optional<String>> v1) throws Exception {
HoodieRecord<T> record = v1._1();
if (v1._2().isPresent()) {
String filename = v1._2().get();

View File

@@ -63,7 +63,7 @@ public class WorkloadProfile<T extends HoodieRecordPayload> implements Serializa
private void buildProfile() {
Map<Tuple2<String, Option<HoodieRecordLocation>>, Object> partitionLocationCounts =
Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts =
taggedRecords.mapToPair(new PairFunction<HoodieRecord<T>, Tuple2<String, Option<HoodieRecordLocation>>, HoodieRecord<T>>() {
@Override
public Tuple2<Tuple2<String, Option<HoodieRecordLocation>>, HoodieRecord<T>> call(HoodieRecord<T> record) throws Exception {
@@ -71,9 +71,9 @@ public class WorkloadProfile<T extends HoodieRecordPayload> implements Serializa
}
}).countByKey();
for (Map.Entry<Tuple2<String, Option<HoodieRecordLocation>>, Object> e: partitionLocationCounts.entrySet()) {
for (Map.Entry<Tuple2<String, Option<HoodieRecordLocation>>, Long> e: partitionLocationCounts.entrySet()) {
String partitionPath = e.getKey()._1();
Long count = (Long) e.getValue();
Long count = e.getValue();
Option<HoodieRecordLocation> locOption = e.getKey()._2();
if (!partitionPathStatMap.containsKey(partitionPath)){

View File

@@ -38,6 +38,7 @@ import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
@@ -75,14 +76,14 @@ public class HoodieSnapshotCopier implements Serializable {
jsc.parallelize(partitions, partitions.size()).flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
@Override
public Iterable<Tuple2<String, String>> call(String partition) throws Exception {
public Iterator<Tuple2<String, String>> call(String partition) throws Exception {
// Only take latest version files <= latestCommit.
FileSystem fs = FSUtils.getFs();
List<Tuple2<String, String>> filePaths = new ArrayList<>();
for (FileStatus fileStatus : tableMetadata.getLatestVersionInPartition(fs, partition, latestCommit)) {
filePaths.add(new Tuple2<>(partition, fileStatus.getPath().toString()));
}
return filePaths;
return filePaths.iterator();
}
}).foreach(new VoidFunction<Tuple2<String, String>>() {
@Override

View File

@@ -89,7 +89,7 @@
<maven-dependency-plugin.version>2.10</maven-dependency-plugin.version>
<maven-jar-plugin.version>2.6</maven-jar-plugin.version>
<maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
<parquet.version>1.7.0</parquet.version>
<parquet.version>1.8.1</parquet.version>
<junit.version>4.11</junit.version>
<mockito.version>1.9.5</mockito.version>
<log4j.version>1.2.17</log4j.version>
@@ -97,7 +97,7 @@
<hadoop.version>2.6.0</hadoop.version>
<hive.version>1.1.0</hive.version>
<metrics.version>3.1.1</metrics.version>
<spark.version>1.5.1</spark.version>
<spark.version>2.1.0</spark.version>
</properties>
<scm>
@@ -192,6 +192,7 @@
<exclude>**/.*</exclude>
<exclude>**/*.txt</exclude>
<exclude>**/*.sh</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/test/resources/*.avro</exclude>
<exclude>**/test/resources/*.schema</exclude>
<exclude>**/test/resources/*.csv</exclude>
@@ -316,7 +317,7 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>1.7.0</version>
<version>1.8.1</version>
</dependency>
<dependency>