From 8281cbf7624c3a4eb90bf58671daf76843d00819 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 22 Nov 2021 11:05:05 +0800 Subject: [PATCH] [HUDI-2799] Fix the classloader of flink write task (#4042) --- .../java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java | 2 ++ .../apache/hudi/sink/common/AbstractStreamWriteFunction.java | 2 ++ .../main/java/org/apache/hudi/sink/compact/CompactFunction.java | 2 ++ 3 files changed, 6 insertions(+) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index 408990724..a5980dbb1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -111,6 +111,8 @@ public class BulkInsertWriteFunction @Override public void open(Configuration parameters) throws IOException { + // always use the user classloader + Thread.currentThread().setContextClassLoader(getRuntimeContext().getUserCodeClassLoader()); this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 0e7300591..7fbbb1403 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -125,6 +125,8 @@ public abstract class AbstractStreamWriteFunction @Override public void initializeState(FunctionInitializationContext context) throws Exception { + // always use the user classloader + Thread.currentThread().setContextClassLoader(getRuntimeContext().getUserCodeClassLoader()); this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 560b5ffba..73392a571 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -75,6 +75,8 @@ public class CompactFunction extends ProcessFunction