From 040756d8c0db4633ac3cccaa11e4172e8e077e1e Mon Sep 17 00:00:00 2001 From: wangxianghu Date: Mon, 12 Apr 2021 16:22:33 +0800 Subject: [PATCH] [HUDI-1785] Move OperationConverter to hudi-client-common for code reuse (#2798) --- .../apache/hudi/client/utils}/OperationConverter.java | 2 +- .../org/apache/hudi/streamer/FlinkStreamerConfig.java | 1 + .../utilities/deltastreamer/HoodieDeltaStreamer.java | 11 +---------- .../deltastreamer/HoodieMultiTableDeltaStreamer.java | 3 ++- 4 files changed, 5 insertions(+), 12 deletions(-) rename {hudi-flink/src/main/java/org/apache/hudi/streamer => hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils}/OperationConverter.java (97%) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/OperationConverter.java similarity index 97% rename from hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/OperationConverter.java index 9baaf0a80..05b07bdce 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/OperationConverter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.streamer; +package org.apache.hudi.client.utils; import org.apache.hudi.common.model.WriteOperationType; diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 418e2ea25..78e477ae7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -18,6 +18,7 @@ package org.apache.hudi.streamer; +import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 6e3a024d0..7b0a2344d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -24,6 +24,7 @@ import org.apache.hudi.async.SparkAsyncCompactService; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; @@ -48,10 +49,8 @@ import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.JsonDFSSource; -import com.beust.jcommander.IStringConverter; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; -import com.beust.jcommander.ParameterException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -193,14 +192,6 @@ public class HoodieDeltaStreamer implements Serializable { return true; } - protected static class OperationConverter implements IStringConverter { - - @Override - public WriteOperationType convert(String value) throws ParameterException { - return WriteOperationType.valueOf(value); - } - } - public static class Config implements Serializable { public static final String DEFAULT_DFS_SOURCE_PROPERTIES = "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index be2fe5425..a39b97393 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer; import com.beust.jcommander.Parameter; import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -269,7 +270,7 @@ public class HoodieMultiTableDeltaStreamer { public long sourceLimit = Long.MAX_VALUE; @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " - + "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConverter.class) + + "is purely new data/inserts to gain speed)", converter = OperationConverter.class) public WriteOperationType operation = WriteOperationType.UPSERT; @Parameter(names = {"--filter-dupes"},