[HUDI-1439] Remove scala dependency from hudi-client-common (#2306)
hudi-client/hudi-client-common/pom.xml
@@ -31,13 +31,6 @@
   <packaging>jar</packaging>

   <dependencies>
-    <!-- Scala -->
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-
     <!-- Hoodie -->
     <dependency>
       <groupId>org.apache.hudi</groupId>

org/apache/hudi/client/common/function/FunctionWrapper.java
@@ -18,14 +18,13 @@

 package org.apache.hudi.client.common.function;

+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;

 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;

-import scala.Tuple2;
-
 /**
  * Function wrapper util class, which catches the exception thrown by input function and return a similar function
  * with no exception thrown.
@@ -62,7 +61,7 @@ public class FunctionWrapper {
     };
   }

-  public static <I, K, V> Function<I, Tuple2<K, V>> throwingMapToPairWrapper(SerializablePairFunction<I, K, V> throwingPairFunction) {
+  public static <I, K, V> Function<I, Pair<K, V>> throwingMapToPairWrapper(SerializablePairFunction<I, K, V> throwingPairFunction) {
     return v1 -> {
       try {
         return throwingPairFunction.call(v1);

org/apache/hudi/client/common/function/SerializablePairFunction.java
@@ -18,7 +18,7 @@

 package org.apache.hudi.client.common.function;

-import scala.Tuple2;
+import org.apache.hudi.common.util.collection.Pair;

 import java.io.Serializable;

@@ -27,5 +27,5 @@ import java.io.Serializable;
  */
 @FunctionalInterface
 public interface SerializablePairFunction<I, K, V> extends Serializable {
-  Tuple2<K, V> call(I t) throws Exception;
+  Pair<K, V> call(I t) throws Exception;
 }

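Note: with these two hunks, the wrapper and the pair interface compose without any Scala type on the classpath. A minimal usage sketch of the new Pair-based API (class name and sample data are illustrative, not part of this commit):

import org.apache.hudi.client.common.function.FunctionWrapper;
import org.apache.hudi.client.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.collection.Pair;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class PairWrapperSketch {
  public static void main(String[] args) {
    // A pair function that declares a checked exception; the wrapper
    // rethrows anything it throws as an unchecked HoodieException.
    SerializablePairFunction<String, String, Integer> toLengthPair =
        s -> Pair.of(s, s.length());

    Map<String, Integer> lengths = Arrays.asList("hudi", "flink", "spark").stream()
        .map(FunctionWrapper.throwingMapToPairWrapper(toLengthPair))
        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
    System.out.println(lengths); // e.g. {hudi=4, flink=5, spark=5}
  }
}
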
HoodieKeyLocationFetchHandle.java
@@ -29,9 +29,7 @@ import org.apache.hudi.table.HoodieTable;

 import org.apache.hadoop.fs.Path;

-import java.util.Iterator;
+import java.util.stream.Stream;

-import scala.Tuple2;
-
 /**
  * {@link HoodieRecordLocation} fetch handle for all records from {@link HoodieBaseFile} of interest.
@@ -48,10 +46,10 @@ public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O
     this.partitionPathBaseFilePair = partitionPathBaseFilePair;
   }

-  public Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> locations() {
+  public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
     return ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
-        .map(entry -> new Tuple2<>(entry,
-            new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()))).iterator();
+        .map(entry -> Pair.of(entry,
+            new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
   }
 }

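Note: returning a Stream of Hudi Pairs keeps the fetch handle engine-neutral; an engine that needs an Iterator of Scala tuples adapts at its own boundary. A hedged sketch of that adaptation, mirroring the SparkHoodieSimpleIndex change later in this diff (the helper class is illustrative):

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.collection.Pair;
import scala.Tuple2;

import java.util.Iterator;
import java.util.stream.Stream;

class LocationStreamAdapter {
  // Converts the engine-neutral Stream<Pair<...>> into the Iterator<Tuple2<...>>
  // shape that Spark's flatMapToPair expects.
  static Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> toSparkIterator(
      Stream<Pair<HoodieKey, HoodieRecordLocation>> locations) {
    return locations
        .map(p -> new Tuple2<>(p.getLeft(), p.getRight()))
        .iterator();
  }
}
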
SavepointActionExecutor.java
@@ -31,13 +31,13 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import scala.Tuple2;

 import java.io.IOException;
 import java.util.List;
@@ -96,7 +96,7 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
       TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
       List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
           .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-      return new Tuple2<>(partitionPath, latestFiles);
+      return new ImmutablePair<>(partitionPath, latestFiles);
     }, null);
     HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
     // Nothing to save in the savepoint

HoodieFlinkEngineContext.java
@@ -33,7 +33,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

-import scala.Tuple2;
+import org.apache.hudi.common.util.collection.Pair;

 import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper;
@@ -76,7 +76,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {

   @Override
   public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
-    return data.stream().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
+    return data.stream().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
   }

   @Override

UpsertPartitioner.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -210,7 +211,8 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Part
     Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
     if (partitionPaths != null && partitionPaths.size() > 0) {
       context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
-      partitionSmallFilesMap = context.mapToPair(partitionPaths, partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath)), 0);
+      partitionSmallFilesMap = context.mapToPair(partitionPaths,
+          partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0);
     }
     return partitionSmallFilesMap;
   }

ListingBasedRollbackHelper.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -48,8 +49,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;

-import scala.Tuple2;
-
 /**
  * Performs Rollback of Hoodie Tables.
  */
@@ -106,13 +105,13 @@ public class ListingBasedRollbackHelper implements Serializable {
       case DELETE_DATA_FILES_ONLY: {
        final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
            rollbackRequest.getPartitionPath(), doDelete);
-        return new Tuple2<>(rollbackRequest.getPartitionPath(),
+        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                .withDeletedFileResults(filesToDeletedStatus).build());
      }
      case DELETE_DATA_AND_LOG_FILES: {
        final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
-        return new Tuple2<>(rollbackRequest.getPartitionPath(),
+        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                .withDeletedFileResults(filesToDeletedStatus).build());
      }
@@ -150,7 +149,7 @@ public class ListingBasedRollbackHelper implements Serializable {
          metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
          1L
      );
-      return new Tuple2<>(rollbackRequest.getPartitionPath(),
+      return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
          HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
              .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
    }

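Note: the two spellings used across these hunks should be interchangeable, since Pair.of is the factory for the concrete ImmutablePair (mirroring Commons Lang). A small illustrative equivalence, assuming Hudi's collection util classes behave like their Commons Lang counterparts:

import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;

class PairEquivalenceSketch {
  static void demo() {
    // Direct construction, as in the rollback and savepoint executors ...
    Pair<String, Long> a = new ImmutablePair<>("2020/12/01", 1L);
    // ... and the factory form used elsewhere; both yield an immutable pair.
    Pair<String, Long> b = Pair.of("2020/12/01", 1L);
    assert a.equals(b);
  }
}
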
TestHoodieFlinkEngineContext.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.common;

 import org.apache.hudi.client.FlinkTaskContextSupplier;

+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -30,8 +31,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;

-import scala.Tuple2;
-
 /**
  * Unit test against HoodieFlinkEngineContext.
  */
@@ -85,7 +84,7 @@ public class TestHoodieFlinkEngineContext {

     Map<String, String> resultMap = context.mapToPair(mapList, x -> {
       String[] splits = x.split("_");
-      return Tuple2.apply(splits[0], splits[1]);
+      return new ImmutablePair<>(splits[0], splits[1]);
     }, 2);

     Assertions.assertEquals(resultMap.get("spark"), resultMap.get("flink"));

HoodieJavaEngineContext.java
@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.function.SerializablePairFunction;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.util.Option;

-import scala.Tuple2;
+import org.apache.hudi.common.util.collection.Pair;

 import java.util.List;
 import java.util.Map;
@@ -65,7 +65,7 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
   @Override
   public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
     return data.stream().map(throwingMapToPairWrapper(func)).collect(
-        Collectors.toMap(Tuple2::_1, Tuple2::_2, (oldVal, newVal) -> newVal)
+        Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal)
     );
   }

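Note the difference from the Flink context above: the three-argument Collectors.toMap keeps the newest value on key collisions, while the two-argument form throws IllegalStateException on a duplicate key. A quick illustrative sketch:

import org.apache.hudi.common.util.collection.Pair;

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ToMapMergeSketch {
  static void demo() {
    // With a merge function, the later pair wins on duplicate keys.
    Map<String, Integer> merged = Stream.of(Pair.of("a", 1), Pair.of("a", 2))
        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal));
    // merged.get("a") == 2; without the merge function this stream
    // would throw IllegalStateException on the duplicate key "a".
  }
}
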
TestHoodieJavaEngineContext.java
@@ -20,11 +20,10 @@ package org.apache.hudi.client.common;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.DummyTaskContextSupplier;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Assertions;

-import scala.Tuple2;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -77,7 +76,7 @@ public class TestHoodieJavaEngineContext {

     Map<String, String> resultMap = context.mapToPair(mapList, x -> {
       String[] splits = x.split("_");
-      return Tuple2.apply(splits[0], splits[1]);
+      return new ImmutablePair<>(splits[0], splits[1]);
     }, 2);

     Assertions.assertNotNull(resultMap.get("hudi"));

HoodieSparkEngineContext.java
@@ -24,10 +24,12 @@ import org.apache.hudi.client.common.function.SerializableConsumer;
 import org.apache.hudi.client.common.function.SerializableFunction;
 import org.apache.hudi.client.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;

 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import scala.Tuple2;

 import java.util.List;
 import java.util.Map;
@@ -82,9 +84,15 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
   @Override
   public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
     if (Objects.nonNull(parallelism)) {
-      return javaSparkContext.parallelize(data, parallelism).mapToPair(func::call).collectAsMap();
+      return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> {
+        Pair<K, V> pair = func.call(input);
+        return new Tuple2(pair.getLeft(), pair.getRight());
+      }).collectAsMap();
     } else {
-      return javaSparkContext.parallelize(data).mapToPair(func::call).collectAsMap();
+      return javaSparkContext.parallelize(data).mapToPair(input -> {
+        Pair<K, V> pair = func.call(input);
+        return new Tuple2(pair.getLeft(), pair.getRight());
+      }).collectAsMap();
     }
   }

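Note: Spark is the one engine that legitimately keeps its scala.Tuple2 import, because JavaPairRDD.mapToPair requires it; the engine-neutral Pair is unwrapped only at this boundary. A hedged, self-contained sketch of the same bridging pattern (local master and sample data are illustrative, not from this commit):

import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Map;

public class SparkPairBridgeSketch {
  public static void main(String[] args) {
    // Hypothetical local context for illustration only.
    try (JavaSparkContext jsc = new JavaSparkContext("local[2]", "pair-bridge")) {
      Map<String, String> result = jsc.parallelize(Arrays.asList("spark_rdd", "flink_stream"), 2)
          .mapToPair(input -> {
            // Engine-agnostic code produces a Hudi Pair ...
            String[] splits = input.split("_");
            Pair<String, String> pair = Pair.of(splits[0], splits[1]);
            // ... which is converted to scala.Tuple2 only at the Spark boundary.
            return new Tuple2<>(pair.getLeft(), pair.getRight());
          })
          .collectAsMap();
      System.out.println(result.get("spark")); // rdd
    }
  }
}
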
SparkHoodieSimpleIndex.java
@@ -147,6 +147,7 @@ public class SparkHoodieSimpleIndex<T extends HoodieRecordPayload> extends Spark
     JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
     int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
     return jsc.parallelize(baseFiles, fetchParallelism)
-        .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations());
+        .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile)
+            .locations().map(x -> Tuple2.apply(((Pair)x).getLeft(), ((Pair)x).getRight())).iterator());
   }
 }

TestHoodieKeyLocationFetchHandle.java
@@ -95,9 +95,9 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {

     for (Tuple2<String, HoodieBaseFile> entry : partitionPathFileIdPairs) {
       HoodieKeyLocationFetchHandle fetcherHandle = new HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(entry._1, entry._2));
-      Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> result = fetcherHandle.locations();
+      Iterator<Pair<HoodieKey, HoodieRecordLocation>> result = fetcherHandle.locations().iterator();
       List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new ArrayList<>();
-      result.forEachRemaining(actualList::add);
+      result.forEachRemaining(x -> actualList.add(new Tuple2<>(x.getLeft(), x.getRight())));
       assertEquals(expectedList.get(new Tuple2<>(entry._1, entry._2.getFileId())), actualList);
     }
   }

checkstyle.xml
@@ -304,5 +304,10 @@
     <!-- Checks for simple boolean expressions. -->
     <module name="SimplifyBooleanExpression"/>

+    <module name="ImportControl">
+      <property name="file" value="style/import-control.xml"/>
+      <property name="path" value="^.*[\\/]hudi-client[\\/]hudi-client-common[\\/]src[\\/].*$"/>
+    </module>
+
   </module>
 </module>

style/import-control.xml (new file, 26 lines)
@@ -0,0 +1,26 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE import-control PUBLIC
+    "-//Checkstyle//DTD ImportControl Configuration 1.4//EN"
+    "https://checkstyle.org/dtds/import_control_1_4.dtd">
+
+<import-control pkg="org" strategyOnMismatch="allowed">
+  <disallow pkg="scala"/>
+</import-control>
+
+
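
Note: with this rule wired into Checkstyle, any scala.* import under hudi-client-common sources now fails the build, which prevents the dependency from creeping back in. For illustration (hypothetical offending file; the exact violation wording is approximate):

package org.apache.hudi.client.common;

// Checkstyle would now reject this line in hudi-client-common sources,
// reporting something like "Disallowed import - scala.Tuple2."
import scala.Tuple2;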