[HUDI-1330] handle prefix filtering at directory level (#2157)
The current DFSPathSelector only ignores the prefixes (_, .) at the file level, while files under subdirectories with those prefixes, e.g. (.checkpoint/*), are still considered, which results in a bad-format exception during reading.
This commit is contained in:
@@ -31,16 +31,15 @@ import org.apache.hudi.exception.HoodieIOException;
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -63,7 +62,7 @@ public class DFSPathSelector {
|
|||||||
protected final TypedProperties props;
|
protected final TypedProperties props;
|
||||||
|
|
||||||
public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
|
public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
|
||||||
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
|
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
|
||||||
this.props = props;
|
this.props = props;
|
||||||
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
|
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
|
||||||
}
|
}
|
||||||
@@ -101,18 +100,8 @@ public class DFSPathSelector {
|
|||||||
try {
|
try {
|
||||||
// obtain all eligible files under root folder.
|
// obtain all eligible files under root folder.
|
||||||
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
|
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
|
||||||
List<FileStatus> eligibleFiles = new ArrayList<>();
|
long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
|
||||||
RemoteIterator<LocatedFileStatus> fitr =
|
List<FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), lastCheckpointTime);
|
||||||
fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
|
|
||||||
while (fitr.hasNext()) {
|
|
||||||
LocatedFileStatus fileStatus = fitr.next();
|
|
||||||
if (fileStatus.isDirectory()
|
|
||||||
|| fileStatus.getLen() == 0
|
|
||||||
|| IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
eligibleFiles.add(fileStatus);
|
|
||||||
}
|
|
||||||
// sort them by modification time.
|
// sort them by modification time.
|
||||||
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
|
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
|
||||||
// Filter based on checkpoint & input size, if needed
|
// Filter based on checkpoint & input size, if needed
|
||||||
@@ -120,11 +109,6 @@ public class DFSPathSelector {
|
|||||||
long maxModificationTime = Long.MIN_VALUE;
|
long maxModificationTime = Long.MIN_VALUE;
|
||||||
List<FileStatus> filteredFiles = new ArrayList<>();
|
List<FileStatus> filteredFiles = new ArrayList<>();
|
||||||
for (FileStatus f : eligibleFiles) {
|
for (FileStatus f : eligibleFiles) {
|
||||||
if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get()).longValue()) {
|
|
||||||
// skip processed files
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (currentBytes + f.getLen() >= sourceLimit) {
|
if (currentBytes + f.getLen() >= sourceLimit) {
|
||||||
// we have enough data, we are done
|
// we have enough data, we are done
|
||||||
break;
|
break;
|
||||||
@@ -136,7 +120,7 @@ public class DFSPathSelector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// no data to read
|
// no data to read
|
||||||
if (filteredFiles.size() == 0) {
|
if (filteredFiles.isEmpty()) {
|
||||||
return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
|
return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -148,4 +132,25 @@ public class DFSPathSelector {
|
|||||||
throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
|
throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List files recursively, filtering out ineligible files/directories while doing so.
|
||||||
|
*/
|
||||||
|
private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
|
||||||
|
// skip files/dirs whose names start with (_, ., etc)
|
||||||
|
FileStatus[] statuses = fs.listStatus(path, file ->
|
||||||
|
IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));
|
||||||
|
List<FileStatus> res = new ArrayList<>();
|
||||||
|
for (FileStatus status: statuses) {
|
||||||
|
if (status.isDirectory()) {
|
||||||
|
// avoid infinite loop
|
||||||
|
if (!status.isSymlink()) {
|
||||||
|
res.addAll(listEligibleFiles(fs, status.getPath(), lastCheckpointTime));
|
||||||
|
}
|
||||||
|
} else if (status.getModificationTime() > lastCheckpointTime && status.getLen() > 0) {
|
||||||
|
res.add(status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -175,5 +175,18 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
|
|||||||
InputBatch<JavaRDD<GenericRecord>> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat(
|
InputBatch<JavaRDD<GenericRecord>> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat(
|
||||||
Option.empty(), Long.MAX_VALUE);
|
Option.empty(), Long.MAX_VALUE);
|
||||||
assertEquals(10100, fetch5.getBatch().get().count());
|
assertEquals(10100, fetch5.getBatch().get().count());
|
||||||
|
|
||||||
|
// 6. Should skip files/directories whose names start with prefixes ("_", ".")
|
||||||
|
generateOneFile(".checkpoint/3", "002", 100);
|
||||||
|
generateOneFile("_checkpoint/3", "002", 100);
|
||||||
|
generateOneFile(".3", "002", 100);
|
||||||
|
generateOneFile("_3", "002", 100);
|
||||||
|
// also works with nested directories
|
||||||
|
generateOneFile("foo/.bar/3", "002", 1); // not ok
|
||||||
|
generateOneFile("foo/bar/3", "002", 1); // ok
|
||||||
|
// fetch everything from the beginning
|
||||||
|
InputBatch<JavaRDD<GenericRecord>> fetch6 = sourceFormatAdapter.fetchNewDataInAvroFormat(
|
||||||
|
Option.empty(), Long.MAX_VALUE);
|
||||||
|
assertEquals(10101, fetch6.getBatch().get().count());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user