[MINOR] Make sure factory method is used to instanciate DFSPathSelector (#2187)
* Move createSourceSelector into DFSPathSelector factory method * Replace constructor call with factory method * Added some javadoc
This commit is contained in:
@@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.utilities;
|
package org.apache.hudi.utilities;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hudi.AvroConversionUtils;
|
import org.apache.hudi.AvroConversionUtils;
|
||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
@@ -46,7 +45,6 @@ import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
|
|||||||
import org.apache.hudi.utilities.sources.AvroKafkaSource;
|
import org.apache.hudi.utilities.sources.AvroKafkaSource;
|
||||||
import org.apache.hudi.utilities.sources.JsonKafkaSource;
|
import org.apache.hudi.utilities.sources.JsonKafkaSource;
|
||||||
import org.apache.hudi.utilities.sources.Source;
|
import org.apache.hudi.utilities.sources.Source;
|
||||||
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
|
||||||
import org.apache.hudi.utilities.transform.ChainedTransformer;
|
import org.apache.hudi.utilities.transform.ChainedTransformer;
|
||||||
import org.apache.hudi.utilities.transform.Transformer;
|
import org.apache.hudi.utilities.transform.Transformer;
|
||||||
|
|
||||||
@@ -377,22 +375,6 @@ public class UtilHelpers {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DFSPathSelector createSourceSelector(TypedProperties props,
|
|
||||||
Configuration conf) throws IOException {
|
|
||||||
String sourceSelectorClass =
|
|
||||||
props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName());
|
|
||||||
try {
|
|
||||||
DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
|
|
||||||
new Class<?>[]{TypedProperties.class, Configuration.class},
|
|
||||||
props, conf);
|
|
||||||
|
|
||||||
LOG.info("Using path selector " + selector.getClass().getName());
|
|
||||||
return selector;
|
|
||||||
} catch (Throwable e) {
|
|
||||||
throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) {
|
public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) {
|
||||||
SchemaProvider originalProvider = schemaProvider;
|
SchemaProvider originalProvider = schemaProvider;
|
||||||
if (schemaProvider instanceof SchemaProviderWithPostProcessor) {
|
if (schemaProvider instanceof SchemaProviderWithPostProcessor) {
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
|
|||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.utilities.UtilHelpers;
|
|
||||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||||
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
||||||
|
|
||||||
@@ -46,7 +45,7 @@ public class AvroDFSSource extends AvroSource {
|
|||||||
public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
|
public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
|
||||||
SchemaProvider schemaProvider) throws IOException {
|
SchemaProvider schemaProvider) throws IOException {
|
||||||
super(props, sparkContext, sparkSession, schemaProvider);
|
super(props, sparkContext, sparkSession, schemaProvider);
|
||||||
this.pathSelector = UtilHelpers
|
this.pathSelector = DFSPathSelector
|
||||||
.createSourceSelector(props, sparkContext.hadoopConfiguration());
|
.createSourceSelector(props, sparkContext.hadoopConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ public class CsvDFSSource extends RowSource {
|
|||||||
SparkSession sparkSession,
|
SparkSession sparkSession,
|
||||||
SchemaProvider schemaProvider) {
|
SchemaProvider schemaProvider) {
|
||||||
super(props, sparkContext, sparkSession, schemaProvider);
|
super(props, sparkContext, sparkSession, schemaProvider);
|
||||||
this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
|
this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration());
|
||||||
if (schemaProvider != null) {
|
if (schemaProvider != null) {
|
||||||
sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema())
|
sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema())
|
||||||
.dataType();
|
.dataType();
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ public class JsonDFSSource extends JsonSource {
|
|||||||
public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
|
public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
|
||||||
SchemaProvider schemaProvider) {
|
SchemaProvider schemaProvider) {
|
||||||
super(props, sparkContext, sparkSession, schemaProvider);
|
super(props, sparkContext, sparkSession, schemaProvider);
|
||||||
this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
|
this.pathSelector = DFSPathSelector.createSourceSelector(props, sparkContext.hadoopConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ public class ParquetDFSSource extends RowSource {
|
|||||||
public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
|
public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
|
||||||
SchemaProvider schemaProvider) {
|
SchemaProvider schemaProvider) {
|
||||||
super(props, sparkContext, sparkSession, schemaProvider);
|
super(props, sparkContext, sparkSession, schemaProvider);
|
||||||
this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration());
|
this.pathSelector = DFSPathSelector.createSourceSelector(props, this.sparkContext.hadoopConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -22,8 +22,10 @@ import org.apache.hudi.DataSourceUtils;
|
|||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.ReflectionUtils;
|
||||||
import org.apache.hudi.common.util.collection.ImmutablePair;
|
import org.apache.hudi.common.util.collection.ImmutablePair;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@@ -66,6 +68,33 @@ public class DFSPathSelector {
|
|||||||
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
|
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Factory method for creating custom DFSPathSelector. Default selector
|
||||||
|
* to use is {@link DFSPathSelector}
|
||||||
|
*/
|
||||||
|
public static DFSPathSelector createSourceSelector(TypedProperties props,
|
||||||
|
Configuration conf) {
|
||||||
|
String sourceSelectorClass = props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
|
||||||
|
DFSPathSelector.class.getName());
|
||||||
|
try {
|
||||||
|
DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
|
||||||
|
new Class<?>[]{TypedProperties.class, Configuration.class},
|
||||||
|
props, conf);
|
||||||
|
|
||||||
|
log.info("Using path selector " + selector.getClass().getName());
|
||||||
|
return selector;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list of files changed since last checkpoint.
|
||||||
|
*
|
||||||
|
* @param lastCheckpointStr the last checkpoint time string, empty if first run
|
||||||
|
* @param sourceLimit max bytes to read each time
|
||||||
|
* @return the list of files concatenated and their latest modified time
|
||||||
|
*/
|
||||||
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
|
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
|
||||||
long sourceLimit) {
|
long sourceLimit) {
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user