1
0

[HUDI-1140] Fix Jcommander issue for --hoodie-conf in DeltaStreamer (#1898)

This commit is contained in:
Sreeram Ramji
2020-08-04 21:42:51 -07:00
committed by GitHub
parent d3711a2641
commit 217a84192c
7 changed files with 248 additions and 9 deletions

View File

@@ -278,7 +278,8 @@ public class HDFSParquetImporter implements Serializable {
+ "hoodie client for importing")
public String propsFilePath = null;
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
splitter = IdentitySplitter.class)
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

View File

@@ -88,7 +88,8 @@ public class HoodieCleaner {
public String propsFilePath = null;
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
splitter = IdentitySplitter.class)
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--spark-master"}, description = "spark master to use.")

View File

@@ -90,7 +90,8 @@ public class HoodieCompactor {
public String propsFilePath = null;
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
splitter = IdentitySplitter.class)
public List<String> configs = new ArrayList<>();
}

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import com.beust.jcommander.converters.IParameterSplitter;
import java.util.Collections;
import java.util.List;
/**
* Splitter utility related to Jcommander usage of ArrayList parameters.
*/
public class IdentitySplitter implements IParameterSplitter {
public List<String> split(String value) {
return Collections.singletonList(value);
}
}

View File

@@ -18,8 +18,8 @@
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.async.AbstractAsyncService;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.async.AbstractAsyncService;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.TypedProperties;
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -60,6 +61,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -217,12 +219,14 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties.")
+ "to individual classes, for supported properties."
+ " Properties in this file can be overridden by \"--hoodie-conf\"")
public String propsFilePath =
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter")
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
splitter = IdentitySplitter.class)
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--source-class"},
@@ -343,16 +347,106 @@ public class HoodieDeltaStreamer implements Serializable {
return !continuousMode && !forceDisableCompaction
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
}
// Value-based equality over the CLI-settable fields, primarily to let tests
// compare a parsed Config against an expected one.
// NOTE(review): the field list here must stay in sync with hashCode() below;
// adding a new @Parameter field requires updating both methods.
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Config config = (Config) o;
// sourceLimit is a primitive, compared with ==; all other fields use
// null-safe Objects.equals.
return sourceLimit == config.sourceLimit
&& Objects.equals(targetBasePath, config.targetBasePath)
&& Objects.equals(targetTableName, config.targetTableName)
&& Objects.equals(tableType, config.tableType)
&& Objects.equals(baseFileFormat, config.baseFileFormat)
&& Objects.equals(propsFilePath, config.propsFilePath)
&& Objects.equals(configs, config.configs)
&& Objects.equals(sourceClassName, config.sourceClassName)
&& Objects.equals(sourceOrderingField, config.sourceOrderingField)
&& Objects.equals(payloadClassName, config.payloadClassName)
&& Objects.equals(schemaProviderClassName, config.schemaProviderClassName)
&& Objects.equals(transformerClassNames, config.transformerClassNames)
&& operation == config.operation
&& Objects.equals(filterDupes, config.filterDupes)
&& Objects.equals(enableHiveSync, config.enableHiveSync)
&& Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
&& Objects.equals(continuousMode, config.continuousMode)
&& Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds)
&& Objects.equals(sparkMaster, config.sparkMaster)
&& Objects.equals(commitOnErrors, config.commitOnErrors)
&& Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight)
&& Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight)
&& Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare)
&& Objects.equals(compactSchedulingMinShare, config.compactSchedulingMinShare)
&& Objects.equals(forceDisableCompaction, config.forceDisableCompaction)
&& Objects.equals(checkpoint, config.checkpoint)
&& Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider)
&& Objects.equals(help, config.help);
}
// Hashes exactly the same fields that equals() compares, as required by the
// Object.hashCode contract (equal objects must produce equal hashes).
@Override
public int hashCode() {
return Objects.hash(targetBasePath, targetTableName, tableType,
baseFileFormat, propsFilePath, configs, sourceClassName,
sourceOrderingField, payloadClassName, schemaProviderClassName,
transformerClassNames, sourceLimit, operation, filterDupes,
enableHiveSync, maxPendingCompactions, continuousMode,
minSyncIntervalSeconds, sparkMaster, commitOnErrors,
deltaSyncSchedulingWeight, compactSchedulingWeight, deltaSyncSchedulingMinShare,
compactSchedulingMinShare, forceDisableCompaction, checkpoint,
initialCheckpointProvider, help);
}
// Renders every configuration field for diagnostics; string-valued fields are
// quoted with single quotes, collection/primitive fields are printed verbatim.
@Override
public String toString() {
return "Config{"
+ "targetBasePath='" + targetBasePath + '\''
+ ", targetTableName='" + targetTableName + '\''
+ ", tableType='" + tableType + '\''
+ ", baseFileFormat='" + baseFileFormat + '\''
+ ", propsFilePath='" + propsFilePath + '\''
+ ", configs=" + configs
+ ", sourceClassName='" + sourceClassName + '\''
+ ", sourceOrderingField='" + sourceOrderingField + '\''
+ ", payloadClassName='" + payloadClassName + '\''
+ ", schemaProviderClassName='" + schemaProviderClassName + '\''
+ ", transformerClassNames=" + transformerClassNames
+ ", sourceLimit=" + sourceLimit
+ ", operation=" + operation
+ ", filterDupes=" + filterDupes
+ ", enableHiveSync=" + enableHiveSync
+ ", maxPendingCompactions=" + maxPendingCompactions
+ ", continuousMode=" + continuousMode
+ ", minSyncIntervalSeconds=" + minSyncIntervalSeconds
+ ", sparkMaster='" + sparkMaster + '\''
+ ", commitOnErrors=" + commitOnErrors
+ ", deltaSyncSchedulingWeight=" + deltaSyncSchedulingWeight
+ ", compactSchedulingWeight=" + compactSchedulingWeight
+ ", deltaSyncSchedulingMinShare=" + deltaSyncSchedulingMinShare
+ ", compactSchedulingMinShare=" + compactSchedulingMinShare
+ ", forceDisableCompaction=" + forceDisableCompaction
+ ", checkpoint='" + checkpoint + '\''
+ ", initialCheckpointProvider='" + initialCheckpointProvider + '\''
+ ", help=" + help
+ '}';
}
}
public static void main(String[] args) throws Exception {
final Config cfg = new Config();
/**
 * Parses the given command line arguments into a {@link Config} using JCommander.
 *
 * <p>Exposed (rather than inlined in {@code main}) so tests can validate CLI
 * parsing directly.
 *
 * @param args raw command line arguments
 * @return the populated configuration
 */
public static Config getConfig(String[] args) {
  Config cfg = new Config();
  JCommander cmd = new JCommander(cfg, null, args);
  if (cfg.help || args.length == 0) {
    cmd.usage();
    // Non-zero exit signals to wrapping scripts that no ingestion ran.
    System.exit(1);
  }
  return cfg;
}
public static void main(String[] args) throws Exception {
final Config cfg = getConfig(args);
Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
JavaSparkContext jssc =
UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -226,7 +227,8 @@ public class HoodieMultiTableDeltaStreamer {
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter")
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
splitter = IdentitySplitter.class)
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--source-class"},

View File

@@ -78,9 +78,13 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -90,6 +94,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -113,6 +118,21 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static String PARQUET_SOURCE_ROOT;
private static final int PARQUET_NUM_RECORDS = 5;
private static final int CSV_NUM_RECORDS = 3;
// Required fields
private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
private static final String TABLE_TYPE_PARAM = "--table-type";
private static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
private static final String TARGET_TABLE_PARAM = "--target-table";
private static final String TARGET_TABLE_VALUE = "test";
private static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
private static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
private static final String SOURCE_LIMIT_PARAM = "--source-limit";
private static final String SOURCE_LIMIT_VALUE = "500";
private static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
private static final String HOODIE_CONF_PARAM = "--hoodie-conf";
private static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table";
private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
public static KafkaTestUtils testUtils;
@@ -396,6 +416,93 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.getString("hoodie.datasource.write.keygenerator.class"));
}
/**
 * Builds a {@link HoodieDeltaStreamer.Config} carrying only the required CLI
 * fields: target base path, table type and target table name.
 */
