diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index 157aa75c1..c2a4500ea 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -40,7 +40,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.StructType; @@ -122,7 +122,7 @@ public class HoodieReadClient implements Serializable { * * @return a dataframe */ - public DataFrame read(JavaRDD hoodieKeys, int parallelism) + public Dataset read(JavaRDD hoodieKeys, int parallelism) throws Exception { assertSqlContext(); @@ -145,7 +145,7 @@ public class HoodieReadClient implements Serializable { // record locations might be same for multiple keys, so need a unique list Set uniquePaths = new HashSet<>(paths); - DataFrame originalDF = sqlContextOpt.get().read() + Dataset originalDF = sqlContextOpt.get().read() .parquet(uniquePaths.toArray(new String[uniquePaths.size()])); StructType schema = originalDF.schema(); JavaPairRDD keyRowRDD = originalDF.javaRDD() @@ -174,7 +174,7 @@ public class HoodieReadClient implements Serializable { /** * Reads the paths under the a hoodie dataset out as a DataFrame */ - public DataFrame read(String... paths) { + public Dataset read(String... paths) { assertSqlContext(); List filteredPaths = new ArrayList<>(); try { @@ -203,7 +203,7 @@ public class HoodieReadClient implements Serializable { * If you made a prior call to {@link HoodieReadClient#latestCommit()}, it gives you all data in * the time window (commitTimestamp, latestCommit) */ - public DataFrame readSince(String lastCommitTimestamp) { + public Dataset readSince(String lastCommitTimestamp) { List commitsToReturn = metadata.findCommitsAfter(lastCommitTimestamp, Integer.MAX_VALUE); //TODO: we can potentially trim this down to only affected partitions, using CommitMetadata @@ -227,7 +227,7 @@ public class HoodieReadClient implements Serializable { /** * Obtain */ - public DataFrame readCommit(String commitTime) { + public Dataset readCommit(String commitTime) { assertSqlContext(); HoodieCommits commits = metadata.getAllCommits(); if (!commits.contains(commitTime)) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index d50f2e1de..f367ad27b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -283,9 +283,9 @@ public class HoodieWriteClient implements Seriali } }, true).flatMap(new FlatMapFunction, WriteStatus>() { @Override - public Iterable call(List writeStatuses) + public Iterator call(List writeStatuses) throws Exception { - return writeStatuses; + return writeStatuses.iterator(); } }); @@ -332,9 +332,9 @@ public class HoodieWriteClient implements Seriali .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, metadata), true).flatMap(new FlatMapFunction, WriteStatus>() { @Override - public Iterable call(List writeStatuses) + public Iterator call(List writeStatuses) throws Exception { - return writeStatuses; + return writeStatuses.iterator(); } }); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java index a20325da9..e5622d99e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java @@ -115,10 +115,10 @@ public class HoodieBloomIndex extends HoodieIndex }); return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair( - new PairFunction>>, HoodieKey, Optional>() { + new PairFunction>>, HoodieKey, Optional>() { @Override public Tuple2> call( - Tuple2>> keyPathTuple) + Tuple2>> keyPathTuple) throws Exception { Optional recordLocationPath; if (keyPathTuple._2._2.isPresent()) { @@ -146,13 +146,13 @@ public class HoodieBloomIndex extends HoodieIndex private JavaPairRDD lookupIndex( JavaPairRDD partitionRecordKeyPairRDD, final HoodieTableMetadata metadata) { // Obtain records per partition, in the incoming records - Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); + Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); // Step 2: Load all involved files as pairs JavaPairRDD partitionFilePairRDD = loadInvolvedFiles(affectedPartitionPathList, metadata); - Map filesPerPartition = partitionFilePairRDD.countByKey(); + Map filesPerPartition = partitionFilePairRDD.countByKey(); // Compute total subpartitions, to split partitions into. Map subpartitionCountMap = @@ -174,7 +174,7 @@ public class HoodieBloomIndex extends HoodieIndex * @param filesPerPartition * @return */ - private Map computeSubPartitions(Map recordsPerPartition, Map filesPerPartition) { + private Map computeSubPartitions(Map recordsPerPartition, Map filesPerPartition) { Map subpartitionCountMap = new HashMap<>(); long totalRecords = 0; long totalFiles = 0; @@ -214,7 +214,7 @@ public class HoodieBloomIndex extends HoodieIndex return jsc.parallelize(partitions, Math.max(partitions.size(), 1)) .flatMapToPair(new PairFlatMapFunction() { @Override - public Iterable> call(String partitionPath) { + public Iterator> call(String partitionPath) { FileSystem fs = FSUtils.getFs(); String latestCommitTime = metadata.getAllCommits().lastCommit(); FileStatus[] filteredStatus = metadata.getLatestVersionInPartition(fs, partitionPath, latestCommitTime); @@ -222,7 +222,7 @@ public class HoodieBloomIndex extends HoodieIndex for (FileStatus fileStatus : filteredStatus) { list.add(new Tuple2<>(partitionPath, fileStatus.getPath().getName())); } - return list; + return list.iterator(); } }); } @@ -261,8 +261,8 @@ public class HoodieBloomIndex extends HoodieIndex }) .flatMapToPair(new PairFlatMapFunction>, String, String>() { @Override - public Iterable> call(List> exploded) throws Exception { - return exploded; + public Iterator> call(List> exploded) throws Exception { + return exploded.iterator(); } }); @@ -362,9 +362,9 @@ public class HoodieBloomIndex extends HoodieIndex .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true) .flatMap(new FlatMapFunction, IndexLookupResult>() { @Override - public Iterable call(List indexLookupResults) + public Iterator call(List indexLookupResults) throws Exception { - return indexLookupResults; + return indexLookupResults.iterator(); } }).filter(new Function() { @Override @@ -373,13 +373,13 @@ public class HoodieBloomIndex extends HoodieIndex } }).flatMapToPair(new PairFlatMapFunction() { @Override - public Iterable> call(IndexLookupResult lookupResult) + public Iterator> call(IndexLookupResult lookupResult) throws Exception { List> vals = new ArrayList<>(); for (String recordKey : lookupResult.getMatchingRecordKeys()) { vals.add(new Tuple2<>(recordKey, lookupResult.getFileName())); } - return vals; + return vals.iterator(); } }); } @@ -399,9 +399,9 @@ public class HoodieBloomIndex extends HoodieIndex // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join. return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map( - new Function, Optional>, HoodieRecord>() { + new Function, org.apache.spark.api.java.Optional>, HoodieRecord>() { @Override - public HoodieRecord call(Tuple2, Optional> v1) throws Exception { + public HoodieRecord call(Tuple2, org.apache.spark.api.java.Optional> v1) throws Exception { HoodieRecord record = v1._1(); if (v1._2().isPresent()) { String filename = v1._2().get(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadProfile.java b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadProfile.java index 21ca4eaf2..7109f3065 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadProfile.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadProfile.java @@ -63,7 +63,7 @@ public class WorkloadProfile implements Serializa private void buildProfile() { - Map>, Object> partitionLocationCounts = + Map>, Long> partitionLocationCounts = taggedRecords.mapToPair(new PairFunction, Tuple2>, HoodieRecord>() { @Override public Tuple2>, HoodieRecord> call(HoodieRecord record) throws Exception { @@ -71,9 +71,9 @@ public class WorkloadProfile implements Serializa } }).countByKey(); - for (Map.Entry>, Object> e: partitionLocationCounts.entrySet()) { + for (Map.Entry>, Long> e: partitionLocationCounts.entrySet()) { String partitionPath = e.getKey()._1(); - Long count = (Long) e.getValue(); + Long count = e.getValue(); Option locOption = e.getKey()._2(); if (!partitionPathStatMap.containsKey(partitionPath)){ diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index 4aaa6607b..4196fe449 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -38,6 +38,7 @@ import scala.Tuple2; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; /** @@ -75,14 +76,14 @@ public class HoodieSnapshotCopier implements Serializable { jsc.parallelize(partitions, partitions.size()).flatMap(new FlatMapFunction>() { @Override - public Iterable> call(String partition) throws Exception { + public Iterator> call(String partition) throws Exception { // Only take latest version files <= latestCommit. FileSystem fs = FSUtils.getFs(); List> filePaths = new ArrayList<>(); for (FileStatus fileStatus : tableMetadata.getLatestVersionInPartition(fs, partition, latestCommit)) { filePaths.add(new Tuple2<>(partition, fileStatus.getPath().toString())); } - return filePaths; + return filePaths.iterator(); } }).foreach(new VoidFunction>() { @Override diff --git a/pom.xml b/pom.xml index 9d0356f86..4b44095d3 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ 2.10 2.6 2.19.1 - 1.7.0 + 1.8.1 4.11 1.9.5 1.2.17 @@ -97,7 +97,7 @@ 2.6.0 1.1.0 3.1.1 - 1.5.1 + 2.1.0 @@ -192,6 +192,7 @@ **/.* **/*.txt **/*.sh + **/dependency-reduced-pom.xml **/test/resources/*.avro **/test/resources/*.schema **/test/resources/*.csv @@ -316,7 +317,7 @@ org.apache.parquet parquet-hive-bundle - 1.7.0 + 1.8.1