diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java index bd933c212..787c4900b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java @@ -398,20 +398,20 @@ public class StreamWriteOperatorCoordinator public static class Provider implements OperatorCoordinator.Provider { private final OperatorID operatorId; private final Configuration conf; - private final int numTasks; - public Provider(OperatorID operatorId, Configuration conf, int numTasks) { + public Provider(OperatorID operatorId, Configuration conf) { this.operatorId = operatorId; this.conf = conf; - this.numTasks = numTasks; } + @Override public OperatorID getOperatorId() { return this.operatorId; } + @Override public OperatorCoordinator create(Context context) { - return new StreamWriteOperatorCoordinator(this.conf, this.numTasks); + return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism()); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java index 56267451f..a558ffdcc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java @@ -39,15 +39,12 @@ public class StreamWriteOperatorFactory private final StreamWriteOperator operator; private final Configuration conf; - private final int numTasks; public StreamWriteOperatorFactory( - Configuration conf, - int numTasks) { + Configuration conf) { super(new StreamWriteOperator<>(conf)); this.operator = (StreamWriteOperator) getOperator(); this.conf = conf; - this.numTasks = numTasks; } @Override @@ -65,7 +62,7 @@ public class StreamWriteOperatorFactory @Override public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) { - return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks); + return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf); } @Override diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java index 24b899496..27fd4f5d0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java @@ -74,7 +74,7 @@ public class HoodieFlinkStreamerV2 { Configuration conf = FlinkOptions.fromStreamerConfig(cfg); int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(conf, numWriteTask); + new StreamWriteOperatorFactory<>(conf); DataStream dataStream = env.addSource(new FlinkKafkaConsumer<>( cfg.kafkaTopic, diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java index be8ec36cb..42973862d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java @@ -93,7 +93,7 @@ public class StreamWriteITCase extends TestLogger { (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) .getLogicalType(); StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(conf, 4); + new StreamWriteOperatorFactory<>(conf); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, diff --git a/hudi-flink/src/test/resources/log4j-surefire.properties b/hudi-flink/src/test/resources/log4j-surefire.properties index 32af46209..8dcd17f30 100644 --- a/hudi-flink/src/test/resources/log4j-surefire.properties +++ b/hudi-flink/src/test/resources/log4j-surefire.properties @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ### -log4j.rootLogger=WARN, CONSOLE +log4j.rootLogger=INFO, CONSOLE log4j.logger.org.apache=INFO log4j.logger.org.apache.hudi=DEBUG log4j.logger.org.apache.hadoop.hbase=ERROR @@ -27,5 +27,5 @@ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true -log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMin=INFO log4j.appender.CONSOLE.filter.a.LevelMax=FATAL