private static HoodieDeltaStreamer.Config getBaseConfig() {
  final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
  cfg.targetTableName = TARGET_TABLE_VALUE;
  cfg.tableType = TABLE_TYPE_VALUE;
  cfg.targetBasePath = TGT_BASE_PATH_VALUE;
  return cfg;
}
/**
 * Supplies (CLI argument vector, expected Config) pairs covering one case per
 * parameter kind — string, long, boolean flag, repeatable list (single value,
 * comma-containing value, multiple occurrences) — plus a superset combining
 * all of them. Consumed by {@code testValidCommandLineArgs}.
 */
private static Stream<Arguments> provideValidCliArgs() {
HoodieDeltaStreamer.Config base = getBaseConfig();
// String parameter
HoodieDeltaStreamer.Config conf1 = getBaseConfig();
conf1.baseFileFormat = BASE_FILE_FORMAT_VALUE;
// Integer parameter
HoodieDeltaStreamer.Config conf2 = getBaseConfig();
conf2.sourceLimit = Long.parseLong(SOURCE_LIMIT_VALUE);
// Boolean Parameter
HoodieDeltaStreamer.Config conf3 = getBaseConfig();
conf3.enableHiveSync = true;
// ArrayList Parameter with 1 value
HoodieDeltaStreamer.Config conf4 = getBaseConfig();
conf4.configs = Arrays.asList(HOODIE_CONF_VALUE1);
// ArrayList Parameter with comma separated values
// (must survive parsing intact — the value itself contains commas)
HoodieDeltaStreamer.Config conf5 = getBaseConfig();
conf5.configs = Arrays.asList(HOODIE_CONF_VALUE2);
// Multiple ArrayList values
HoodieDeltaStreamer.Config conf6 = getBaseConfig();
conf6.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);
// Super set of all cases
HoodieDeltaStreamer.Config conf = getBaseConfig();
conf.baseFileFormat = BASE_FILE_FORMAT_VALUE;
conf.sourceLimit = Long.parseLong(SOURCE_LIMIT_VALUE);
conf.enableHiveSync = true;
conf.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);
String[] allConfig = new String[]{TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, SOURCE_LIMIT_PARAM,
SOURCE_LIMIT_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE, ENABLE_HIVE_SYNC_PARAM, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2};
return Stream.of(
// Base
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE}, base),
// String
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE}, conf1),
// Integer
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE}, conf2),
// Boolean
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
ENABLE_HIVE_SYNC_PARAM}, conf3),
// Array List 1
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1}, conf4),
// Array List with comma
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf5),
// Array list with multiple values
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf6),
// All
Arguments.of(allConfig, conf)
);
}
/**
 * Verifies that JCommander parses each valid CLI argument vector into the
 * expected {@code HoodieDeltaStreamer.Config}. Relies on Config#equals for
 * the comparison, so that method must cover all parsed fields.
 */
@ParameterizedTest
@MethodSource("provideValidCliArgs")
public void testValidCommandLineArgs(String[] args, HoodieDeltaStreamer.Config expected) {
assertEquals(expected, HoodieDeltaStreamer.getConfig(args));
}
@Test
public void testKafkaConnectCheckpointProvider() throws IOException {
String tableBasePath = dfsBasePath + "/test_table";