From 17802569fd64d865317353b3d855a68a6b3874bd Mon Sep 17 00:00:00 2001 From: Volodymyr Burenin Date: Wed, 3 Feb 2021 14:29:08 -0600 Subject: [PATCH] [HUDI-1538] Try to init class trying different signatures instead of checking its name (#2476) * [HUDI-1538] Try to init class trying different signatures instead of checking its name. * Removed unused imports Co-authored-by: volodymyr.burenin --- .../apache/hudi/utilities/UtilHelpers.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 19de67f7b..390476503 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -43,8 +43,6 @@ import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor; import org.apache.hudi.utilities.schema.SparkAvroPostProcessor; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; -import org.apache.hudi.utilities.sources.AvroKafkaSource; -import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.transform.ChainedTransformer; import org.apache.hudi.utilities.transform.Transformer; @@ -96,19 +94,21 @@ public class UtilHelpers { private static final Logger LOG = LogManager.getLogger(UtilHelpers.class); public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, - SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) throws IOException { - + SparkSession sparkSession, SchemaProvider schemaProvider, + HoodieDeltaStreamerMetrics metrics) throws IOException { try { - if (JsonKafkaSource.class.getName().equals(sourceClass) - || AvroKafkaSource.class.getName().equals(sourceClass)) { + try { return (Source) ReflectionUtils.loadClass(sourceClass, - new Class[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieDeltaStreamerMetrics.class}, cfg, - jssc, sparkSession, schemaProvider, metrics); + new Class[]{TypedProperties.class, JavaSparkContext.class, + SparkSession.class, SchemaProvider.class, + HoodieDeltaStreamerMetrics.class}, + cfg, jssc, sparkSession, schemaProvider, metrics); + } catch (HoodieException e) { + return (Source) ReflectionUtils.loadClass(sourceClass, + new Class[]{TypedProperties.class, JavaSparkContext.class, + SparkSession.class, SchemaProvider.class}, + cfg, jssc, sparkSession, schemaProvider); } - - return (Source) ReflectionUtils.loadClass(sourceClass, - new Class[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg, - jssc, sparkSession, schemaProvider); } catch (Throwable e) { throw new IOException("Could not load source class " + sourceClass, e); }