1
0

[HUDI-1538] Try to init class trying different signatures instead of checking its name (#2476)

* [HUDI-1538] Try to init class trying different signatures instead of checking its name.

* Removed unused imports

Co-authored-by: volodymyr.burenin <volodymyr.burenin@cloudkitchens.com>
This commit is contained in:
Volodymyr Burenin
2021-02-03 14:29:08 -06:00
committed by GitHub
parent eb91e5ba70
commit 17802569fd

View File

@@ -43,8 +43,6 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
@@ -96,19 +94,21 @@ public class UtilHelpers {
private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException {
SparkSession sparkSession, SchemaProvider schemaProvider,
HoodieDeltaStreamerMetrics metrics) throws IOException {
try {
if (JsonKafkaSource.class.getName().equals(sourceClass)
|| AvroKafkaSource.class.getName().equals(sourceClass)) {
try {
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class}, cfg,
jssc, sparkSession, schemaProvider, metrics);
new Class<?>[]{TypedProperties.class, JavaSparkContext.class,
SparkSession.class, SchemaProvider.class,
HoodieDeltaStreamerMetrics.class},
cfg, jssc, sparkSession, schemaProvider, metrics);
} catch (HoodieException e) {
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[]{TypedProperties.class, JavaSparkContext.class,
SparkSession.class, SchemaProvider.class},
cfg, jssc, sparkSession, schemaProvider);
}
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg,
jssc, sparkSession, schemaProvider);
} catch (Throwable e) {
throw new IOException("Could not load source class " + sourceClass, e);
}