diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index 55db2a704..15e4aaa96 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -51,8 +51,7 @@ public class ReflectionUtils { synchronized (CLAZZ_CACHE) { if (!CLAZZ_CACHE.containsKey(clazzName)) { try { - Class clazz = Class.forName(clazzName, true, - Thread.currentThread().getContextClassLoader()); + Class clazz = Class.forName(clazzName); CLAZZ_CACHE.put(clazzName, clazz); } catch (ClassNotFoundException e) { throw new HoodieException("Unable to load class", e); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index f7047ef87..3049947f8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -152,6 +152,9 @@ public class StreamWriteOperatorCoordinator @Override public void start() throws Exception { + // setup classloader for APIs that use reflection without taking ClassLoader param + // reference: https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // initialize event buffer reset(); this.gateways = new SubtaskGateway[this.parallelism]; @@ -206,6 +209,9 @@ public class StreamWriteOperatorCoordinator public void notifyCheckpointComplete(long checkpointId) { executor.execute( () -> { + // The executor thread inherits the classloader of the #notifyCheckpointComplete + // caller, which is a AppClassLoader. + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // for streaming mode, commits the ever received events anyway, // the stream write task snapshot and flush the data buffer synchronously in sequence, // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)