From 236d1b0dec409efb9c7a9febd260b1060da6a9c1 Mon Sep 17 00:00:00 2001 From: Shen Hong Date: Fri, 11 Dec 2020 16:36:37 +0800 Subject: [PATCH] [HUDI-1439] Remove scala dependency from hudi-client-common (#2306) --- hudi-client/hudi-client-common/pom.xml | 7 ----- .../common/function/FunctionWrapper.java | 5 ++-- .../function/SerializablePairFunction.java | 4 +-- .../hudi/io/HoodieKeyLocationFetchHandle.java | 10 +++---- .../savepoint/SavepointActionExecutor.java | 4 +-- .../common/HoodieFlinkEngineContext.java | 4 +-- .../action/commit/UpsertPartitioner.java | 4 ++- .../rollback/ListingBasedRollbackHelper.java | 9 +++---- .../common/TestHoodieFlinkEngineContext.java | 5 ++-- .../common/HoodieJavaEngineContext.java | 4 +-- .../common/TestHoodieJavaEngineContext.java | 5 ++-- .../common/HoodieSparkEngineContext.java | 12 +++++++-- .../index/simple/SparkHoodieSimpleIndex.java | 3 ++- .../io/TestHoodieKeyLocationFetchHandle.java | 4 +-- style/checkstyle.xml | 5 ++++ style/import-control.xml | 26 +++++++++++++++++++ 16 files changed, 70 insertions(+), 41 deletions(-) create mode 100644 style/import-control.xml diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index 487a2e2b3..7f4123b92 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -31,13 +31,6 @@ jar - - - org.scala-lang - scala-library - ${scala.version} - - org.apache.hudi diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java index 4e91bd29d..ff88894c4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java @@ -18,14 +18,13 @@ package org.apache.hudi.client.common.function; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; -import scala.Tuple2; - /** * Function wrapper util class, which catches the exception thrown by input function and return a similar function * with no exception thrown. @@ -62,7 +61,7 @@ public class FunctionWrapper { }; } - public static Function> throwingMapToPairWrapper(SerializablePairFunction throwingPairFunction) { + public static Function> throwingMapToPairWrapper(SerializablePairFunction throwingPairFunction) { return v1 -> { try { return throwingPairFunction.call(v1); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/SerializablePairFunction.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/SerializablePairFunction.java index 155837b7f..d6a7d9134 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/SerializablePairFunction.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/SerializablePairFunction.java @@ -18,7 +18,7 @@ package org.apache.hudi.client.common.function; -import scala.Tuple2; +import org.apache.hudi.common.util.collection.Pair; import java.io.Serializable; @@ -27,5 +27,5 @@ import java.io.Serializable; */ @FunctionalInterface public interface SerializablePairFunction extends Serializable { - Tuple2 call(I t) throws Exception; + Pair call(I t) throws Exception; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java index 9194fc042..a5bc6b2fe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java @@ -29,9 +29,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.fs.Path; -import java.util.Iterator; - -import scala.Tuple2; +import java.util.stream.Stream; /** * {@link HoodieRecordLocation} fetch handle for all records from {@link HoodieBaseFile} of interest. @@ -48,10 +46,10 @@ public class HoodieKeyLocationFetchHandle> locations() { + public Stream> locations() { HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight(); return ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream() - .map(entry -> new Tuple2<>(entry, - new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()))).iterator(); + .map(entry -> Pair.of(entry, + new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()))); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java index 16fd9a481..90a96b914 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java @@ -31,13 +31,13 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import scala.Tuple2; import java.io.IOException; import java.util.List; @@ -96,7 +96,7 @@ public class SavepointActionExecutor ext TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView(); List latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime) .map(HoodieBaseFile::getFileName).collect(Collectors.toList()); - return new Tuple2<>(partitionPath, latestFiles); + return new ImmutablePair<>(partitionPath, latestFiles); }, null); HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap); // Nothing to save in the savepoint diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 74c921fd0..8f067dc04 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -33,7 +33,7 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; -import scala.Tuple2; +import org.apache.hudi.common.util.collection.Pair; import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper; import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper; @@ -76,7 +76,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext { @Override public Map mapToPair(List data, SerializablePairFunction func, Integer parallelism) { - return data.stream().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); + return data.stream().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 2bcd3b2a7..5d37a0a25 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.NumericUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -210,7 +211,8 @@ public class UpsertPartitioner> implements Part Map> partitionSmallFilesMap = new HashMap<>(); if (partitionPaths != null && partitionPaths.size() > 0) { context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions"); - partitionSmallFilesMap = context.mapToPair(partitionPaths, partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath)), 0); + partitionSmallFilesMap = context.mapToPair(partitionPaths, + partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0); } return partitionSmallFilesMap; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index 612635da8..e88be4e6d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; @@ -48,8 +49,6 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -import scala.Tuple2; - /** * Performs Rollback of Hoodie Tables. */ @@ -106,13 +105,13 @@ public class ListingBasedRollbackHelper implements Serializable { case DELETE_DATA_FILES_ONLY: { final Map filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete); - return new Tuple2<>(rollbackRequest.getPartitionPath(), + return new ImmutablePair<>(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) .withDeletedFileResults(filesToDeletedStatus).build()); } case DELETE_DATA_AND_LOG_FILES: { final Map filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete); - return new Tuple2<>(rollbackRequest.getPartitionPath(), + return new ImmutablePair<>(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) .withDeletedFileResults(filesToDeletedStatus).build()); } @@ -150,7 +149,7 @@ public class ListingBasedRollbackHelper implements Serializable { metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L ); - return new Tuple2<>(rollbackRequest.getPartitionPath(), + return new ImmutablePair<>(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); } diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java index 41a8b9619..05ec76208 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java @@ -20,6 +20,7 @@ package org.apache.hudi.client.common; import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,8 +31,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import scala.Tuple2; - /** * Unit test against HoodieFlinkEngineContext. */ @@ -85,7 +84,7 @@ public class TestHoodieFlinkEngineContext { Map resultMap = context.mapToPair(mapList, x -> { String[] splits = x.split("_"); - return Tuple2.apply(splits[0], splits[1]); + return new ImmutablePair<>(splits[0], splits[1]); }, 2); Assertions.assertEquals(resultMap.get("spark"), resultMap.get("flink")); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java index a04a18b19..a5cbdd1c1 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -25,7 +25,7 @@ import org.apache.hudi.client.common.function.SerializablePairFunction; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.util.Option; -import scala.Tuple2; +import org.apache.hudi.common.util.collection.Pair; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class HoodieJavaEngineContext extends HoodieEngineContext { @Override public Map mapToPair(List data, SerializablePairFunction func, Integer parallelism) { return data.stream().map(throwingMapToPairWrapper(func)).collect( - Collectors.toMap(Tuple2::_1, Tuple2::_2, (oldVal, newVal) -> newVal) + Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal) ); } diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java index b81c11b71..e67e78c01 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java @@ -20,11 +20,10 @@ package org.apache.hudi.client.common; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.DummyTaskContextSupplier; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Assertions; -import scala.Tuple2; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -77,7 +76,7 @@ public class TestHoodieJavaEngineContext { Map resultMap = context.mapToPair(mapList, x -> { String[] splits = x.split("_"); - return Tuple2.apply(splits[0], splits[1]); + return new ImmutablePair<>(splits[0], splits[1]); }, 2); Assertions.assertNotNull(resultMap.get("hudi")); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java index 0f17511b5..dc3dcd0c2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java @@ -24,10 +24,12 @@ import org.apache.hudi.client.common.function.SerializableConsumer; import org.apache.hudi.client.common.function.SerializableFunction; import org.apache.hudi.client.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import scala.Tuple2; import java.util.List; import java.util.Map; @@ -82,9 +84,15 @@ public class HoodieSparkEngineContext extends HoodieEngineContext { @Override public Map mapToPair(List data, SerializablePairFunction func, Integer parallelism) { if (Objects.nonNull(parallelism)) { - return javaSparkContext.parallelize(data, parallelism).mapToPair(func::call).collectAsMap(); + return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> { + Pair pair = func.call(input); + return new Tuple2(pair.getLeft(), pair.getRight()); + }).collectAsMap(); } else { - return javaSparkContext.parallelize(data).mapToPair(func::call).collectAsMap(); + return javaSparkContext.parallelize(data).mapToPair(input -> { + Pair pair = func.call(input); + return new Tuple2(pair.getLeft(), pair.getRight()); + }).collectAsMap(); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java index 3f167e2eb..8575b3f31 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java @@ -147,6 +147,7 @@ public class SparkHoodieSimpleIndex extends Spark JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism)); return jsc.parallelize(baseFiles, fetchParallelism) - .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations()); + .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile) + .locations().map(x -> Tuple2.apply(((Pair)x).getLeft(), ((Pair)x).getRight())).iterator()); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java index 3a7d468e7..ea13645fb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java @@ -95,9 +95,9 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness { for (Tuple2 entry : partitionPathFileIdPairs) { HoodieKeyLocationFetchHandle fetcherHandle = new HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(entry._1, entry._2)); - Iterator> result = fetcherHandle.locations(); + Iterator> result = fetcherHandle.locations().iterator(); List> actualList = new ArrayList<>(); - result.forEachRemaining(actualList::add); + result.forEachRemaining(x -> actualList.add(new Tuple2<>(x.getLeft(), x.getRight()))); assertEquals(expectedList.get(new Tuple2<>(entry._1, entry._2.getFileId())), actualList); } } diff --git a/style/checkstyle.xml b/style/checkstyle.xml index c816abe99..7dbce7973 100644 --- a/style/checkstyle.xml +++ b/style/checkstyle.xml @@ -304,5 +304,10 @@ + + + + + diff --git a/style/import-control.xml b/style/import-control.xml new file mode 100644 index 000000000..cd35792cb --- /dev/null +++ b/style/import-control.xml @@ -0,0 +1,26 @@ + + + + + + + + +