[HUDI-1785] Move OperationConverter to hudi-client-common for code reuse (#2798)
This commit is contained in:
@@ -16,7 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hudi.streamer;
|
package org.apache.hudi.client.utils;
|
||||||
|
|
||||||
import org.apache.hudi.common.model.WriteOperationType;
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
|
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.streamer;
|
package org.apache.hudi.streamer;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.utils.OperationConverter;
|
||||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||||
import org.apache.hudi.common.model.WriteOperationType;
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
|
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import org.apache.hudi.async.SparkAsyncCompactService;
|
|||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||||
|
import org.apache.hudi.client.utils.OperationConverter;
|
||||||
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
|
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
@@ -48,10 +49,8 @@ import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
|
|||||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||||
import org.apache.hudi.utilities.sources.JsonDFSSource;
|
import org.apache.hudi.utilities.sources.JsonDFSSource;
|
||||||
|
|
||||||
import com.beust.jcommander.IStringConverter;
|
|
||||||
import com.beust.jcommander.JCommander;
|
import com.beust.jcommander.JCommander;
|
||||||
import com.beust.jcommander.Parameter;
|
import com.beust.jcommander.Parameter;
|
||||||
import com.beust.jcommander.ParameterException;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@@ -193,14 +192,6 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static class OperationConverter implements IStringConverter<WriteOperationType> {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public WriteOperationType convert(String value) throws ParameterException {
|
|
||||||
return WriteOperationType.valueOf(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Config implements Serializable {
|
public static class Config implements Serializable {
|
||||||
public static final String DEFAULT_DFS_SOURCE_PROPERTIES = "file://" + System.getProperty("user.dir")
|
public static final String DEFAULT_DFS_SOURCE_PROPERTIES = "file://" + System.getProperty("user.dir")
|
||||||
+ "/src/test/resources/delta-streamer-config/dfs-source.properties";
|
+ "/src/test/resources/delta-streamer-config/dfs-source.properties";
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer;
|
|||||||
|
|
||||||
import com.beust.jcommander.Parameter;
|
import com.beust.jcommander.Parameter;
|
||||||
import org.apache.hudi.DataSourceWriteOptions;
|
import org.apache.hudi.DataSourceWriteOptions;
|
||||||
|
import org.apache.hudi.client.utils.OperationConverter;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||||
import org.apache.hudi.common.model.WriteOperationType;
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
@@ -269,7 +270,7 @@ public class HoodieMultiTableDeltaStreamer {
|
|||||||
public long sourceLimit = Long.MAX_VALUE;
|
public long sourceLimit = Long.MAX_VALUE;
|
||||||
|
|
||||||
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
|
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
|
||||||
+ "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConverter.class)
|
+ "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
|
||||||
public WriteOperationType operation = WriteOperationType.UPSERT;
|
public WriteOperationType operation = WriteOperationType.UPSERT;
|
||||||
|
|
||||||
@Parameter(names = {"--filter-dupes"},
|
@Parameter(names = {"--filter-dupes"},
|
||||||
|
|||||||
Reference in New Issue
Block a user