1
0

[HUDI-3607] Support backend switch in HoodieFlinkStreamer (#5032)

* [HUDI-3607] Support backend switch in HoodieFlinkStreamer

* [HUDI-3607] Support backend switch in HoodieFlinkStreamer
1. checkstyle fix

* [HUDI-3607] Support backend switch in HoodieFlinkStreamer
1. change the msg
This commit is contained in:
that's cool
2022-03-16 14:07:31 +08:00
committed by GitHub
parent 296a0e6bcf
commit 91849c3d66
4 changed files with 56 additions and 2 deletions

View File

@@ -174,6 +174,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.streamer;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -25,6 +27,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.util.FlinkStateBackendConverter;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.Parameter;
@@ -53,6 +56,10 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.")
public String flinkCheckPointPath;
@Parameter(names = {"--flink-state-backend-type"}, description = "Flink state backend type, support only hashmap and rocksdb by now,"
+ " default hashmap.", converter = FlinkStateBackendConverter.class)
public StateBackend stateBackend = new HashMapStateBackend();
@Parameter(names = {"--instant-retry-times"}, description = "Times to retry when latest instant has not completed.")
public String instantRetryTimes = "10";

View File

@@ -32,7 +32,6 @@ import com.beust.jcommander.JCommander;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -61,8 +60,9 @@ public class HoodieFlinkStreamer {
// There can only be one checkpoint at one time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(cfg.stateBackend);
if (cfg.flinkCheckPointPath != null) {
env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
env.getCheckpointConfig().setCheckpointStorage(cfg.flinkCheckPointPath);
}
TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.ParameterException;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.hudi.exception.HoodieException;
/**
 * Converter that converts a string into a Flink {@link StateBackend}.
 *
 * <p>Used by JCommander to parse the {@code --flink-state-backend-type} CLI option.
 * Supported values (matched case-insensitively): {@code hashmap} and {@code rocksdb}.
 */
public class FlinkStateBackendConverter implements IStringConverter<StateBackend> {
  /**
   * Converts the given option value into a concrete {@link StateBackend}.
   *
   * @param value the raw CLI option value, matched case-insensitively
   * @return a {@link HashMapStateBackend} for {@code hashmap},
   *         an {@link EmbeddedRocksDBStateBackend} for {@code rocksdb}
   * @throws HoodieException if the value names an unsupported backend
   */
  @Override
  public StateBackend convert(String value) throws ParameterException {
    // Match case-insensitively so CLI inputs like "RocksDB" or "HASHMAP" also work.
    if ("hashmap".equalsIgnoreCase(value)) {
      return new HashMapStateBackend();
    }
    if ("rocksdb".equalsIgnoreCase(value)) {
      return new EmbeddedRocksDBStateBackend();
    }
    // NOTE(review): the signature declares ParameterException but this throws HoodieException
    // (project convention elsewhere in Hudi) — confirm JCommander still surfaces it as a
    // usage error as intended.
    throw new HoodieException(String.format(
        "Unknown flink state backend %s, supported values are: hashmap, rocksdb.", value));
  }
}