1
0

[HUDI-161] Remove --key-generator-class CLI arg in HoodieDeltaStreamer and use key generator class specified in datasource properties. (#781)

This commit is contained in:
Yihua Guo
2019-07-12 13:45:49 -07:00
committed by vinoth chandar
parent 11c4121f73
commit 621c246fa9
5 changed files with 62 additions and 22 deletions

View File

@@ -90,10 +90,17 @@ public class DataSourceUtils {
}
/**
* Create a key generator class via reflection, passing in any configs needed
* Create a key generator class via reflection, passing in any configs needed.
*
* If the class name of key generator is configured through the properties file, i.e., {@code
* props}, use the corresponding key generator class; otherwise, use the default key generator
* class specified in {@code DataSourceWriteOptions}.
*/
public static KeyGenerator createKeyGenerator(String keyGeneratorClass,
TypedProperties props) throws IOException {
public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
String keyGeneratorClass = props.getString(
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL()
);
try {
return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
} catch (Throwable e) {

View File

@@ -84,10 +84,7 @@ private[hoodie] object HoodieSparkSqlWriter {
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(
parameters(KEYGENERATOR_CLASS_OPT_KEY),
toProperties(parameters)
)
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val orderingVal = DataSourceUtils.getNestedFieldValAsString(

View File

@@ -171,7 +171,7 @@ public class DeltaSync implements Serializable {
refreshTimeline();
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
sparkSession, schemaProvider));

View File

@@ -27,7 +27,6 @@ import com.beust.jcommander.ParameterException;
import com.google.common.base.Preconditions;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.OverwriteWithLatestAvroPayload;
import com.uber.hoodie.SimpleKeyGenerator;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -181,11 +180,6 @@ public class HoodieDeltaStreamer implements Serializable {
+ " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
public String sourceOrderingField = "ts";
@Parameter(names = {"--key-generator-class"}, description = "Subclass of com.uber.hoodie.KeyGenerator "
+ "to generate a HoodieKey from the given avro record. Built in: SimpleKeyGenerator (uses "
+ "provided field names as recordkey & partitionpath. Nested fields specified via dot notation, e.g: a.b.c)")
public String keyGeneratorClass = SimpleKeyGenerator.class.getName();
@Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off "
+ "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();

View File

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.uber.hoodie.DataSourceWriteOptions;
import com.uber.hoodie.SimpleKeyGenerator;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -80,7 +81,8 @@ import org.junit.Test;
* upserts, inserts. Check counts at the end.
*/
public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class);
@BeforeClass
@@ -96,6 +98,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
@@ -108,7 +111,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
MultiPartKeysValueExtractor.class.getName());
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-source.properties");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
// Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to
// downstream hudi table
@@ -122,6 +125,17 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
dfsBasePath + "/test-downstream-source.properties");
// Properties used for testing invalid key generator
TypedProperties invalidProps = new TypedProperties();
invalidProps.setProperty("include", "sql-transformer.properties");
invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs,
dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
}
@AfterClass
@@ -147,11 +161,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName) {
return makeConfig(basePath, op, transformerClassName, false);
return makeConfig(basePath, op, transformerClassName, PROPS_FILENAME_TEST_SOURCE, false);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
boolean enableHiveSync) {
String propsFilename, boolean enableHiveSync) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
@@ -161,7 +175,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.operation = op;
cfg.enableHiveSync = enableHiveSync;
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/test-source.properties";
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = 1000;
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
return cfg;
@@ -259,10 +273,31 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Test
public void testProps() throws IOException {
TypedProperties props = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/test-source.properties"))
.getConfig();
TypedProperties props = new DFSPropertiesConfiguration(
dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
assertEquals(
"com.uber.hoodie.utilities.TestHoodieDeltaStreamer$TestGenerator",
props.getString("hoodie.datasource.write.keygenerator.class")
);
}
@Test
public void testPropsWithInvalidKeyGenerator() throws Exception {
try {
String datasetBasePath = dfsBasePath + "/test_dataset";
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(
datasetBasePath, Operation.BULK_INSERT, TripsWithDistanceTransformer.class.getName(),
PROPS_FILENAME_TEST_INVALID, false), jsc);
deltaStreamer.sync();
fail("Should error out when setting the key generator class property to an invalid value");
} catch (IOException e) {
//expected
log.error("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("Could not load key generator class"));
}
}
@Test
@@ -370,7 +405,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT,
SqlQueryBasedTransformer.class.getName(), true);
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
@@ -524,4 +559,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
}
}
public static class TestGenerator extends SimpleKeyGenerator {
public TestGenerator(TypedProperties props) {
super(props);
}
}
}