diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java index c22657f58..97106ded9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java @@ -74,8 +74,6 @@ public class DatePartitionPathSelector extends DFSPathSelector { private final String dateFormat; private final int datePartitionDepth; private final int numPrevDaysToList; - private final LocalDate fromDate; - private final LocalDate currentDate; private final int partitionsListParallelism; /** Configs supported. */ @@ -107,10 +105,7 @@ public class DatePartitionPathSelector extends DFSPathSelector { */ dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT); datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH); - // If not specified the current date is assumed by default. - currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString())); numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS); - fromDate = currentDate.minusDays(numPrevDaysToList); partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM); } @@ -118,6 +113,9 @@ public class DatePartitionPathSelector extends DFSPathSelector { public Pair, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext, Option lastCheckpointStr, long sourceLimit) { + // If not specified the current date is assumed by default. + LocalDate currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString())); + // obtain all eligible files under root folder. LOG.info( "Root path => " @@ -133,7 +131,7 @@ public class DatePartitionPathSelector extends DFSPathSelector { long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE); HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext); SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf()); - List prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP)); + List prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate); List eligibleFiles = context.flatMap(prunedParitionPaths, path -> { @@ -173,7 +171,7 @@ public class DatePartitionPathSelector extends DFSPathSelector { * Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from * 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods. */ - public List pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath) { + public List pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) { List partitionPaths = new ArrayList<>(); // get all partition paths before date partition level partitionPaths.add(rootPath); @@ -199,6 +197,7 @@ public class DatePartitionPathSelector extends DFSPathSelector { // Prune date partitions to last few days return context.getJavaSparkContext().parallelize(partitionPaths, partitionsListParallelism) .filter(s -> { + LocalDate fromDate = currentDate.minusDays(numPrevDaysToList); String[] splits = s.split("/"); String datePartition = splits[splits.length - 1]; LocalDate partitionDate; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java index 30d099323..16736604d 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java @@ -210,7 +210,7 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness { createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs); createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat); - List paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString()); + List paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString(), LocalDate.parse(currentDate)); assertEquals(expectedNumFiles, paths.size()); } }