1
0

[HUDI-751] Fix some coding issues reported by FindBugs (#1470)

This commit is contained in:
Shaofeng Shi
2020-03-31 21:19:32 +08:00
committed by GitHub
parent 9ecf0ccfb2
commit 78b3194e82
31 changed files with 57 additions and 41 deletions

View File

@@ -120,7 +120,7 @@ public class RollbacksCommand implements CommandMarker {
/** /**
* An Active timeline containing only rollbacks. * An Active timeline containing only rollbacks.
*/ */
class RollbackTimeline extends HoodieActiveTimeline { static class RollbackTimeline extends HoodieActiveTimeline {
public RollbackTimeline(HoodieTableMetaClient metaClient) { public RollbackTimeline(HoodieTableMetaClient metaClient) {
super(metaClient, CollectionUtils.createImmutableSet(HoodieTimeline.ROLLBACK_EXTENSION)); super(metaClient, CollectionUtils.createImmutableSet(HoodieTimeline.ROLLBACK_EXTENSION));

View File

@@ -34,7 +34,7 @@ import java.util.Map;
@Component @Component
public class SparkEnvCommand implements CommandMarker { public class SparkEnvCommand implements CommandMarker {
public static Map<String, String> env = new HashMap<String, String>(); public static Map<String, String> env = new HashMap<>();
@CliCommand(value = "set", help = "Set spark launcher env to cli") @CliCommand(value = "set", help = "Set spark launcher env to cli")
public void setEnv(@CliOption(key = {"conf"}, help = "Env config to be set") final String confMap) { public void setEnv(@CliOption(key = {"conf"}, help = "Env config to be set") final String confMap) {
@@ -49,8 +49,8 @@ public class SparkEnvCommand implements CommandMarker {
public String showAllEnv() { public String showAllEnv() {
String[][] rows = new String[env.size()][2]; String[][] rows = new String[env.size()][2];
int i = 0; int i = 0;
for (String key: env.keySet()) { for (Map.Entry<String, String> entry: env.entrySet()) {
rows[i] = new String[]{key, env.get(key)}; rows[i] = new String[]{entry.getKey(), entry.getValue()};
i++; i++;
} }
return HoodiePrintHelper.print(new String[] {"key", "value"}, rows); return HoodiePrintHelper.print(new String[] {"key", "value"}, rows);

View File

@@ -131,9 +131,9 @@ public class StatsCommand implements CommandMarker {
} }
List<Comparable[]> rows = new ArrayList<>(); List<Comparable[]> rows = new ArrayList<>();
for (String instantTime : commitHistoMap.keySet()) { for (Map.Entry<String, Histogram> entry : commitHistoMap.entrySet()) {
Snapshot s = commitHistoMap.get(instantTime).getSnapshot(); Snapshot s = entry.getValue().getSnapshot();
rows.add(printFileSizeHistogram(instantTime, s)); rows.add(printFileSizeHistogram(entry.getKey(), s));
} }
Snapshot s = globalHistogram.getSnapshot(); Snapshot s = globalHistogram.getSnapshot();
rows.add(printFileSizeHistogram("ALL", s)); rows.add(printFileSizeHistogram("ALL", s));

View File

@@ -49,10 +49,9 @@ public class HiveUtil {
public static long countRecords(String jdbcUrl, HoodieTableMetaClient source, String dbName, String user, String pass) public static long countRecords(String jdbcUrl, HoodieTableMetaClient source, String dbName, String user, String pass)
throws SQLException { throws SQLException {
Connection conn = HiveUtil.getConnection(jdbcUrl, user, pass);
ResultSet rs = null; ResultSet rs = null;
Statement stmt = conn.createStatement(); try (Connection conn = HiveUtil.getConnection(jdbcUrl, user, pass);
try { Statement stmt = conn.createStatement()) {
// stmt.execute("set mapred.job.queue.name=<queue_name>"); // stmt.execute("set mapred.job.queue.name=<queue_name>");
stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat"); stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
stmt.execute("set hive.stats.autogather=false"); stmt.execute("set hive.stats.autogather=false");
@@ -68,9 +67,6 @@ public class HiveUtil {
if (rs != null) { if (rs != null) {
rs.close(); rs.close();
} }
if (stmt != null) {
stmt.close();
}
} }
} }
@@ -88,10 +84,9 @@ public class HiveUtil {
private static long countRecords(String jdbcUrl, HoodieTableMetaClient source, String srcDb, String startDateStr, private static long countRecords(String jdbcUrl, HoodieTableMetaClient source, String srcDb, String startDateStr,
String endDateStr, String user, String pass) throws SQLException { String endDateStr, String user, String pass) throws SQLException {
Connection conn = HiveUtil.getConnection(jdbcUrl, user, pass);
ResultSet rs = null; ResultSet rs = null;
Statement stmt = conn.createStatement(); try (Connection conn = HiveUtil.getConnection(jdbcUrl, user, pass);
try { Statement stmt = conn.createStatement()) {
// stmt.execute("set mapred.job.queue.name=<queue_name>"); // stmt.execute("set mapred.job.queue.name=<queue_name>");
stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat"); stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
stmt.execute("set hive.stats.autogather=false"); stmt.execute("set hive.stats.autogather=false");
@@ -106,9 +101,6 @@ public class HiveUtil {
if (rs != null) { if (rs != null) {
rs.close(); rs.close();
} }
if (stmt != null) {
stmt.close();
}
} }
} }
} }

View File

@@ -47,6 +47,7 @@ import java.util.List;
public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient { public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class); private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class);
private final transient HoodieMetrics metrics; private final transient HoodieMetrics metrics;

View File

@@ -57,6 +57,7 @@ import scala.Tuple2;
*/ */
public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable { public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class); private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);
/** /**

View File

@@ -93,6 +93,7 @@ import scala.Tuple2;
*/ */
public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> { public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class); private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class);
private static final String LOOKUP_STR = "lookup"; private static final String LOOKUP_STR = "lookup";
private final boolean rollbackPending; private final boolean rollbackPending;

View File

@@ -35,6 +35,7 @@ import java.util.Random;
*/ */
public class WriteStatus implements Serializable { public class WriteStatus implements Serializable {
private static final long serialVersionUID = 1L;
private static final long RANDOM_SEED = 9038412832L; private static final long RANDOM_SEED = 9038412832L;
private final HashMap<HoodieKey, Throwable> errors = new HashMap<>(); private final HashMap<HoodieKey, Throwable> errors = new HashMap<>();

View File

@@ -61,7 +61,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
* value based on global indexing throughput needs and most importantly, how much the HBase installation in use is * value based on global indexing throughput needs and most importantly, how much the HBase installation in use is
* able to tolerate without Region Servers going down. * able to tolerate without Region Servers going down.
*/ */
public static String HBASE_MAX_QPS_PER_REGION_SERVER_PROP = "hoodie.index.hbase.max.qps.per.region.server"; public static final String HBASE_MAX_QPS_PER_REGION_SERVER_PROP = "hoodie.index.hbase.max.qps.per.region.server";
/** /**
* Default batch size, used only for Get, but computed for Put. * Default batch size, used only for Get, but computed for Put.
*/ */

View File

@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
*/ */
public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionStrategy { public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionStrategy {
SimpleDateFormat dateFormat = new SimpleDateFormat(datePartitionFormat); SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_PARTITION_FORMAT);
@Override @Override
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,

View File

@@ -39,14 +39,14 @@ import java.util.stream.Collectors;
public class DayBasedCompactionStrategy extends CompactionStrategy { public class DayBasedCompactionStrategy extends CompactionStrategy {
// For now, use SimpleDateFormat as default partition format // For now, use SimpleDateFormat as default partition format
protected static String datePartitionFormat = "yyyy/MM/dd"; protected static final String DATE_PARTITION_FORMAT = "yyyy/MM/dd";
// Sorts compaction in LastInFirstCompacted order // Sorts compaction in LastInFirstCompacted order
protected static Comparator<String> comparator = (String leftPartition, String rightPartition) -> { protected static Comparator<String> comparator = (String leftPartition, String rightPartition) -> {
try { try {
leftPartition = getPartitionPathWithoutPartitionKeys(leftPartition); leftPartition = getPartitionPathWithoutPartitionKeys(leftPartition);
rightPartition = getPartitionPathWithoutPartitionKeys(rightPartition); rightPartition = getPartitionPathWithoutPartitionKeys(rightPartition);
Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH).parse(leftPartition); Date left = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH).parse(leftPartition);
Date right = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH).parse(rightPartition); Date right = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH).parse(rightPartition);
return left.after(right) ? -1 : right.after(left) ? 1 : 0; return left.after(right) ? -1 : right.after(left) ? 1 : 0;
} catch (ParseException e) { } catch (ParseException e) {
throw new HoodieException("Invalid Partition Date Format", e); throw new HoodieException("Invalid Partition Date Format", e);

View File

@@ -30,6 +30,7 @@ import java.io.Serializable;
*/ */
public class SerializableConfiguration implements Serializable { public class SerializableConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
private transient Configuration configuration; private transient Configuration configuration;
public SerializableConfiguration(Configuration configuration) { public SerializableConfiguration(Configuration configuration) {

View File

@@ -31,6 +31,7 @@ import java.util.Objects;
*/ */
public class HoodieBaseFile implements Serializable { public class HoodieBaseFile implements Serializable {
private static final long serialVersionUID = 1L;
private transient FileStatus fileStatus; private transient FileStatus fileStatus;
private final String fullPath; private final String fullPath;
private long fileLen; private long fileLen;

View File

@@ -37,6 +37,7 @@ import java.util.Objects;
*/ */
public class HoodieLogFile implements Serializable { public class HoodieLogFile implements Serializable {
private static final long serialVersionUID = 1L;
public static final String DELTA_EXTENSION = ".log"; public static final String DELTA_EXTENSION = ".log";
public static final Integer LOGFILE_BASE_VERSION = 1; public static final Integer LOGFILE_BASE_VERSION = 1;
@@ -129,6 +130,7 @@ public class HoodieLogFile implements Serializable {
*/ */
public static class LogFileComparator implements Comparator<HoodieLogFile>, Serializable { public static class LogFileComparator implements Comparator<HoodieLogFile>, Serializable {
private static final long serialVersionUID = 1L;
private transient Comparator<String> writeTokenComparator; private transient Comparator<String> writeTokenComparator;
private Comparator<String> getWriteTokenComparator() { private Comparator<String> getWriteTokenComparator() {

View File

@@ -30,11 +30,11 @@ import java.util.Objects;
*/ */
public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable { public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable {
public static String COMMIT_TIME_METADATA_FIELD = "_hoodie_commit_time"; public static final String COMMIT_TIME_METADATA_FIELD = "_hoodie_commit_time";
public static String COMMIT_SEQNO_METADATA_FIELD = "_hoodie_commit_seqno"; public static final String COMMIT_SEQNO_METADATA_FIELD = "_hoodie_commit_seqno";
public static String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key"; public static final String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key";
public static String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path"; public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path";
public static String FILENAME_METADATA_FIELD = "_hoodie_file_name"; public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name";
public static final List<String> HOODIE_META_COLUMNS = public static final List<String> HOODIE_META_COLUMNS =
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD, CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,

View File

@@ -52,7 +52,7 @@ public class HoodieRollingStatMetadata implements Serializable {
this.actionType = actionType; this.actionType = actionType;
} }
class RollingStatsHashMap<K, V> extends HashMap<K, V> { static class RollingStatsHashMap<K, V> extends HashMap<K, V> {
@Override @Override
public V put(K key, V value) { public V put(K key, V value) {

View File

@@ -68,10 +68,11 @@ import java.util.stream.Stream;
*/ */
public class HoodieTableMetaClient implements Serializable { public class HoodieTableMetaClient implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class); private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
public static String METAFOLDER_NAME = ".hoodie"; public static final String METAFOLDER_NAME = ".hoodie";
public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp"; public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux"; public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
public static final String MARKER_EXTN = ".marker"; public static final String MARKER_EXTN = ".marker";
private String basePath; private String basePath;

View File

@@ -48,6 +48,7 @@ import static java.util.Collections.reverse;
*/ */
public class HoodieDefaultTimeline implements HoodieTimeline { public class HoodieDefaultTimeline implements HoodieTimeline {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieDefaultTimeline.class); private static final Logger LOG = LogManager.getLogger(HoodieDefaultTimeline.class);
private static final String HASHING_ALGORITHM = "SHA-256"; private static final String HASHING_ALGORITHM = "SHA-256";

View File

@@ -32,7 +32,7 @@ public class HoodieTimer {
// Ordered stack of TimeInfo's to make sure stopping the timer returns the correct elapsed time // Ordered stack of TimeInfo's to make sure stopping the timer returns the correct elapsed time
Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>(); Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>();
class TimeInfo { static class TimeInfo {
// captures the startTime of the code block // captures the startTime of the code block
long startTime; long startTime;

View File

@@ -55,7 +55,7 @@ import java.util.stream.Stream;
*/ */
public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> { public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
public static int BUFFER_SIZE = 128 * 1024; // 128 KB public static final int BUFFER_SIZE = 128 * 1024; // 128 KB
private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class); private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class);
// Stores the key and corresponding value's latest metadata spilled to disk // Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, ValueMetadata> valueMetadataMap; private final Map<T, ValueMetadata> valueMetadataMap;

View File

@@ -51,6 +51,7 @@ import java.util.stream.Collectors;
*/ */
public class HoodieROTablePathFilter implements PathFilter, Serializable { public class HoodieROTablePathFilter implements PathFilter, Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieROTablePathFilter.class); private static final Logger LOG = LogManager.getLogger(HoodieROTablePathFilter.class);
/** /**

View File

@@ -33,6 +33,7 @@ import java.util.List;
*/ */
public class SlashEncodedDayPartitionValueExtractor implements PartitionValueExtractor { public class SlashEncodedDayPartitionValueExtractor implements PartitionValueExtractor {
private static final long serialVersionUID = 1L;
private transient DateTimeFormatter dtfOut; private transient DateTimeFormatter dtfOut;
public SlashEncodedDayPartitionValueExtractor() { public SlashEncodedDayPartitionValueExtractor() {

View File

@@ -66,6 +66,7 @@ import scala.Tuple2;
*/ */
public class HDFSParquetImporter implements Serializable { public class HDFSParquetImporter implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HDFSParquetImporter.class); private static final Logger LOG = LogManager.getLogger(HDFSParquetImporter.class);
private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd") private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd")

View File

@@ -37,6 +37,7 @@ import java.io.Serializable;
*/ */
public class Compactor implements Serializable { public class Compactor implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(Compactor.class); private static final Logger LOG = LogManager.getLogger(Compactor.class);
private transient HoodieWriteClient compactionClient; private transient HoodieWriteClient compactionClient;

View File

@@ -83,9 +83,10 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
*/ */
public class DeltaSync implements Serializable { public class DeltaSync implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(DeltaSync.class); private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key"; public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
/** /**
* Delta Sync Config. * Delta Sync Config.

View File

@@ -77,6 +77,7 @@ import java.util.stream.IntStream;
*/ */
public class HoodieDeltaStreamer implements Serializable { public class HoodieDeltaStreamer implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class); private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class);
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
@@ -302,6 +303,7 @@ public class HoodieDeltaStreamer implements Serializable {
*/ */
public static class DeltaSyncService extends AbstractDeltaStreamerService { public static class DeltaSyncService extends AbstractDeltaStreamerService {
private static final long serialVersionUID = 1L;
/** /**
* Delta Sync Config. * Delta Sync Config.
*/ */
@@ -489,6 +491,7 @@ public class HoodieDeltaStreamer implements Serializable {
*/ */
public static class AsyncCompactService extends AbstractDeltaStreamerService { public static class AsyncCompactService extends AbstractDeltaStreamerService {
private static final long serialVersionUID = 1L;
private final int maxConcurrentCompaction; private final int maxConcurrentCompaction;
private transient Compactor compactor; private transient Compactor compactor;
private transient JavaSparkContext jssc; private transient JavaSparkContext jssc;

View File

@@ -58,6 +58,7 @@ import java.util.stream.IntStream;
public class TimelineServerPerf implements Serializable { public class TimelineServerPerf implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(TimelineServerPerf.class); private static final Logger LOG = LogManager.getLogger(TimelineServerPerf.class);
private final Config cfg; private final Config cfg;
private transient TimelineService timelineServer; private transient TimelineService timelineServer;

View File

@@ -58,10 +58,12 @@ import java.util.List;
* passed to the CSV reader without inferring the schema from the CSV file. * passed to the CSV reader without inferring the schema from the CSV file.
*/ */
public class CsvDFSSource extends RowSource { public class CsvDFSSource extends RowSource {
private static final long serialVersionUID = 1L;
// CsvSource config prefix // CsvSource config prefix
public static final String CSV_SRC_CONFIG_PREFIX = "hoodie.deltastreamer.csv."; protected static final String CSV_SRC_CONFIG_PREFIX = "hoodie.deltastreamer.csv.";
// CSV-specific configurations to pass in from Hudi to Spark // CSV-specific configurations to pass in from Hudi to Spark
public static final List<String> CSV_CONFIG_KEYS = Arrays.asList( protected static final List<String> CSV_CONFIG_KEYS = Arrays.asList(
"sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment", "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
"header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace", "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
"ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf", "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
@@ -69,7 +71,7 @@ public class CsvDFSSource extends RowSource {
"mode", "columnNameOfCorruptRecord", "multiLine" "mode", "columnNameOfCorruptRecord", "multiLine"
); );
private final DFSPathSelector pathSelector; private final transient DFSPathSelector pathSelector;
private final StructType sourceSchema; private final StructType sourceSchema;
public CsvDFSSource(TypedProperties props, public CsvDFSSource(TypedProperties props,

View File

@@ -59,6 +59,8 @@ import java.util.stream.Collectors;
*/ */
public class HiveIncrPullSource extends AvroSource { public class HiveIncrPullSource extends AvroSource {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HiveIncrPullSource.class); private static final Logger LOG = LogManager.getLogger(HiveIncrPullSource.class);
private final transient FileSystem fs; private final transient FileSystem fs;

View File

@@ -33,6 +33,7 @@ import java.io.Serializable;
*/ */
public class AvroConvertor implements Serializable { public class AvroConvertor implements Serializable {
private static final long serialVersionUID = 1L;
/** /**
* To be lazily inited on executors. * To be lazily inited on executors.
*/ */

View File

@@ -85,7 +85,7 @@ public class DFSPathSelector {
long maxModificationTime = Long.MIN_VALUE; long maxModificationTime = Long.MIN_VALUE;
List<FileStatus> filteredFiles = new ArrayList<>(); List<FileStatus> filteredFiles = new ArrayList<>();
for (FileStatus f : eligibleFiles) { for (FileStatus f : eligibleFiles) {
if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get())) { if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get()).longValue()) {
// skip processed files // skip processed files
continue; continue;
} }