diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index 2cedb6cea..c22657f58 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -35,13 +35,16 @@ import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
@@ -59,12 +62,16 @@ import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelecto
*
The date based partition is expected to be of the format '=yyyy-mm-dd' or
* 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
* form `////` or
- * `/</`
+ * `/</`.
+ *
+ * The date based partition format can be configured via this property
+ * hoodie.deltastreamer.source.dfs.datepartitioned.date.format
*/
public class DatePartitionPathSelector extends DFSPathSelector {
private static volatile Logger LOG = LogManager.getLogger(DatePartitionPathSelector.class);
+ private final String dateFormat;
private final int datePartitionDepth;
private final int numPrevDaysToList;
private final LocalDate fromDate;
@@ -73,6 +80,9 @@ public class DatePartitionPathSelector extends DFSPathSelector {
/** Configs supported. */
public static class Config {
+ public static final String DATE_FORMAT = "hoodie.deltastreamer.source.dfs.datepartitioned.date.format";
+ public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
+
public static final String DATE_PARTITION_DEPTH =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition
@@ -84,7 +94,6 @@ public class DatePartitionPathSelector extends DFSPathSelector {
public static final String CURRENT_DATE =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";
-
public static final String PARTITIONS_LIST_PARALLELISM =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
@@ -96,6 +105,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
* datePartitionDepth = 0 is same as basepath and there is no partition. In which case
* this path selector would be a no-op and lists all paths under the table basepath.
*/
+ dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
// If not specified the current date is assumed by default.
currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
@@ -130,20 +140,19 @@ public class DatePartitionPathSelector extends DFSPathSelector {
FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream();
}, partitionsListParallelism);
- // sort them by modification time.
- eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+ // sort them by modification time ascending.
+ List sortedEligibleFiles = eligibleFiles.stream()
+ .sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList());
// Filter based on checkpoint & input size, if needed
long currentBytes = 0;
- long maxModificationTime = Long.MIN_VALUE;
List filteredFiles = new ArrayList<>();
- for (FileStatus f : eligibleFiles) {
+ for (FileStatus f : sortedEligibleFiles) {
if (currentBytes + f.getLen() >= sourceLimit) {
// we have enough data, we are done
break;
}
- maxModificationTime = f.getModificationTime();
currentBytes += f.getLen();
filteredFiles.add(f);
}
@@ -156,7 +165,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
// read the files out.
String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
-
+ long maxModificationTime = filteredFiles.get(filteredFiles.size() - 1).getModificationTime();
return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime));
}
@@ -193,14 +202,15 @@ public class DatePartitionPathSelector extends DFSPathSelector {
String[] splits = s.split("/");
String datePartition = splits[splits.length - 1];
LocalDate partitionDate;
+ DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
if (datePartition.contains("=")) {
String[] moreSplit = datePartition.split("=");
ValidationUtils.checkArgument(
moreSplit.length == 2,
"Partition Field (" + datePartition + ") not in expected format");
- partitionDate = LocalDate.parse(moreSplit[1]);
+ partitionDate = LocalDate.parse(moreSplit[1], dateFormatter);
} else {
- partitionDate = LocalDate.parse(datePartition);
+ partitionDate = LocalDate.parse(datePartition, dateFormatter);
}
return (partitionDate.isEqual(fromDate) || partitionDate.isAfter(fromDate))
&& (partitionDate.isEqual(currentDate) || partitionDate.isBefore(currentDate));
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
index b7e127924..30d099323 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -40,6 +41,7 @@ import java.util.stream.Stream;
import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -78,11 +80,11 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
/*
* Create Date partitions with some files under each of the leaf Dirs.
*/
- public List createDatePartitionsWithFiles(List leafDirs, boolean hiveStyle)
+ public List createDatePartitionsWithFiles(List leafDirs, boolean hiveStyle, String dateFormat)
throws IOException {
List allFiles = new ArrayList<>();
for (Path path : leafDirs) {
- List datePartitions = generateDatePartitionsUnder(path, hiveStyle);
+ List datePartitions = generateDatePartitionsUnder(path, hiveStyle, dateFormat);
for (Path datePartition : datePartitions) {
allFiles.addAll(createRandomFilesUnder(datePartition));
}
@@ -126,11 +128,12 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
/*
* Generate date based partitions under a parent dir with or without hivestyle formatting.
*/
- private List generateDatePartitionsUnder(Path parent, boolean hiveStyle) throws IOException {
+ private List generateDatePartitionsUnder(Path parent, boolean hiveStyle, String dateFormat) throws IOException {
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat);
List datePartitions = new ArrayList<>();
String prefix = (hiveStyle ? "dt=" : "");
for (int i = 0; i < 5; i++) {
- Path child = new Path(parent, prefix + totalDates.get(i).toString());
+ Path child = new Path(parent, prefix + formatter.format(totalDates.get(i)));
fs.mkdirs(child);
datePartitions.add(child);
}
@@ -155,9 +158,10 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
}
private static TypedProperties getProps(
- String basePath, int datePartitionDepth, int numDaysToList, String currentDate) {
+ String basePath, String dateFormat, int datePartitionDepth, int numDaysToList, String currentDate) {
TypedProperties properties = new TypedProperties();
properties.put(ROOT_INPUT_PATH_PROP, basePath);
+ properties.put(DATE_FORMAT, dateFormat);
properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth);
properties.put(LOOKBACK_DAYS, "" + numDaysToList);
properties.put(CURRENT_DATE, currentDate);
@@ -172,14 +176,14 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
private static Stream configParams() {
Object[][] data =
new Object[][] {
- {"table1", 0, 2, "2020-07-25", true, 1},
- {"table2", 0, 2, "2020-07-25", false, 1},
- {"table3", 1, 3, "2020-07-25", true, 4},
- {"table4", 1, 3, "2020-07-25", false, 4},
- {"table5", 2, 1, "2020-07-25", true, 10},
- {"table6", 2, 1, "2020-07-25", false, 10},
- {"table7", 3, 2, "2020-07-25", true, 75},
- {"table8", 3, 2, "2020-07-25", false, 75}
+ {"table1", "yyyyMMdd", 0, 2, "2020-07-25", true, 1},
+ {"table2", "yyyyMMdd", 0, 2, "2020-07-25", false, 1},
+ {"table3", "yyyyMMMdd", 1, 3, "2020-07-25", true, 4},
+ {"table4", "yyyyMMMdd", 1, 3, "2020-07-25", false, 4},
+ {"table5", "yyyy-MM-dd", 2, 1, "2020-07-25", true, 10},
+ {"table6", "yyyy-MM-dd", 2, 1, "2020-07-25", false, 10},
+ {"table7", "yyyy-MMM-dd", 3, 2, "2020-07-25", true, 75},
+ {"table8", "yyyy-MMM-dd", 3, 2, "2020-07-25", false, 75}
};
return Stream.of(data).map(Arguments::of);
}
@@ -188,13 +192,14 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
@MethodSource("configParams")
public void testPruneDatePartitionPaths(
String tableName,
+ String dateFormat,
int datePartitionDepth,
int numPrevDaysToList,
String currentDate,
boolean isHiveStylePartition,
int expectedNumFiles)
throws IOException {
- TypedProperties props = getProps(basePath + "/" + tableName, datePartitionDepth, numPrevDaysToList, currentDate);
+ TypedProperties props = getProps(basePath + "/" + tableName, dateFormat, datePartitionDepth, numPrevDaysToList, currentDate);
DatePartitionPathSelector pathSelector = new DatePartitionPathSelector(props, jsc.hadoopConfiguration());
Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP));
@@ -203,10 +208,9 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
// Create parent dir
List leafDirs = new ArrayList<>();
createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs);
- createDatePartitionsWithFiles(leafDirs, isHiveStylePartition);
+ createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
List paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString());
-
- assertEquals(expectedNumFiles, pathSelector.pruneDatePartitionPaths(context, fs, root.toString()).size());
+ assertEquals(expectedNumFiles, paths.size());
}
}