[HUDI-2799] Fix the classloader of flink write task (#4042)
This commit is contained in:
@@ -111,6 +111,8 @@ public class BulkInsertWriteFunction<I>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws IOException {
|
public void open(Configuration parameters) throws IOException {
|
||||||
|
// always use the user classloader
|
||||||
|
Thread.currentThread().setContextClassLoader(getRuntimeContext().getUserCodeClassLoader());
|
||||||
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||||
this.metaClient = StreamerUtil.createMetaClient(this.config);
|
this.metaClient = StreamerUtil.createMetaClient(this.config);
|
||||||
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
|
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
|
||||||
|
|||||||
@@ -125,6 +125,8 @@ public abstract class AbstractStreamWriteFunction<I>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initializeState(FunctionInitializationContext context) throws Exception {
|
public void initializeState(FunctionInitializationContext context) throws Exception {
|
||||||
|
// always use the user classloader
|
||||||
|
Thread.currentThread().setContextClassLoader(getRuntimeContext().getUserCodeClassLoader());
|
||||||
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||||
this.metaClient = StreamerUtil.createMetaClient(this.config);
|
this.metaClient = StreamerUtil.createMetaClient(this.config);
|
||||||
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
|
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
|
||||||
|
|||||||
@@ -75,6 +75,8 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
|
// always use the user classloader
|
||||||
|
Thread.currentThread().setContextClassLoader(getRuntimeContext().getUserCodeClassLoader());
|
||||||
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||||
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
|
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
|
||||||
if (this.asyncCompaction) {
|
if (this.asyncCompaction) {
|
||||||
|
|||||||
Reference in New Issue
Block a user