1
0

[HUDI-1406] Add date partition based source input selector for Delta streamer (#2264)

- Adds ability to list only recent date based partitions from source data.
- Parallelizes listing for faster tailing of DFSSources
This commit is contained in:
Bhavani Sudha Saktheeswaran
2020-12-17 03:59:30 -08:00
committed by GitHub
parent 4ddfc61d70
commit 14d5d1100c
7 changed files with 444 additions and 8 deletions

View File

@@ -52,7 +52,7 @@ public class AvroDFSSource extends AvroSource {
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft()
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));

View File

@@ -92,7 +92,7 @@ public class CsvDFSSource extends RowSource {
protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr,
long sourceLimit) {
Pair<Option<String>, String> selPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
return Pair.of(fromFiles(
selPathsWithMaxModificationTime.getLeft()), selPathsWithMaxModificationTime.getRight());
}

View File

@@ -44,7 +44,7 @@ public class JsonDFSSource extends JsonSource {
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
return selPathsWithMaxModificationTime.getLeft()
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selPathsWithMaxModificationTime.getRight()))
.orElse(new InputBatch<>(Option.empty(), selPathsWithMaxModificationTime.getRight()));

View File

@@ -45,7 +45,7 @@ public class ParquetDFSSource extends RowSource {
@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft()
.map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> Pair.of(Option.empty(), selectPathsWithMaxModificationTime.getRight()));

View File

