diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java index 6f15dea97..dcf56f32b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.utilities.schema.SchemaProvider; public class InputBatch { @@ -48,6 +49,9 @@ public class InputBatch { } public SchemaProvider getSchemaProvider() { + if (schemaProvider == null) { + throw new HoodieException("Please provide a valid schema provider class!"); + } return schemaProvider; } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index 0092de0ea..5cdb532dc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -31,6 +31,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -46,12 +47,14 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.exception.DatasetNotFoundException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveClient; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.DistributedTestDataSource; import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.InputBatch; @@ -169,7 +172,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName, - String propsFilename, boolean enableHiveSync) { + String propsFilename, boolean enableHiveSync) { + return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true); + } + + static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName, + String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; cfg.targetTableName = "hoodie_trips"; @@ -181,12 +189,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { cfg.sourceOrderingField = "timestamp"; cfg.propsFilePath = dfsBasePath + "/" + propsFilename; cfg.sourceLimit = 1000; - cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + if (useSchemaProviderClass) { + cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + } return cfg; } static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op, - boolean addReadLatestOnMissingCkpt) { + boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; cfg.targetTableName = "hoodie_trips_copy"; @@ -196,6 +206,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { cfg.sourceOrderingField = "timestamp"; cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties"; cfg.sourceLimit = 1000; + if (null != schemaProviderClassName) { + cfg.schemaProviderClassName = schemaProviderClassName; + } List cfgs = new ArrayList<>(); cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt); cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath); @@ -412,7 +425,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Now incrementally pull from the above hudi table and ingest to second table HoodieDeltaStreamer.Config downstreamCfg = - TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT, true); + TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT, + true, null); new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync(); TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext); @@ -428,7 +442,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1); // with no change in upstream table, no change in downstream too when pulled. - new HoodieDeltaStreamer(downstreamCfg, jsc).sync(); + HoodieDeltaStreamer.Config downstreamCfg1 = + TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, + Operation.BULK_INSERT, true, DummySchemaProvider.class.getName()); + new HoodieDeltaStreamer(downstreamCfg1, jsc).sync(); TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertDistanceCountWithExactValue(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext); @@ -447,7 +464,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // Incrementally pull changes in upstream hudi table and apply to downstream table downstreamCfg = - TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT, false); + TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT, + false, null); downstreamCfg.sourceLimit = 2000; new HoodieDeltaStreamer(downstreamCfg, jsc).sync(); TestHelpers.assertRecordCount(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext); @@ -467,6 +485,21 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { hiveClient.getLastCommitTimeSynced().get()); } + @Test + public void testNullSchemaProvider() throws Exception { + String dataSetBasePath = dfsBasePath + "/test_dataset"; + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT, + SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, + false); + try { + new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); + fail("Should error out when schema provider is not provided"); + } catch (HoodieException e) { + log.error("Expected error during reading data from source ", e); + assertTrue(e.getMessage().contains("Please provide a valid schema provider class!")); + } + } + @Test public void testFilterDupes() throws Exception { String datasetBasePath = dfsBasePath + "/test_dupes_dataset"; @@ -577,4 +610,16 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema()); } } + + public static class DummySchemaProvider extends SchemaProvider { + + public DummySchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + @Override + public Schema getSourceSchema() { + return Schema.create(Schema.Type.NULL); + } + } }