From 0f39137ba854a2f808e2108c6aa10f4260b00844 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 26 Aug 2021 21:00:30 +0800 Subject: [PATCH] [HUDI-2321] Use the caller classloader for ReflectionUtils (#3535) Based on the discussion on Stack Overflow: https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader Thread.currentThread().getContextClassLoader() should never be used because the context classloader is mutable: a user can overwrite it when the thread switches, and it may also be null. The objection raised here: https://stackoverflow.com/a/36228195 says that Thread.currentThread().getContextClassLoader() is a JDK design error and that using the context classloader is never recommended. An API that needs a classloader should ask the user to set up the right classloader. --- .../java/org/apache/hudi/common/util/ReflectionUtils.java | 3 +-- .../apache/hudi/sink/StreamWriteOperatorCoordinator.java | 6 ++++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index 55db2a704..15e4aaa96 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -51,8 +51,7 @@ public class ReflectionUtils { synchronized (CLAZZ_CACHE) { if (!CLAZZ_CACHE.containsKey(clazzName)) { try { - Class clazz = Class.forName(clazzName, true, - Thread.currentThread().getContextClassLoader()); + Class clazz = Class.forName(clazzName); CLAZZ_CACHE.put(clazzName, clazz); } catch (ClassNotFoundException e) { throw new HoodieException("Unable to load class", e); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index f7047ef87..3049947f8 100644 --- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -152,6 +152,9 @@ public class StreamWriteOperatorCoordinator @Override public void start() throws Exception { + // set up the classloader for APIs that use reflection without taking a ClassLoader param + // reference: https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // initialize event buffer reset(); this.gateways = new SubtaskGateway[this.parallelism]; @@ -206,6 +209,9 @@ public class StreamWriteOperatorCoordinator public void notifyCheckpointComplete(long checkpointId) { executor.execute( () -> { + // The executor thread inherits the classloader of the #notifyCheckpointComplete + // caller, which is an AppClassLoader. + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // for streaming mode, commits the ever received events anyway, // the stream write task snapshot and flush the data buffer synchronously in sequence, // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)