1
0

[HUDI-1655] Support custom date format and fix unsupported exception in DatePartitionPathSelector (#2621)

- Add a config to allow parsing custom date format in `DatePartitionPathSelector`. Currently it assumes date partition string in the format of `yyyy-MM-dd`.
- Fix a bug where `UnsupportedOperationException` was thrown when sorting `eligibleFiles` in-place. Changed to sort it into a new list instead.
This commit is contained in:
Raymond Xu
2021-03-04 21:01:51 -08:00
committed by GitHub
parent 7cc75e0be2
commit f53bca404f
2 changed files with 41 additions and 27 deletions

View File

@@ -35,13 +35,16 @@ import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP; import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
@@ -59,12 +62,16 @@ import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelecto
* <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or * <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
* 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
* form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or * form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
* `<basepath>/<<date-based-partition>/` * `<basepath>/<<date-based-partition>/`.
*
* <p>The date based partition format can be configured via this property
* hoodie.deltastreamer.source.dfs.datepartitioned.date.format
*/ */
public class DatePartitionPathSelector extends DFSPathSelector { public class DatePartitionPathSelector extends DFSPathSelector {
private static volatile Logger LOG = LogManager.getLogger(DatePartitionPathSelector.class); private static volatile Logger LOG = LogManager.getLogger(DatePartitionPathSelector.class);
private final String dateFormat;
private final int datePartitionDepth; private final int datePartitionDepth;
private final int numPrevDaysToList; private final int numPrevDaysToList;
private final LocalDate fromDate; private final LocalDate fromDate;
@@ -73,6 +80,9 @@ public class DatePartitionPathSelector extends DFSPathSelector {
/** Configs supported. */ /** Configs supported. */
public static class Config { public static class Config {
public static final String DATE_FORMAT = "hoodie.deltastreamer.source.dfs.datepartitioned.date.format";
public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
public static final String DATE_PARTITION_DEPTH = public static final String DATE_PARTITION_DEPTH =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth"; "hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition
@@ -84,7 +94,6 @@ public class DatePartitionPathSelector extends DFSPathSelector {
public static final String CURRENT_DATE = public static final String CURRENT_DATE =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate"; "hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";
public static final String PARTITIONS_LIST_PARALLELISM = public static final String PARTITIONS_LIST_PARALLELISM =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism"; "hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20; public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
@@ -96,6 +105,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
* datePartitionDepth = 0 is same as basepath and there is no partition. In which case * datePartitionDepth = 0 is same as basepath and there is no partition. In which case
* this path selector would be a no-op and lists all paths under the table basepath. * this path selector would be a no-op and lists all paths under the table basepath.
*/ */
dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH); datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
// If not specified the current date is assumed by default. // If not specified the current date is assumed by default.
currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString())); currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
@@ -130,20 +140,19 @@ public class DatePartitionPathSelector extends DFSPathSelector {
FileSystem fs = new Path(path).getFileSystem(serializedConf.get()); FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream(); return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream();
}, partitionsListParallelism); }, partitionsListParallelism);
// sort them by modification time. // sort them by modification time ascending.
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime)); List<FileStatus> sortedEligibleFiles = eligibleFiles.stream()
.sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList());
// Filter based on checkpoint & input size, if needed // Filter based on checkpoint & input size, if needed
long currentBytes = 0; long currentBytes = 0;
long maxModificationTime = Long.MIN_VALUE;
List<FileStatus> filteredFiles = new ArrayList<>(); List<FileStatus> filteredFiles = new ArrayList<>();
for (FileStatus f : eligibleFiles) { for (FileStatus f : sortedEligibleFiles) {
if (currentBytes + f.getLen() >= sourceLimit) { if (currentBytes + f.getLen() >= sourceLimit) {
// we have enough data, we are done // we have enough data, we are done
break; break;
} }
maxModificationTime = f.getModificationTime();
currentBytes += f.getLen(); currentBytes += f.getLen();
filteredFiles.add(f); filteredFiles.add(f);
} }
@@ -156,7 +165,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
// read the files out. // read the files out.
String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(",")); String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
long maxModificationTime = filteredFiles.get(filteredFiles.size() - 1).getModificationTime();
return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime)); return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime));
} }
@@ -193,14 +202,15 @@ public class DatePartitionPathSelector extends DFSPathSelector {
String[] splits = s.split("/"); String[] splits = s.split("/");
String datePartition = splits[splits.length - 1]; String datePartition = splits[splits.length - 1];
LocalDate partitionDate; LocalDate partitionDate;
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
if (datePartition.contains("=")) { if (datePartition.contains("=")) {
String[] moreSplit = datePartition.split("="); String[] moreSplit = datePartition.split("=");
ValidationUtils.checkArgument( ValidationUtils.checkArgument(
moreSplit.length == 2, moreSplit.length == 2,
"Partition Field (" + datePartition + ") not in expected format"); "Partition Field (" + datePartition + ") not in expected format");
partitionDate = LocalDate.parse(moreSplit[1]); partitionDate = LocalDate.parse(moreSplit[1], dateFormatter);
} else { } else {
partitionDate = LocalDate.parse(datePartition); partitionDate = LocalDate.parse(datePartition, dateFormatter);
} }
return (partitionDate.isEqual(fromDate) || partitionDate.isAfter(fromDate)) return (partitionDate.isEqual(fromDate) || partitionDate.isAfter(fromDate))
&& (partitionDate.isEqual(currentDate) || partitionDate.isBefore(currentDate)); && (partitionDate.isEqual(currentDate) || partitionDate.isBefore(currentDate));

View File

@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException; import java.io.IOException;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@@ -40,6 +41,7 @@ import java.util.stream.Stream;
import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP; import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS; import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -78,11 +80,11 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
/* /*
* Create Date partitions with some files under each of the leaf Dirs. * Create Date partitions with some files under each of the leaf Dirs.
*/ */
public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean hiveStyle) public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean hiveStyle, String dateFormat)
throws IOException { throws IOException {
List<Path> allFiles = new ArrayList<>(); List<Path> allFiles = new ArrayList<>();
for (Path path : leafDirs) { for (Path path : leafDirs) {
List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle); List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle, dateFormat);
for (Path datePartition : datePartitions) { for (Path datePartition : datePartitions) {
allFiles.addAll(createRandomFilesUnder(datePartition)); allFiles.addAll(createRandomFilesUnder(datePartition));
} }
@@ -126,11 +128,12 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
/* /*
* Generate date based partitions under a parent dir with or without hivestyle formatting. * Generate date based partitions under a parent dir with or without hivestyle formatting.
*/ */
private List<Path> generateDatePartitionsUnder(Path parent, boolean hiveStyle) throws IOException { private List<Path> generateDatePartitionsUnder(Path parent, boolean hiveStyle, String dateFormat) throws IOException {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat);
List<Path> datePartitions = new ArrayList<>(); List<Path> datePartitions = new ArrayList<>();
String prefix = (hiveStyle ? "dt=" : ""); String prefix = (hiveStyle ? "dt=" : "");
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
Path child = new Path(parent, prefix + totalDates.get(i).toString()); Path child = new Path(parent, prefix + formatter.format(totalDates.get(i)));
fs.mkdirs(child); fs.mkdirs(child);
datePartitions.add(child); datePartitions.add(child);
} }
@@ -155,9 +158,10 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
} }
private static TypedProperties getProps( private static TypedProperties getProps(
String basePath, int datePartitionDepth, int numDaysToList, String currentDate) { String basePath, String dateFormat, int datePartitionDepth, int numDaysToList, String currentDate) {
TypedProperties properties = new TypedProperties(); TypedProperties properties = new TypedProperties();
properties.put(ROOT_INPUT_PATH_PROP, basePath); properties.put(ROOT_INPUT_PATH_PROP, basePath);
properties.put(DATE_FORMAT, dateFormat);
properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth); properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth);
properties.put(LOOKBACK_DAYS, "" + numDaysToList); properties.put(LOOKBACK_DAYS, "" + numDaysToList);
properties.put(CURRENT_DATE, currentDate); properties.put(CURRENT_DATE, currentDate);
@@ -172,14 +176,14 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
private static Stream<Arguments> configParams() { private static Stream<Arguments> configParams() {
Object[][] data = Object[][] data =
new Object[][] { new Object[][] {
{"table1", 0, 2, "2020-07-25", true, 1}, {"table1", "yyyyMMdd", 0, 2, "2020-07-25", true, 1},
{"table2", 0, 2, "2020-07-25", false, 1}, {"table2", "yyyyMMdd", 0, 2, "2020-07-25", false, 1},
{"table3", 1, 3, "2020-07-25", true, 4}, {"table3", "yyyyMMMdd", 1, 3, "2020-07-25", true, 4},
{"table4", 1, 3, "2020-07-25", false, 4}, {"table4", "yyyyMMMdd", 1, 3, "2020-07-25", false, 4},
{"table5", 2, 1, "2020-07-25", true, 10}, {"table5", "yyyy-MM-dd", 2, 1, "2020-07-25", true, 10},
{"table6", 2, 1, "2020-07-25", false, 10}, {"table6", "yyyy-MM-dd", 2, 1, "2020-07-25", false, 10},
{"table7", 3, 2, "2020-07-25", true, 75}, {"table7", "yyyy-MMM-dd", 3, 2, "2020-07-25", true, 75},
{"table8", 3, 2, "2020-07-25", false, 75} {"table8", "yyyy-MMM-dd", 3, 2, "2020-07-25", false, 75}
}; };
return Stream.of(data).map(Arguments::of); return Stream.of(data).map(Arguments::of);
} }
@@ -188,13 +192,14 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
@MethodSource("configParams") @MethodSource("configParams")
public void testPruneDatePartitionPaths( public void testPruneDatePartitionPaths(
String tableName, String tableName,
String dateFormat,
int datePartitionDepth, int datePartitionDepth,
int numPrevDaysToList, int numPrevDaysToList,
String currentDate, String currentDate,
boolean isHiveStylePartition, boolean isHiveStylePartition,
int expectedNumFiles) int expectedNumFiles)
throws IOException { throws IOException {
TypedProperties props = getProps(basePath + "/" + tableName, datePartitionDepth, numPrevDaysToList, currentDate); TypedProperties props = getProps(basePath + "/" + tableName, dateFormat, datePartitionDepth, numPrevDaysToList, currentDate);
DatePartitionPathSelector pathSelector = new DatePartitionPathSelector(props, jsc.hadoopConfiguration()); DatePartitionPathSelector pathSelector = new DatePartitionPathSelector(props, jsc.hadoopConfiguration());
Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP)); Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP));
@@ -203,10 +208,9 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
// Create parent dir // Create parent dir
List<Path> leafDirs = new ArrayList<>(); List<Path> leafDirs = new ArrayList<>();
createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs); createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs);
createDatePartitionsWithFiles(leafDirs, isHiveStylePartition); createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString()); List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString());
assertEquals(expectedNumFiles, paths.size());
assertEquals(expectedNumFiles, pathSelector.pruneDatePartitionPaths(context, fs, root.toString()).size());
} }
} }