@@ -34,8 +34,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -43,7 +45,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class DFSPathSelector {
public class DFSPathSelector implements Serializable {
protected static volatile Logger log = LogManager.getLogger(DFSPathSelector.class);
@@ -90,13 +92,26 @@ public class DFSPathSelector {
/**
* Get the list of files changed since last checkpoint.
*
* @param sparkContext JavaSparkContext to help parallelize certain operations
* @param lastCheckpointStr the last checkpoint time string, empty if first run
* @param sourceLimit max bytes to read each time
* @return the list of files concatenated and their latest modified time
*/
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
long sourceLimit) {
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext, Option<String> lastCheckpointStr,
long sourceLimit) {
return getNextFilePathsAndMaxModificationTime(lastCheckpointStr, sourceLimit);
}
/**
* Get the list of files changed since last checkpoint.
*
* @param lastCheckpointStr the last checkpoint time string, empty if first run
* @param sourceLimit max bytes to read each time
* @return the list of files concatenated and their latest modified time
*/
@Deprecated
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
long sourceLimit) {
try {
// obtain all eligible files under root folder.
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
@@ -136,7 +151,7 @@ public class DFSPathSelector {
/**
* List files recursively, filter out ineligible files/directories while doing so.
*/
private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
// skip files/dirs whose names start with (_, ., etc)
FileStatus[] statuses = fs.listStatus(path, file ->
IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));

View File

@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
/**
* Custom dfs path selector used to list just the last few days provided there is a date based
* partition.
*
* <p>This is useful for workloads where there are multiple partition fields and only recent
* partitions are affected by new writes. Especially if the data sits in S3, listing all historical
* data can be time expensive and unnecessary for the above type of workload.
*
* <p>The date based partition is expected to be of the format '<date string>=yyyy-mm-dd' or
* 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition path can be of the
* form `<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
* `<basepath>/<date-based-partition>/`
*/
public class DatePartitionPathSelector extends DFSPathSelector {

  private static final Logger LOG = LogManager.getLogger(DatePartitionPathSelector.class);

  // Number of path segments under the root at which the date partition sits (0 = no date partition).
  private final int datePartitionDepth;
  // How many days back from currentDate to include when pruning.
  private final int numPrevDaysToList;
  // Inclusive lower bound of the date range to list (currentDate - numPrevDaysToList).
  private final LocalDate fromDate;
  // Inclusive upper bound of the date range to list.
  private final LocalDate currentDate;
  // Spark parallelism used for the distributed directory listings.
  private final int partitionsListParallelism;

  /** Configs supported. */
  public static class Config {
    public static final String DATE_PARTITION_DEPTH =
        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no (date) partition

    public static final String LOOKBACK_DAYS =
        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.lookback.days";
    public static final int DEFAULT_LOOKBACK_DAYS = 2;

    public static final String CURRENT_DATE =
        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";

    public static final String PARTITIONS_LIST_PARALLELISM =
        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
    public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
  }

  public DatePartitionPathSelector(TypedProperties props, Configuration hadoopConf) {
    super(props, hadoopConf);
    /*
     * datePartitionDepth = 0 is same as basepath and there is no partition. In which case
     * this path selector would be a no-op and lists all paths under the table basepath.
     */
    datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, DEFAULT_DATE_PARTITION_DEPTH);
    // If not specified the current date is assumed by default.
    currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
    numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
    fromDate = currentDate.minusDays(numPrevDaysToList);
    partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, DEFAULT_PARTITIONS_LIST_PARALLELISM);
  }

  /**
   * Gets the list of files changed since the last checkpoint, restricting the listing to
   * date partitions in the inclusive range [fromDate, currentDate] and parallelizing the
   * per-partition file listing via Spark.
   *
   * @param sparkContext JavaSparkContext to help parallelize certain operations
   * @param lastCheckpointStr the last checkpoint time string, empty if first run
   * @param sourceLimit max bytes to read each time
   * @return the list of files concatenated and their latest modified time
   */
  @Override
  public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
                                                                             Option<String> lastCheckpointStr,
                                                                             long sourceLimit) {
    // obtain all eligible files under root folder.
    LOG.info(
        "Root path => "
            + props.getString(ROOT_INPUT_PATH_PROP)
            + " source limit => "
            + sourceLimit
            + " depth of day partition => "
            + datePartitionDepth
            + " num prev days to list => "
            + numPrevDaysToList
            + " from current date => "
            + currentDate);
    long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
    HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
    // Hadoop Configuration is not serializable; wrap it so executors can rebuild a FileSystem.
    SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
    List<String> prunedPartitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP));
    // List files under each pruned partition in parallel on the executors.
    List<FileStatus> eligibleFiles = context.flatMap(prunedPartitionPaths,
        path -> {
          FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
          return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream();
        }, partitionsListParallelism);
    // sort them by modification time.
    eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));

    // Filter based on checkpoint & input size, if needed
    long currentBytes = 0;
    long maxModificationTime = Long.MIN_VALUE;
    List<FileStatus> filteredFiles = new ArrayList<>();
    for (FileStatus f : eligibleFiles) {
      if (currentBytes + f.getLen() >= sourceLimit) {
        // we have enough data, we are done
        break;
      }
      maxModificationTime = f.getModificationTime();
      currentBytes += f.getLen();
      filteredFiles.add(f);
    }

    // no data to read
    if (filteredFiles.isEmpty()) {
      return new ImmutablePair<>(
          Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
    }

    // read the files out.
    String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
    return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime));
  }

  /**
   * Prunes date level partitions to the last few days configured by {@link Config#LOOKBACK_DAYS}
   * from {@link Config#CURRENT_DATE}. Parallelizes listing by leveraging HoodieSparkEngineContext's
   * methods.
   *
   * @param context engine context used to parallelize the directory walk
   * @param fs filesystem of the root path
   * @param rootPath table base path under which date partitions are searched
   * @return partition paths whose date component falls in [fromDate, currentDate]; just the
   *         root path when no date partition is configured (depth &lt;= 0)
   */
  public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath) {
    List<String> partitionPaths = new ArrayList<>();
    // get all partition paths before date partition level
    partitionPaths.add(rootPath);
    if (datePartitionDepth <= 0) {
      // No date partition configured; this selector is a no-op over the root path.
      return partitionPaths;
    }
    SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
    // Walk down one directory level at a time, in parallel, until the date partition level.
    for (int i = 0; i < datePartitionDepth; i++) {
      partitionPaths = context.flatMap(partitionPaths, path -> {
        Path subDir = new Path(path);
        FileSystem fileSystem = subDir.getFileSystem(serializedConf.get());
        // skip files/dirs whose names start with (_, ., etc)
        FileStatus[] statuses = fileSystem.listStatus(subDir,
            file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));
        List<String> res = new ArrayList<>();
        for (FileStatus status : statuses) {
          res.add(status.getPath().toString());
        }
        return res.stream();
      }, partitionsListParallelism);
    }

    // Prune date partitions to last few days. The last path segment is expected to be either
    // 'yyyy-mm-dd' or '<field>=yyyy-mm-dd' (hive style).
    return context.getJavaSparkContext().parallelize(partitionPaths, partitionsListParallelism)
        .filter(s -> {
          String[] splits = s.split("/");
          String datePartition = splits[splits.length - 1];
          LocalDate partitionDate;
          if (datePartition.contains("=")) {
            String[] moreSplit = datePartition.split("=");
            ValidationUtils.checkArgument(
                moreSplit.length == 2,
                "Partition Field (" + datePartition + ") not in expected format");
            partitionDate = LocalDate.parse(moreSplit[1]);
          } else {
            partitionDate = LocalDate.parse(datePartition);
          }
          // Keep dates in the inclusive range [fromDate, currentDate].
          return (partitionDate.isEqual(fromDate) || partitionDate.isAfter(fromDate))
              && (partitionDate.isEqual(currentDate) || partitionDate.isBefore(currentDate));
        }).collect();
  }
}

