1
0

[HUDI-2321] Use the caller classloader for ReflectionUtils (#3535)

Based on the discussion on stackoverflow:
https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader

The Thread.currentThread().getContextClassLoader() should never be used
because the context classloader is not immutable, user can overwrite it
when thread switches, it is also nullable.

The objection here: https://stackoverflow.com/a/36228195 says the
Thread.currentThread().getContextClassLoader() is a JDK design error
and the context classloader is never suggested to be used. The API that
needs classloader should ask the user to set up the right classloader.
This commit is contained in:
Danny Chan
2021-08-26 21:00:30 +08:00
committed by GitHub
parent 73fdcf37df
commit 0f39137ba8
2 changed files with 7 additions and 2 deletions

View File

@@ -51,8 +51,7 @@ public class ReflectionUtils {
synchronized (CLAZZ_CACHE) {
if (!CLAZZ_CACHE.containsKey(clazzName)) {
try {
Class<?> clazz = Class.forName(clazzName, true,
Thread.currentThread().getContextClassLoader());
Class<?> clazz = Class.forName(clazzName);
CLAZZ_CACHE.put(clazzName, clazz);
} catch (ClassNotFoundException e) {
throw new HoodieException("Unable to load class", e);

View File

@@ -152,6 +152,9 @@ public class StreamWriteOperatorCoordinator
@Override
public void start() throws Exception {
// setup classloader for APIs that use reflection without taking ClassLoader param
// reference: https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// initialize event buffer
reset();
this.gateways = new SubtaskGateway[this.parallelism];
@@ -206,6 +209,9 @@ public class StreamWriteOperatorCoordinator
public void notifyCheckpointComplete(long checkpointId) {
executor.execute(
() -> {
// The executor thread inherits the classloader of the #notifyCheckpointComplete
// caller, which is a AppClassLoader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// for streaming mode, commits the ever received events anyway,
// the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)