diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index e0cfba067..c15585b79 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -278,7 +278,8 @@ public class HDFSParquetImporter implements Serializable {
         + "hoodie client for importing")
     public String propsFilePath = null;
     @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
-        + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
     public List<String> configs = new ArrayList<>();
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
index 03a934b12..bf1fedb82 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
@@ -88,7 +88,8 @@ public class HoodieCleaner {
     public String propsFilePath = null;
 
     @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
-        + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
     public List<String> configs = new ArrayList<>();
 
     @Parameter(names = {"--spark-master"}, description = "spark master to use.")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 88db12dcb..a09112417 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -90,7 +90,8 @@ public class HoodieCompactor {
     public String propsFilePath = null;
 
     @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
-        + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
     public List<String> configs = new ArrayList<>();
 
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/IdentitySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/IdentitySplitter.java
new file mode 100644
index 000000000..a75ffffdb
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/IdentitySplitter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import com.beust.jcommander.converters.IParameterSplitter;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Splitter utility related to JCommander usage of ArrayList parameters.
+ */
+public class IdentitySplitter implements IParameterSplitter {
+  public List<String> split(String value) {
+    return Collections.singletonList(value);
+  }
+}
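
Background on why this splitter matters (a minimal sketch, not part of the patch): JCommander's default splitter for List-typed @Parameter fields breaks each value apart on commas, so a single --hoodie-conf value such as hoodie.datasource.write.recordkey.field=Field1,Field2,Field3 would arrive as three broken entries. IdentitySplitter returns each occurrence whole while still letting the flag repeat. The demo class and parameter names below are hypothetical; the JCommander constructor is the same one the patch itself uses.

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hudi.utilities.IdentitySplitter;

import java.util.ArrayList;
import java.util.List;

public class SplitterSketch {
  static class Args {
    // Default splitter: commas inside a single value produce multiple list entries.
    @Parameter(names = {"--default-conf"})
    public List<String> byDefault = new ArrayList<>();

    // IdentitySplitter: each flag occurrence stays one intact entry.
    @Parameter(names = {"--identity-conf"}, splitter = IdentitySplitter.class)
    public List<String> byIdentity = new ArrayList<>();
  }

  public static void main(String[] argv) {
    Args args = new Args();
    new JCommander(args, null,
        "--default-conf", "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3",
        "--identity-conf", "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3");
    System.out.println(args.byDefault.size());  // 3: the key list is mangled
    System.out.println(args.byIdentity.size()); // 1: the value survives whole
  }
}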
This can be repeated", + splitter = IdentitySplitter.class) public List configs = new ArrayList<>(); @Parameter(names = {"--source-class"}, @@ -343,16 +347,106 @@ public class HoodieDeltaStreamer implements Serializable { return !continuousMode && !forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Config config = (Config) o; + return sourceLimit == config.sourceLimit + && Objects.equals(targetBasePath, config.targetBasePath) + && Objects.equals(targetTableName, config.targetTableName) + && Objects.equals(tableType, config.tableType) + && Objects.equals(baseFileFormat, config.baseFileFormat) + && Objects.equals(propsFilePath, config.propsFilePath) + && Objects.equals(configs, config.configs) + && Objects.equals(sourceClassName, config.sourceClassName) + && Objects.equals(sourceOrderingField, config.sourceOrderingField) + && Objects.equals(payloadClassName, config.payloadClassName) + && Objects.equals(schemaProviderClassName, config.schemaProviderClassName) + && Objects.equals(transformerClassNames, config.transformerClassNames) + && operation == config.operation + && Objects.equals(filterDupes, config.filterDupes) + && Objects.equals(enableHiveSync, config.enableHiveSync) + && Objects.equals(maxPendingCompactions, config.maxPendingCompactions) + && Objects.equals(continuousMode, config.continuousMode) + && Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds) + && Objects.equals(sparkMaster, config.sparkMaster) + && Objects.equals(commitOnErrors, config.commitOnErrors) + && Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight) + && Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight) + && Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare) + && Objects.equals(compactSchedulingMinShare, config.compactSchedulingMinShare) + && Objects.equals(forceDisableCompaction, config.forceDisableCompaction) + && Objects.equals(checkpoint, config.checkpoint) + && Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider) + && Objects.equals(help, config.help); + } + + @Override + public int hashCode() { + return Objects.hash(targetBasePath, targetTableName, tableType, + baseFileFormat, propsFilePath, configs, sourceClassName, + sourceOrderingField, payloadClassName, schemaProviderClassName, + transformerClassNames, sourceLimit, operation, filterDupes, + enableHiveSync, maxPendingCompactions, continuousMode, + minSyncIntervalSeconds, sparkMaster, commitOnErrors, + deltaSyncSchedulingWeight, compactSchedulingWeight, deltaSyncSchedulingMinShare, + compactSchedulingMinShare, forceDisableCompaction, checkpoint, + initialCheckpointProvider, help); + } + + @Override + public String toString() { + return "Config{" + + "targetBasePath='" + targetBasePath + '\'' + + ", targetTableName='" + targetTableName + '\'' + + ", tableType='" + tableType + '\'' + + ", baseFileFormat='" + baseFileFormat + '\'' + + ", propsFilePath='" + propsFilePath + '\'' + + ", configs=" + configs + + ", sourceClassName='" + sourceClassName + '\'' + + ", sourceOrderingField='" + sourceOrderingField + '\'' + + ", payloadClassName='" + payloadClassName + '\'' + + ", schemaProviderClassName='" + schemaProviderClassName + '\'' + + ", transformerClassNames=" + transformerClassNames + + ", sourceLimit=" + sourceLimit + + ", operation=" + 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index fc7a0e09c..1b3073115 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -226,7 +227,8 @@ public class HoodieMultiTableDeltaStreamer {
         "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
 
     @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
-        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter")
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
     public List<String> configs = new ArrayList<>();
 
     @Parameter(names = {"--source-class"},
This can be repeated", + splitter = IdentitySplitter.class) public List configs = new ArrayList<>(); @Parameter(names = {"--source-class"}, diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index ee8e34f95..5c7ac9319 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -78,9 +78,13 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -90,6 +94,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -113,6 +118,21 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static String PARQUET_SOURCE_ROOT; private static final int PARQUET_NUM_RECORDS = 5; private static final int CSV_NUM_RECORDS = 3; + // Required fields + private static final String TGT_BASE_PATH_PARAM = "--target-base-path"; + private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah"; + private static final String TABLE_TYPE_PARAM = "--table-type"; + private static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE"; + private static final String TARGET_TABLE_PARAM = "--target-table"; + private static final String TARGET_TABLE_VALUE = "test"; + private static final String BASE_FILE_FORMAT_PARAM = "--base-file-format"; + private static final String BASE_FILE_FORMAT_VALUE = "PARQUET"; + private static final String SOURCE_LIMIT_PARAM = "--source-limit"; + private static final String SOURCE_LIMIT_VALUE = "500"; + private static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync"; + private static final String HOODIE_CONF_PARAM = "--hoodie-conf"; + private static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table"; + private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"; private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); public static KafkaTestUtils testUtils; @@ -396,6 +416,93 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { props.getString("hoodie.datasource.write.keygenerator.class")); } + private static HoodieDeltaStreamer.Config getBaseConfig() { + // Base config with all required fields + HoodieDeltaStreamer.Config base = new HoodieDeltaStreamer.Config(); + base.targetBasePath = TGT_BASE_PATH_VALUE; + base.tableType = TABLE_TYPE_VALUE; + base.targetTableName = TARGET_TABLE_VALUE; + return base; + } + + private static Stream provideValidCliArgs() { + + HoodieDeltaStreamer.Config base = getBaseConfig(); + // String parameter + HoodieDeltaStreamer.Config conf1 = getBaseConfig(); + conf1.baseFileFormat = BASE_FILE_FORMAT_VALUE; + + // Integer parameter + 
+    HoodieDeltaStreamer.Config conf2 = getBaseConfig();
+    conf2.sourceLimit = Long.parseLong(SOURCE_LIMIT_VALUE);
+
+    // Boolean Parameter
+    HoodieDeltaStreamer.Config conf3 = getBaseConfig();
+    conf3.enableHiveSync = true;
+
+    // ArrayList Parameter with 1 value
+    HoodieDeltaStreamer.Config conf4 = getBaseConfig();
+    conf4.configs = Arrays.asList(HOODIE_CONF_VALUE1);
+
+    // ArrayList Parameter with comma separated values
+    HoodieDeltaStreamer.Config conf5 = getBaseConfig();
+    conf5.configs = Arrays.asList(HOODIE_CONF_VALUE2);
+
+    // Multiple ArrayList values
+    HoodieDeltaStreamer.Config conf6 = getBaseConfig();
+    conf6.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);
+
+    // Super set of all cases
+    HoodieDeltaStreamer.Config conf = getBaseConfig();
+    conf.baseFileFormat = BASE_FILE_FORMAT_VALUE;
+    conf.sourceLimit = Long.parseLong(SOURCE_LIMIT_VALUE);
+    conf.enableHiveSync = true;
+    conf.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);
+
+    String[] allConfig = new String[]{TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, SOURCE_LIMIT_PARAM,
+        SOURCE_LIMIT_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
+        BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE, ENABLE_HIVE_SYNC_PARAM, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1,
+        HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2};
+
+    return Stream.of(
+        // Base
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE}, base),
+        // String
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
+            BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE}, conf1),
+        // Integer
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
+            SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE}, conf2),
+        // Boolean
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
+            ENABLE_HIVE_SYNC_PARAM}, conf3),
+        // Array List 1
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
+            HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1}, conf4),
+        // Array List with comma
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
+            HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf5),
+        // Array list with multiple values
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
+            HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf6),
+        // All
+        Arguments.of(allConfig, conf)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideValidCliArgs")
+  public void testValidCommandLineArgs(String[] args, HoodieDeltaStreamer.Config expected) {
+    assertEquals(expected, HoodieDeltaStreamer.getConfig(args));
+  }
+
   @Test
   public void testKafkaConnectCheckpointProvider() throws IOException {
     String tableBasePath = dfsBasePath + "/test_table";