View File

@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;
import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
private transient HoodieSparkEngineContext context = null;
static List<LocalDate> totalDates;
@BeforeAll
public static void initClass() {
  // Pre-compute the inclusive range of dates [2020-07-21, 2020-07-25] used to
  // generate date based partitions in the tests below.
  LocalDate firstDate = LocalDate.parse("2020-07-21");
  LocalDate lastDate = LocalDate.parse("2020-07-25");
  totalDates = new ArrayList<>();
  for (LocalDate date = firstDate; !date.isAfter(lastDate); date = date.plusDays(1)) {
    totalDates.add(date);
  }
}
// Sets up a fresh Spark context, base path, and filesystem before each test case.
@BeforeEach
public void setup() {
initSparkContexts();
initPath();
initFileSystem();
// Wrap the JavaSparkContext so the selector can parallelize directory listings.
context = new HoodieSparkEngineContext(jsc);
}
// Releases the Spark contexts, filesystem, and other resources created in setup().
@AfterEach
public void teardown() throws Exception {
cleanupResources();
}
/*
 * Create date partitions under each of the given leaf dirs, and populate every
 * date partition with a handful of random files.
 */
public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean hiveStyle)
    throws IOException {
  List<Path> createdFiles = new ArrayList<>();
  for (Path leafDir : leafDirs) {
    for (Path datePartition : generateDatePartitionsUnder(leafDir, hiveStyle)) {
      createdFiles.addAll(createRandomFilesUnder(datePartition));
    }
  }
  return createdFiles;
}
/**
 * Create all parent level dirs before the date partitions.
 *
 * @param root Current parent dir. Initially this points to table basepath.
 * @param dirs List of sub dirs to be created under root.
 * @param depth Depth of partitions before date partitions.
 * @param leafDirs Collects the leaf dirs. These will be the immediate parents of date based partitions.
 * @throws IOException if a directory cannot be created
 */
public void createParentDirsBeforeDatePartitions(Path root, List<String> dirs, int depth, List<Path> leafDirs)
    throws IOException {
  if (depth <= 0) {
    // Reached the level right above the date partitions; remember it as a leaf.
    leafDirs.add(root);
    return;
  }
  for (String dirName : dirs) {
    Path subDir = new Path(root, dirName);
    fs.mkdirs(subDir);
    createParentDirsBeforeDatePartitions(subDir, generateRandomStrings(), depth - 1, leafDirs);
  }
}
/*
 * Random string generation util used for generating file names or file contents.
 */
private List<String> generateRandomStrings() {
  List<String> randomStrings = new ArrayList<>(5);
  for (int count = 0; count < 5; count++) {
    randomStrings.add(UUID.randomUUID().toString());
  }
  return randomStrings;
}
/*
 * Generate date based partitions under a parent dir, optionally using hive style
 * ("dt=<date>") naming.
 */
