[HUDI-622]: Remove VisibleForTesting annotation and import from code (#1343)
* HUDI:622: Remove VisibleForTesting annotation and import from code
This commit is contained in:
@@ -37,7 +37,6 @@ import org.apache.hudi.metrics.HoodieMetrics;
|
|||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -108,7 +107,6 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
* @param startCleanTime Cleaner Instant Time
|
* @param startCleanTime Cleaner Instant Time
|
||||||
* @return Cleaner Plan if generated
|
* @return Cleaner Plan if generated
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
|
||||||
protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
|
protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
|
||||||
// Create a Hoodie table which encapsulated the commits and files visible
|
// Create a Hoodie table which encapsulated the commits and files visible
|
||||||
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
|
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
|
||||||
@@ -138,7 +136,6 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
* @param table Hoodie Table
|
* @param table Hoodie Table
|
||||||
* @param cleanInstant Cleaner Instant
|
* @param cleanInstant Cleaner Instant
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
|
||||||
protected HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanInstant) {
|
protected HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanInstant) {
|
||||||
try {
|
try {
|
||||||
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
|
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
|
||||||
|
|||||||
@@ -61,7 +61,6 @@ import org.apache.hudi.table.WorkloadProfile;
|
|||||||
import org.apache.hudi.table.WorkloadStat;
|
import org.apache.hudi.table.WorkloadStat;
|
||||||
|
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
@@ -121,7 +120,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
|
this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
|
HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
|
||||||
this(jsc, clientConfig, rollbackPending, index, Option.empty());
|
this(jsc, clientConfig, rollbackPending, index, Option.empty());
|
||||||
}
|
}
|
||||||
@@ -1113,7 +1111,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
|||||||
* @param inflightInstant Inflight Compaction Instant
|
* @param inflightInstant Inflight Compaction Instant
|
||||||
* @param table Hoodie Table
|
* @param table Hoodie Table
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
|
||||||
void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException {
|
void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException {
|
||||||
table.rollback(jsc, inflightInstant, false);
|
table.rollback(jsc, inflightInstant, false);
|
||||||
// Revert instant state file
|
// Revert instant state file
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ package org.apache.hudi.index.bloom;
|
|||||||
|
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.hash.Hashing;
|
import com.google.common.hash.Hashing;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -152,7 +151,6 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
|
|||||||
return candidatePartitions.get(idx);
|
return candidatePartitions.get(idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
Map<String, List<Integer>> getFileGroupToPartitions() {
|
Map<String, List<Integer>> getFileGroupToPartitions() {
|
||||||
return fileGroupToPartitions;
|
return fileGroupToPartitions;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,7 +32,6 @@ import org.apache.hudi.index.HoodieIndex;
|
|||||||
import org.apache.hudi.io.HoodieRangeInfoHandle;
|
import org.apache.hudi.io.HoodieRangeInfoHandle;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.Partitioner;
|
import org.apache.spark.Partitioner;
|
||||||
@@ -188,7 +187,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
/**
|
/**
|
||||||
* Load all involved files as <Partition, filename> pair RDD.
|
* Load all involved files as <Partition, filename> pair RDD.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
|
||||||
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
|
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
|
||||||
final HoodieTable hoodieTable) {
|
final HoodieTable hoodieTable) {
|
||||||
|
|
||||||
@@ -262,7 +260,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
|
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
|
||||||
* recordKey ranges in the index info.
|
* recordKey ranges in the index info.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
|
||||||
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
||||||
@@ -289,7 +286,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
* <p>
|
* <p>
|
||||||
* Make sure the parallelism is atleast the groupby parallelism for tagging location
|
* Make sure the parallelism is atleast the groupby parallelism for tagging location
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
|
||||||
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
|
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
|
||||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
|
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
|
||||||
|
|||||||
@@ -30,7 +30,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
@@ -59,7 +58,6 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
* Load all involved files as <Partition, filename> pair RDD from all partitions in the table.
|
* Load all involved files as <Partition, filename> pair RDD from all partitions in the table.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@VisibleForTesting
|
|
||||||
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
|
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
|
||||||
final HoodieTable hoodieTable) {
|
final HoodieTable hoodieTable) {
|
||||||
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
|
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
|
||||||
@@ -83,7 +81,6 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@VisibleForTesting
|
|
||||||
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
||||||
|
|||||||
@@ -36,7 +36,6 @@ import org.apache.hudi.exception.HoodieIndexException;
|
|||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
@@ -114,7 +113,6 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
|||||||
this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
|
this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {
|
public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {
|
||||||
try {
|
try {
|
||||||
LOG.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass());
|
LOG.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass());
|
||||||
@@ -387,7 +385,6 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public Tuple2<Long, Integer> getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) {
|
public Tuple2<Long, Integer> getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) {
|
||||||
final JavaPairRDD<Long, Integer> insertOnlyWriteStatusRDD = writeStatusRDD
|
final JavaPairRDD<Long, Integer> insertOnlyWriteStatusRDD = writeStatusRDD
|
||||||
.filter(w -> w.getStat().getNumInserts() > 0).mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1));
|
.filter(w -> w.getStat().getNumInserts() > 0).mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1));
|
||||||
@@ -497,7 +494,6 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public void setHbaseConnection(Connection hbaseConnection) {
|
public void setHbaseConnection(Connection hbaseConnection) {
|
||||||
HBaseIndex.hbaseConnection = hbaseConnection;
|
HBaseIndex.hbaseConnection = hbaseConnection;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,8 +22,6 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
|
|||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@@ -68,7 +66,6 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS
|
|||||||
.filter(e -> comparator.compare(earliestPartitionPathToCompact, e) >= 0).collect(Collectors.toList());
|
.filter(e -> comparator.compare(earliestPartitionPathToCompact, e) >= 0).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public static Date getDateAtOffsetFromToday(int offset) {
|
public static Date getDateAtOffsetFromToday(int offset) {
|
||||||
Calendar calendar = Calendar.getInstance();
|
Calendar calendar = Calendar.getInstance();
|
||||||
calendar.add(Calendar.DATE, offset);
|
calendar.add(Calendar.DATE, offset);
|
||||||
|
|||||||
@@ -23,8 +23,6 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@@ -55,7 +53,6 @@ public class DayBasedCompactionStrategy extends CompactionStrategy {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public Comparator<String> getComparator() {
|
public Comparator<String> getComparator() {
|
||||||
return comparator;
|
return comparator;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.HoodieTimeline;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -187,7 +186,6 @@ public class HoodieMetrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
String getMetricsName(String action, String metric) {
|
String getMetricsName(String action, String metric) {
|
||||||
return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
|
return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,7 +42,6 @@ import org.apache.hudi.func.MergeOnReadLazyInsertIterable;
|
|||||||
import org.apache.hudi.io.HoodieAppendHandle;
|
import org.apache.hudi.io.HoodieAppendHandle;
|
||||||
import org.apache.hudi.io.compact.HoodieMergeOnReadTableCompactor;
|
import org.apache.hudi.io.compact.HoodieMergeOnReadTableCompactor;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -409,7 +408,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO (NA) : Make this static part of utility
|
// TODO (NA) : Make this static part of utility
|
||||||
@VisibleForTesting
|
|
||||||
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
|
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
|
||||||
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
|
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
|
||||||
.filter(size -> size > 0).reduce(Long::sum).orElse(0L);
|
.filter(size -> size > 0).reduce(Long::sum).orElse(0L);
|
||||||
|
|||||||
@@ -145,8 +145,8 @@ abstract class InternalFilter implements Writable {
|
|||||||
if (keys == null) {
|
if (keys == null) {
|
||||||
throw new IllegalArgumentException("Key[] may not be null");
|
throw new IllegalArgumentException("Key[] may not be null");
|
||||||
}
|
}
|
||||||
for (int i = 0; i < keys.length; i++) {
|
for (Key key : keys) {
|
||||||
add(keys[i]);
|
add(key);
|
||||||
}
|
}
|
||||||
} //end add()
|
} //end add()
|
||||||
|
|
||||||
|
|||||||
@@ -169,7 +169,7 @@ public final class BufferedRandomAccessFile extends RandomAccessFile {
|
|||||||
private int fillBuffer() throws IOException {
|
private int fillBuffer() throws IOException {
|
||||||
int cnt = 0;
|
int cnt = 0;
|
||||||
int bytesToRead = this.capacity;
|
int bytesToRead = this.capacity;
|
||||||
/** blocking read, until buffer is filled or EOF reached */
|
// blocking read, until buffer is filled or EOF reached
|
||||||
while (bytesToRead > 0) {
|
while (bytesToRead > 0) {
|
||||||
int n = super.read(this.dataBuffer.array(), cnt, bytesToRead);
|
int n = super.read(this.dataBuffer.array(), cnt, bytesToRead);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
|
|||||||
@@ -28,7 +28,6 @@ import org.apache.hudi.exception.HoodieException;
|
|||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.exception.InvalidHoodiePathException;
|
import org.apache.hudi.exception.InvalidHoodiePathException;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
@@ -216,7 +215,6 @@ public class FSUtils {
|
|||||||
* @param excludeMetaFolder Exclude .hoodie folder
|
* @param excludeMetaFolder Exclude .hoodie folder
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
|
||||||
static void processFiles(FileSystem fs, String basePathStr, Function<FileStatus, Boolean> consumer,
|
static void processFiles(FileSystem fs, String basePathStr, Function<FileStatus, Boolean> consumer,
|
||||||
boolean excludeMetaFolder) throws IOException {
|
boolean excludeMetaFolder) throws IOException {
|
||||||
PathFilter pathFilter = excludeMetaFolder ? getExcludeMetaPathFilter() : ALLOW_ALL_FILTER;
|
PathFilter pathFilter = excludeMetaFolder ? getExcludeMetaPathFilter() : ALLOW_ALL_FILTER;
|
||||||
|
|||||||
@@ -16,7 +16,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.util;
|
package org.apache.hudi.common.util;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.cache.CacheBuilder;
|
import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.common.cache.CacheLoader;
|
import com.google.common.cache.CacheLoader;
|
||||||
@@ -252,7 +251,6 @@ public class ObjectSizeCalculator {
|
|||||||
size += objectSize;
|
size += objectSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static long roundTo(long x, int multiple) {
|
static long roundTo(long x, int multiple) {
|
||||||
return ((x + multiple - 1) / multiple) * multiple;
|
return ((x + multiple - 1) / multiple) * multiple;
|
||||||
}
|
}
|
||||||
@@ -325,7 +323,6 @@ public class ObjectSizeCalculator {
|
|||||||
throw new AssertionError("Encountered unexpected primitive type " + type.getName());
|
throw new AssertionError("Encountered unexpected primitive type " + type.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() {
|
static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() {
|
||||||
final String vmName = System.getProperty("java.vm.name");
|
final String vmName = System.getProperty("java.vm.name");
|
||||||
if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ") || vmName.startsWith("OpenJDK")
|
if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ") || vmName.startsWith("OpenJDK")
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ import org.apache.hudi.common.util.collection.Pair;
|
|||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -450,7 +449,6 @@ public class RocksDBDAO {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
String getRocksDBBasePath() {
|
String getRocksDBBasePath() {
|
||||||
return rocksDBBasePath;
|
return rocksDBBasePath;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.common.util.SizeEstimator;
|
import org.apache.hudi.common.util.SizeEstimator;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -62,7 +61,6 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
|||||||
// It indicates number of records to cache. We will be using sampled record's average size to
|
// It indicates number of records to cache. We will be using sampled record's average size to
|
||||||
// determine how many
|
// determine how many
|
||||||
// records we should cache and will change (increase/decrease) permits accordingly.
|
// records we should cache and will change (increase/decrease) permits accordingly.
|
||||||
@VisibleForTesting
|
|
||||||
public final Semaphore rateLimiter = new Semaphore(1);
|
public final Semaphore rateLimiter = new Semaphore(1);
|
||||||
// used for sampling records with "RECORD_SAMPLING_RATE" frequency.
|
// used for sampling records with "RECORD_SAMPLING_RATE" frequency.
|
||||||
public final AtomicLong samplingRecordCounter = new AtomicLong(-1);
|
public final AtomicLong samplingRecordCounter = new AtomicLong(-1);
|
||||||
@@ -86,10 +84,8 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
|||||||
private final QueueIterator iterator;
|
private final QueueIterator iterator;
|
||||||
// indicates rate limit (number of records to cache). it is updated whenever there is a change
|
// indicates rate limit (number of records to cache). it is updated whenever there is a change
|
||||||
// in avg record size.
|
// in avg record size.
|
||||||
@VisibleForTesting
|
|
||||||
public int currentRateLimit = 1;
|
public int currentRateLimit = 1;
|
||||||
// indicates avg record size in bytes. It is updated whenever a new record is sampled.
|
// indicates avg record size in bytes. It is updated whenever a new record is sampled.
|
||||||
@VisibleForTesting
|
|
||||||
public long avgRecordSizeInBytes = 0;
|
public long avgRecordSizeInBytes = 0;
|
||||||
// indicates number of samples collected so far.
|
// indicates number of samples collected so far.
|
||||||
private long numSamples = 0;
|
private long numSamples = 0;
|
||||||
@@ -119,7 +115,6 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
|||||||
this.iterator = new QueueIterator();
|
this.iterator = new QueueIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public int size() {
|
public int size() {
|
||||||
return this.queue.size();
|
return this.queue.size();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -127,7 +127,7 @@ object AvroConversionHelper {
|
|||||||
new Timestamp(item.asInstanceOf[Long])
|
new Timestamp(item.asInstanceOf[Long])
|
||||||
case other =>
|
case other =>
|
||||||
throw new IncompatibleSchemaException(
|
throw new IncompatibleSchemaException(
|
||||||
s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
|
s"Cannot convert Avro logical type $other to Catalyst Timestamp type.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case (struct: StructType, RECORD) =>
|
case (struct: StructType, RECORD) =>
|
||||||
@@ -215,7 +215,7 @@ object AvroConversionHelper {
|
|||||||
createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path)
|
createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path)
|
||||||
}
|
}
|
||||||
} else avroSchema.getTypes.asScala.map(_.getType) match {
|
} else avroSchema.getTypes.asScala.map(_.getType) match {
|
||||||
case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path)
|
case Seq(_) => createConverter(avroSchema.getTypes.get(0), sqlType, path)
|
||||||
case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType =>
|
case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType =>
|
||||||
(item: AnyRef) => {
|
(item: AnyRef) => {
|
||||||
item match {
|
item match {
|
||||||
@@ -286,7 +286,7 @@ object AvroConversionHelper {
|
|||||||
case ShortType => (item: Any) =>
|
case ShortType => (item: Any) =>
|
||||||
if (item == null) null else item.asInstanceOf[Short].intValue
|
if (item == null) null else item.asInstanceOf[Short].intValue
|
||||||
case dec: DecimalType => (item: Any) =>
|
case dec: DecimalType => (item: Any) =>
|
||||||
Option(item).map { i =>
|
Option(item).map { _ =>
|
||||||
val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
|
val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
|
||||||
val decimalConversions = new DecimalConversion()
|
val decimalConversions = new DecimalConversion()
|
||||||
decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0),
|
decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0),
|
||||||
|
|||||||
@@ -35,7 +35,6 @@ import com.beust.jcommander.IValueValidator;
|
|||||||
import com.beust.jcommander.JCommander;
|
import com.beust.jcommander.JCommander;
|
||||||
import com.beust.jcommander.Parameter;
|
import com.beust.jcommander.Parameter;
|
||||||
import com.beust.jcommander.ParameterException;
|
import com.beust.jcommander.ParameterException;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
@@ -120,7 +119,6 @@ public class HDFSParquetImporter implements Serializable {
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected int dataImport(JavaSparkContext jsc) throws IOException {
|
protected int dataImport(JavaSparkContext jsc) throws IOException {
|
||||||
try {
|
try {
|
||||||
if (fs.exists(new Path(cfg.targetPath))) {
|
if (fs.exists(new Path(cfg.targetPath))) {
|
||||||
|
|||||||
Reference in New Issue
Block a user