[HUDI-1685] keep updating current date for every batch (#2671)
This commit is contained in:
@@ -74,8 +74,6 @@ public class DatePartitionPathSelector extends DFSPathSelector {
|
|||||||
private final String dateFormat;
|
private final String dateFormat;
|
||||||
private final int datePartitionDepth;
|
private final int datePartitionDepth;
|
||||||
private final int numPrevDaysToList;
|
private final int numPrevDaysToList;
|
||||||
private final LocalDate fromDate;
|
|
||||||
private final LocalDate currentDate;
|
|
||||||
private final int partitionsListParallelism;
|
private final int partitionsListParallelism;
|
||||||
|
|
||||||
/** Configs supported. */
|
/** Configs supported. */
|
||||||
@@ -107,10 +105,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
|
|||||||
*/
|
*/
|
||||||
dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
|
dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
|
||||||
datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
|
datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
|
||||||
// If not specified the current date is assumed by default.
|
|
||||||
currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
|
|
||||||
numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
|
numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
|
||||||
fromDate = currentDate.minusDays(numPrevDaysToList);
|
|
||||||
partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
|
partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -118,6 +113,9 @@ public class DatePartitionPathSelector extends DFSPathSelector {
|
|||||||
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
|
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
|
||||||
Option<String> lastCheckpointStr,
|
Option<String> lastCheckpointStr,
|
||||||
long sourceLimit) {
|
long sourceLimit) {
|
||||||
|
// If not specified the current date is assumed by default.
|
||||||
|
LocalDate currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
|
||||||
|
|
||||||
// obtain all eligible files under root folder.
|
// obtain all eligible files under root folder.
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Root path => "
|
"Root path => "
|
||||||
@@ -133,7 +131,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
|
|||||||
long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
|
long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
|
||||||
HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
|
HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
|
||||||
SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
|
SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
|
||||||
List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP));
|
List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate);
|
||||||
|
|
||||||
List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
|
List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
|
||||||
path -> {
|
path -> {
|
||||||
@@ -173,7 +171,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
|
|||||||
* Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from
|
* Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from
|
||||||
* 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods.
|
* 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods.
|
||||||
*/
|
*/
|
||||||
public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath) {
|
public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) {
|
||||||
List<String> partitionPaths = new ArrayList<>();
|
List<String> partitionPaths = new ArrayList<>();
|
||||||
// get all partition paths before date partition level
|
// get all partition paths before date partition level
|
||||||
partitionPaths.add(rootPath);
|
partitionPaths.add(rootPath);
|
||||||
@@ -199,6 +197,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
|
|||||||
// Prune date partitions to last few days
|
// Prune date partitions to last few days
|
||||||
return context.getJavaSparkContext().parallelize(partitionPaths, partitionsListParallelism)
|
return context.getJavaSparkContext().parallelize(partitionPaths, partitionsListParallelism)
|
||||||
.filter(s -> {
|
.filter(s -> {
|
||||||
|
LocalDate fromDate = currentDate.minusDays(numPrevDaysToList);
|
||||||
String[] splits = s.split("/");
|
String[] splits = s.split("/");
|
||||||
String datePartition = splits[splits.length - 1];
|
String datePartition = splits[splits.length - 1];
|
||||||
LocalDate partitionDate;
|
LocalDate partitionDate;
|
||||||
|
|||||||
@@ -210,7 +210,7 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
|
|||||||
createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs);
|
createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs);
|
||||||
createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
|
createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
|
||||||
|
|
||||||
List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString());
|
List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString(), LocalDate.parse(currentDate));
|
||||||
assertEquals(expectedNumFiles, paths.size());
|
assertEquals(expectedNumFiles, paths.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user