private List<Path> generateDatePartitionsUnder(Path parent, boolean hiveStyle) throws IOException {
  String namePrefix = hiveStyle ? "dt=" : "";
  List<Path> datePartitions = new ArrayList<>(5);
  for (int day = 0; day < 5; day++) {
    Path partitionDir = new Path(parent, namePrefix + totalDates.get(day).toString());
    fs.mkdirs(partitionDir);
    datePartitions.add(partitionDir);
  }
  return datePartitions;
}
/*
 * Creates random files under the given directory. Each file gets a random UUID name
 * and random UUID lines as content.
 */
private List<Path> createRandomFilesUnder(Path path) throws IOException {
  List<Path> resultFiles = new ArrayList<>();
  for (String fileName : generateRandomStrings()) {
    List<String> fileContent = generateRandomStrings();
    // Idiomatic conversion: toArray(new String[0]) sizes the array itself,
    // replacing the manual pre-allocation + reassignment of the original.
    String[] lines = fileContent.toArray(new String[0]);
    Path file = new Path(path, fileName);
    UtilitiesTestBase.Helpers.saveStringsToDFS(lines, fs, file.toString());
    resultFiles.add(file);
  }
  return resultFiles;
}
// Builds the TypedProperties consumed by DatePartitionPathSelector for one test case.
private static TypedProperties getProps(
    String basePath, int datePartitionDepth, int numDaysToList, String currentDate) {
  TypedProperties properties = new TypedProperties();
  properties.setProperty(ROOT_INPUT_PATH_PROP, basePath);
  properties.setProperty(DATE_PARTITION_DEPTH, String.valueOf(datePartitionDepth));
  properties.setProperty(LOOKBACK_DAYS, String.valueOf(numDaysToList));
  properties.setProperty(CURRENT_DATE, currentDate);
  return properties;
}
/*
* Return test params => (table basepath, date partition's depth,
* num of prev days to list, current date, is date partition formatted in hive style?,
* expected number of paths after pruning)
*/
private static Stream<Arguments> configParams() {
  // (table basepath, date partition depth, num prev days to list, current date,
  //  hive style date partition?, expected number of paths after pruning)
  return Stream.of(
      Arguments.of("table1", 0, 2, "2020-07-25", true, 1),
      Arguments.of("table2", 0, 2, "2020-07-25", false, 1),
      Arguments.of("table3", 1, 3, "2020-07-25", true, 4),
      Arguments.of("table4", 1, 3, "2020-07-25", false, 4),
      Arguments.of("table5", 2, 1, "2020-07-25", true, 10),
      Arguments.of("table6", 2, 1, "2020-07-25", false, 10),
      Arguments.of("table7", 3, 2, "2020-07-25", true, 75),
      Arguments.of("table8", 3, 2, "2020-07-25", false, 75));
}
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("configParams")
public void testPruneDatePartitionPaths(
    String tableName,
    int datePartitionDepth,
    int numPrevDaysToList,
    String currentDate,
    boolean isHiveStylePartition,
    int expectedNumFiles)
    throws IOException {
  TypedProperties props = getProps(basePath + "/" + tableName, datePartitionDepth, numPrevDaysToList, currentDate);
  DatePartitionPathSelector pathSelector = new DatePartitionPathSelector(props, jsc.hadoopConfiguration());

  Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP));
  int totalDepthBeforeDatePartitions = props.getInteger(DATE_PARTITION_DEPTH) - 1;

  // Create the parent dir levels, then the date partitions with files under them.
  List<Path> leafDirs = new ArrayList<>();
  createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs);
  createDatePartitionsWithFiles(leafDirs, isHiveStylePartition);

  // Prune once and assert on that result (the original listed the tree twice:
  // an unused `paths` variable plus a second call inside the assertion).
  List<String> prunedPaths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString());
  assertEquals(expectedNumFiles, prunedPaths.size());
}
}