[HUDI-1621] Gets the parallelism from context when init StreamWriteOperatorCoordinator (#2579)
This commit is contained in:
@@ -398,20 +398,20 @@ public class StreamWriteOperatorCoordinator
|
|||||||
public static class Provider implements OperatorCoordinator.Provider {
|
public static class Provider implements OperatorCoordinator.Provider {
|
||||||
private final OperatorID operatorId;
|
private final OperatorID operatorId;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final int numTasks;
|
|
||||||
|
|
||||||
public Provider(OperatorID operatorId, Configuration conf, int numTasks) {
|
public Provider(OperatorID operatorId, Configuration conf) {
|
||||||
this.operatorId = operatorId;
|
this.operatorId = operatorId;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.numTasks = numTasks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public OperatorID getOperatorId() {
|
public OperatorID getOperatorId() {
|
||||||
return this.operatorId;
|
return this.operatorId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public OperatorCoordinator create(Context context) {
|
public OperatorCoordinator create(Context context) {
|
||||||
return new StreamWriteOperatorCoordinator(this.conf, this.numTasks);
|
return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,15 +39,12 @@ public class StreamWriteOperatorFactory<I>
|
|||||||
|
|
||||||
private final StreamWriteOperator<I> operator;
|
private final StreamWriteOperator<I> operator;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final int numTasks;
|
|
||||||
|
|
||||||
public StreamWriteOperatorFactory(
|
public StreamWriteOperatorFactory(
|
||||||
Configuration conf,
|
Configuration conf) {
|
||||||
int numTasks) {
|
|
||||||
super(new StreamWriteOperator<>(conf));
|
super(new StreamWriteOperator<>(conf));
|
||||||
this.operator = (StreamWriteOperator<I>) getOperator();
|
this.operator = (StreamWriteOperator<I>) getOperator();
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.numTasks = numTasks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -65,7 +62,7 @@ public class StreamWriteOperatorFactory<I>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
|
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
|
||||||
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks);
|
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ public class HoodieFlinkStreamerV2 {
|
|||||||
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
|
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
|
||||||
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
|
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
|
||||||
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
|
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
|
||||||
new StreamWriteOperatorFactory<>(conf, numWriteTask);
|
new StreamWriteOperatorFactory<>(conf);
|
||||||
|
|
||||||
DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
|
DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
|
||||||
cfg.kafkaTopic,
|
cfg.kafkaTopic,
|
||||||
|
|||||||
@@ -93,7 +93,7 @@ public class StreamWriteITCase extends TestLogger {
|
|||||||
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
|
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
|
||||||
.getLogicalType();
|
.getLogicalType();
|
||||||
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
|
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
|
||||||
new StreamWriteOperatorFactory<>(conf, 4);
|
new StreamWriteOperatorFactory<>(conf);
|
||||||
|
|
||||||
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
|
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
|
||||||
rowType,
|
rowType,
|
||||||
|
|||||||
@@ -15,7 +15,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
###
|
###
|
||||||
log4j.rootLogger=WARN, CONSOLE
|
log4j.rootLogger=INFO, CONSOLE
|
||||||
log4j.logger.org.apache=INFO
|
log4j.logger.org.apache=INFO
|
||||||
log4j.logger.org.apache.hudi=DEBUG
|
log4j.logger.org.apache.hudi=DEBUG
|
||||||
log4j.logger.org.apache.hadoop.hbase=ERROR
|
log4j.logger.org.apache.hadoop.hbase=ERROR
|
||||||
@@ -27,5 +27,5 @@ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
|
|||||||
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
||||||
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
|
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
|
||||||
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
|
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
|
||||||
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
|
log4j.appender.CONSOLE.filter.a.LevelMin=INFO
|
||||||
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
|
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
|
||||||
|
|||||||
Reference in New Issue
Block a user