diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index 487a2e2b3..7f4123b92 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -31,13 +31,6 @@
jar
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
org.apache.hudi
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java
index 4e91bd29d..ff88894c4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java
@@ -18,14 +18,13 @@
package org.apache.hudi.client.common.function;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
-import scala.Tuple2;
-
/**
* Function wrapper util class, which catches the exception thrown by input function and return a similar function
* with no exception thrown.
@@ -62,7 +61,7 @@ public class FunctionWrapper {
};
}
- public static <I, K, V> Function<I, Tuple2<K, V>> throwingMapToPairWrapper(SerializablePairFunction<I, K, V> throwingPairFunction) {
+ public static <I, K, V> Function<I, Pair<K, V>> throwingMapToPairWrapper(SerializablePairFunction<I, K, V> throwingPairFunction) {
return v1 -> {
try {
return throwingPairFunction.call(v1);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/SerializablePairFunction.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/SerializablePairFunction.java
index 155837b7f..d6a7d9134 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/SerializablePairFunction.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/SerializablePairFunction.java
@@ -18,7 +18,7 @@
package org.apache.hudi.client.common.function;
-import scala.Tuple2;
+import org.apache.hudi.common.util.collection.Pair;
import java.io.Serializable;
@@ -27,5 +27,5 @@ import java.io.Serializable;
*/
@FunctionalInterface
public interface SerializablePairFunction<I, K, V> extends Serializable {
- Tuple2<K, V> call(I t) throws Exception;
+ Pair<K, V> call(I t) throws Exception;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 9194fc042..a5bc6b2fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -29,9 +29,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
-import java.util.Iterator;
-
-import scala.Tuple2;
+import java.util.stream.Stream;
/**
* {@link HoodieRecordLocation} fetch handle for all records from {@link HoodieBaseFile} of interest.
@@ -48,10 +46,10 @@ public class HoodieKeyLocationFetchHandle
- public Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> locations() {
+ public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
return ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
- .map(entry -> new Tuple2<>(entry,
- new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()))).iterator();
+ .map(entry -> Pair.of(entry,
+ new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 16fd9a481..90a96b914 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -31,13 +31,13 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import scala.Tuple2;
import java.io.IOException;
import java.util.List;
@@ -96,7 +96,7 @@ public class SavepointActionExecutor ext
TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
.map(HoodieBaseFile::getFileName).collect(Collectors.toList());
- return new Tuple2<>(partitionPath, latestFiles);
+ return new ImmutablePair<>(partitionPath, latestFiles);
}, null);
HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
// Nothing to save in the savepoint
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 74c921fd0..8f067dc04 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -33,7 +33,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import scala.Tuple2;
+import org.apache.hudi.common.util.collection.Pair;
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper;
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper;
@@ -76,7 +76,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
@Override
public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
- return data.stream().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
+ return data.stream().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}
@Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 2bcd3b2a7..5d37a0a25 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -210,7 +211,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Part
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
if (partitionPaths != null && partitionPaths.size() > 0) {
context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
- partitionSmallFilesMap = context.mapToPair(partitionPaths, partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath)), 0);
+ partitionSmallFilesMap = context.mapToPair(partitionPaths,
+ partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0);
}
return partitionSmallFilesMap;
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index 612635da8..e88be4e6d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
@@ -48,8 +49,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
-import scala.Tuple2;
-
/**
* Performs Rollback of Hoodie Tables.
*/
@@ -106,13 +105,13 @@ public class ListingBasedRollbackHelper implements Serializable {
case DELETE_DATA_FILES_ONLY: {
final Map filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath(), doDelete);
- return new Tuple2<>(rollbackRequest.getPartitionPath(),
+ return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
case DELETE_DATA_AND_LOG_FILES: {
final Map filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
- return new Tuple2<>(rollbackRequest.getPartitionPath(),
+ return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withDeletedFileResults(filesToDeletedStatus).build());
}
@@ -150,7 +149,7 @@ public class ListingBasedRollbackHelper implements Serializable {
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L
);
- return new Tuple2<>(rollbackRequest.getPartitionPath(),
+ return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
}
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java
index 41a8b9619..05ec76208 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.common;
import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -30,8 +31,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import scala.Tuple2;
-
/**
* Unit test against HoodieFlinkEngineContext.
*/
@@ -85,7 +84,7 @@ public class TestHoodieFlinkEngineContext {
Map<String, String> resultMap = context.mapToPair(mapList, x -> {
String[] splits = x.split("_");
- return Tuple2.apply(splits[0], splits[1]);
+ return new ImmutablePair<>(splits[0], splits[1]);
}, 2);
Assertions.assertEquals(resultMap.get("spark"), resultMap.get("flink"));
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index a04a18b19..a5cbdd1c1 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.function.SerializablePairFunction;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.Option;
-import scala.Tuple2;
+import org.apache.hudi.common.util.collection.Pair;
import java.util.List;
import java.util.Map;
@@ -65,7 +65,7 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
@Override
public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
return data.stream().map(throwingMapToPairWrapper(func)).collect(
- Collectors.toMap(Tuple2::_1, Tuple2::_2, (oldVal, newVal) -> newVal)
+ Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal)
);
}
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java
index b81c11b71..e67e78c01 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java
@@ -20,11 +20,10 @@ package org.apache.hudi.client.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.DummyTaskContextSupplier;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
-import scala.Tuple2;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -77,7 +76,7 @@ public class TestHoodieJavaEngineContext {
Map<String, String> resultMap = context.mapToPair(mapList, x -> {
String[] splits = x.split("_");
- return Tuple2.apply(splits[0], splits[1]);
+ return new ImmutablePair<>(splits[0], splits[1]);
}, 2);
Assertions.assertNotNull(resultMap.get("hudi"));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 0f17511b5..dc3dcd0c2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -24,10 +24,12 @@ import org.apache.hudi.client.common.function.SerializableConsumer;
import org.apache.hudi.client.common.function.SerializableFunction;
import org.apache.hudi.client.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
+import scala.Tuple2;
import java.util.List;
import java.util.Map;
@@ -82,9 +84,15 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
@Override
public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
if (Objects.nonNull(parallelism)) {
- return javaSparkContext.parallelize(data, parallelism).mapToPair(func::call).collectAsMap();
+ return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> {
+ Pair<K, V> pair = func.call(input);
+ return new Tuple2(pair.getLeft(), pair.getRight());
+ }).collectAsMap();
} else {
- return javaSparkContext.parallelize(data).mapToPair(func::call).collectAsMap();
+ return javaSparkContext.parallelize(data).mapToPair(input -> {
+ Pair<K, V> pair = func.call(input);
+ return new Tuple2(pair.getLeft(), pair.getRight());
+ }).collectAsMap();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java
index 3f167e2eb..8575b3f31 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java
@@ -147,6 +147,7 @@ public class SparkHoodieSimpleIndex extends Spark
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
return jsc.parallelize(baseFiles, fetchParallelism)
- .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations());
+ .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile)
+ .locations().map(x -> Tuple2.apply(((Pair)x).getLeft(), ((Pair)x).getRight())).iterator());
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index 3a7d468e7..ea13645fb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -95,9 +95,9 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
for (Tuple2<String, HoodieBaseFile> entry : partitionPathFileIdPairs) {
HoodieKeyLocationFetchHandle fetcherHandle = new HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(entry._1, entry._2));
- Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> result = fetcherHandle.locations();
+ Iterator<Pair<HoodieKey, HoodieRecordLocation>> result = fetcherHandle.locations().iterator();
List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new ArrayList<>();
- result.forEachRemaining(actualList::add);
+ result.forEachRemaining(x -> actualList.add(new Tuple2<>(x.getLeft(), x.getRight())));
assertEquals(expectedList.get(new Tuple2<>(entry._1, entry._2.getFileId())), actualList);
}
}
diff --git a/style/checkstyle.xml b/style/checkstyle.xml
index c816abe99..7dbce7973 100644
--- a/style/checkstyle.xml
+++ b/style/checkstyle.xml
@@ -304,5 +304,10 @@
+
+
+
+
+
diff --git a/style/import-control.xml b/style/import-control.xml
new file mode 100644
index 000000000..cd35792cb
--- /dev/null
+++ b/style/import-control.xml
@@ -0,0 +1,26 @@
+
+
+
+
+
+
+
+
+