diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index e7b949485..d700ff607 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -90,10 +90,17 @@ public class DataSourceUtils {
   }
 
   /**
-   * Create a key generator class via reflection, passing in any configs needed
+   * Create a key generator class via reflection, passing in any configs needed.
+   *
+   * If the class name of the key generator is configured through the properties, i.e., {@code
+   * props}, use the corresponding key generator class; otherwise, use the default key generator
+   * class specified in {@code DataSourceWriteOptions}.
    */
-  public static KeyGenerator createKeyGenerator(String keyGeneratorClass,
-      TypedProperties props) throws IOException {
+  public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
+    String keyGeneratorClass = props.getString(
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
+        DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL()
+    );
     try {
       return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
     } catch (Throwable e) {
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
index cf44e091a..414cad4de 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
@@ -84,10 +84,7 @@ private[hoodie] object HoodieSparkSqlWriter {
     log.info(s"Registered avro schema : ${schema.toString(true)}")
 
     // Convert to RDD[HoodieRecord]
-    val keyGenerator = DataSourceUtils.createKeyGenerator(
-      parameters(KEYGENERATOR_CLASS_OPT_KEY),
-      toProperties(parameters)
-    )
+    val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
     val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
     val hoodieAllIncomingRecords = genericRecords.map(gr => {
       val orderingVal = DataSourceUtils.getNestedFieldValAsString(
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
index 89e5c7311..00d270b6a 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
@@ -171,7 +171,7 @@ public class DeltaSync implements Serializable {
     refreshTimeline();
 
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
-    this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
+    this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
     this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
         sparkSession, schemaProvider));
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index c49f3f836..195154680 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -27,7 +27,6 @@ import com.beust.jcommander.ParameterException;
 import com.google.common.base.Preconditions;
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.OverwriteWithLatestAvroPayload;
-import com.uber.hoodie.SimpleKeyGenerator;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -181,11 +180,6 @@ public class HoodieDeltaStreamer implements Serializable {
         + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
     public String sourceOrderingField = "ts";
 
-    @Parameter(names = {"--key-generator-class"}, description = "Subclass of com.uber.hoodie.KeyGenerator "
-        + "to generate a HoodieKey from the given avro record. Built in: SimpleKeyGenerator (uses "
-        + "provided field names as recordkey & partitionpath. Nested fields specified via dot notation, e.g: a.b.c)")
-    public String keyGeneratorClass = SimpleKeyGenerator.class.getName();
-
     @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off "
         + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
     public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
index 68a5c8db5..14b6c6abc 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.uber.hoodie.DataSourceWriteOptions;
+import com.uber.hoodie.SimpleKeyGenerator;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -80,7 +81,8 @@ import org.junit.Test;
  * upserts, inserts. Check counts at the end.
  */
 public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
-
+  private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
+  private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
   private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
   @BeforeClass
@@ -96,6 +98,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     TypedProperties props = new TypedProperties();
     props.setProperty("include", "sql-transformer.properties");
+    props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
     props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
@@ -108,7 +111,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
     props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
         MultiPartKeysValueExtractor.class.getName());
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-source.properties");
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
 
     // Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to
     // downstream hudi table
@@ -122,6 +125,17 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
         dfsBasePath + "/test-downstream-source.properties");
+
+    // Properties used for testing invalid key generator
+    TypedProperties invalidProps = new TypedProperties();
+    invalidProps.setProperty("include", "sql-transformer.properties");
+    invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
+    invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+    invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+    UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs,
+        dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
   }
 
   @AfterClass
@@ -147,11 +161,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName) {
-      return makeConfig(basePath, op, transformerClassName, false);
+      return makeConfig(basePath, op, transformerClassName, PROPS_FILENAME_TEST_SOURCE, false);
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
-        boolean enableHiveSync) {
+        String propsFilename, boolean enableHiveSync) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -161,7 +175,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = "timestamp";
-      cfg.propsFilePath = dfsBasePath + "/test-source.properties";
+      cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
       cfg.sourceLimit = 1000;
       cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
       return cfg;
@@ -259,10 +273,31 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
   @Test
   public void testProps() throws IOException {
-    TypedProperties props = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/test-source.properties"))
-        .getConfig();
+    TypedProperties props = new DFSPropertiesConfiguration(
+        dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
     assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
     assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
+    assertEquals(
+        "com.uber.hoodie.utilities.TestHoodieDeltaStreamer$TestGenerator",
+        props.getString("hoodie.datasource.write.keygenerator.class")
+    );
+  }
+
+  @Test
+  public void testPropsWithInvalidKeyGenerator() throws Exception {
+    try {
+      String datasetBasePath = dfsBasePath + "/test_dataset";
+      HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+          TestHelpers.makeConfig(
+              datasetBasePath, Operation.BULK_INSERT, TripsWithDistanceTransformer.class.getName(),
+              PROPS_FILENAME_TEST_INVALID, false), jsc);
+      deltaStreamer.sync();
+      fail("Should error out when setting the key generator class property to an invalid value");
+    } catch (IOException e) {
+      // expected
+      log.error("Expected error while creating the key generator", e);
+      assertTrue(e.getMessage().contains("Could not load key generator class"));
+    }
   }
 
   @Test
@@ -370,7 +405,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     // Initial bulk insert to ingest to first hudi table
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT,
-        SqlQueryBasedTransformer.class.getName(), true);
+        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
@@ -524,4 +559,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
           functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
     }
   }
+
+  public static class TestGenerator extends SimpleKeyGenerator {
+
+    public TestGenerator(TypedProperties props) {
+      super(props);
+    }
+  }
 }