[HUDI-1137] Add option to configure different path selector
This commit is contained in:
@@ -32,12 +32,17 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.common.util.collection.ImmutablePair;
|
import org.apache.hudi.common.util.collection.ImmutablePair;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
|
||||||
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A custom dfs path selector used only for the hudi test suite. To be used only if workload is not run inline.
|
* A custom dfs path selector used only for the hudi test suite. To be used only if workload is not run inline.
|
||||||
*/
|
*/
|
||||||
public class DFSTestSuitePathSelector extends DFSPathSelector {
|
public class DFSTestSuitePathSelector extends DFSPathSelector {
|
||||||
|
// Logger is keyed to this selector's own class so its messages are attributed to DFSTestSuitePathSelector.
private static volatile Logger log = LoggerFactory.getLogger(DFSTestSuitePathSelector.class);
|
||||||
|
|
||||||
public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf) {
|
public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf) {
|
||||||
super(props, hadoopConf);
|
super(props, hadoopConf);
|
||||||
@@ -54,9 +59,12 @@ public class DFSTestSuitePathSelector extends DFSPathSelector {
|
|||||||
lastBatchId = Integer.parseInt(lastCheckpointStr.get());
|
lastBatchId = Integer.parseInt(lastCheckpointStr.get());
|
||||||
nextBatchId = lastBatchId + 1;
|
nextBatchId = lastBatchId + 1;
|
||||||
} else {
|
} else {
|
||||||
lastBatchId = -1;
|
lastBatchId = 0;
|
||||||
nextBatchId = 0;
|
nextBatchId = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
|
||||||
|
+ " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
|
||||||
// obtain all eligible files for the batch
|
// obtain all eligible files for the batch
|
||||||
List<FileStatus> eligibleFiles = new ArrayList<>();
|
List<FileStatus> eligibleFiles = new ArrayList<>();
|
||||||
FileStatus[] fileStatuses = fs.globStatus(
|
FileStatus[] fileStatuses = fs.globStatus(
|
||||||
@@ -73,6 +81,8 @@ public class DFSTestSuitePathSelector extends DFSPathSelector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.info("Reading " + eligibleFiles.size() + " files. ");
|
||||||
// no data to read
|
// no data to read
|
||||||
if (eligibleFiles.size() == 0) {
|
if (eligibleFiles.size() == 0) {
|
||||||
return new ImmutablePair<>(Option.empty(),
|
return new ImmutablePair<>(Option.empty(),
|
||||||
|
|||||||
@@ -352,12 +352,17 @@ public class UtilHelpers {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DFSPathSelector createSourceSelector(String sourceSelectorClass, TypedProperties props,
|
public static DFSPathSelector createSourceSelector(TypedProperties props,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
|
String sourceSelectorClass =
|
||||||
|
props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName());
|
||||||
try {
|
try {
|
||||||
return (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
|
DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
|
||||||
new Class<?>[]{TypedProperties.class, Configuration.class},
|
new Class<?>[]{TypedProperties.class, Configuration.class},
|
||||||
props, conf);
|
props, conf);
|
||||||
|
|
||||||
|
LOG.info("Using path selector " + selector.getClass().getName());
|
||||||
|
return selector;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
|
throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,8 +47,7 @@ public class AvroDFSSource extends AvroSource {
|
|||||||
SchemaProvider schemaProvider) throws IOException {
|
SchemaProvider schemaProvider) throws IOException {
|
||||||
super(props, sparkContext, sparkSession, schemaProvider);
|
super(props, sparkContext, sparkSession, schemaProvider);
|
||||||
this.pathSelector = UtilHelpers
|
this.pathSelector = UtilHelpers
|
||||||
.createSourceSelector(DFSPathSelector.class.getName(), props, sparkContext
|
.createSourceSelector(props, sparkContext.hadoopConfiguration());
|
||||||
.hadoopConfiguration());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ public class DFSPathSelector {
|
|||||||
public static class Config {
|
public static class Config {
|
||||||
|
|
||||||
public static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
|
public static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
|
||||||
|
public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector";
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");
|
protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");
|
||||||
|
|||||||
Reference in New Issue
Block a user