1
0

[HUDI-253]: added validations for schema provider class (#995)

This commit is contained in:
Pratyaksh Sharma
2019-11-11 19:33:44 +05:30
committed by vinoth chandar
parent 1483b97018
commit 5f1309407a
2 changed files with 55 additions and 6 deletions

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
public class InputBatch<T> {
@@ -48,6 +49,9 @@ public class InputBatch<T> {
}
// Returns the schema provider backing this batch, failing fast with a
// HoodieException when none was configured so callers never have to
// null-check the provider themselves.
public SchemaProvider getSchemaProvider() {
  if (schemaProvider != null) {
    return schemaProvider;
  }
  throw new HoodieException("Please provide a valid schema provider class!");
}
}

View File

@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,12 +47,14 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.exception.DatasetNotFoundException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
@@ -169,7 +172,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
String propsFilename, boolean enableHiveSync) {
String propsFilename, boolean enableHiveSync) {
return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
@@ -181,12 +189,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = 1000;
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
if (useSchemaProviderClass) {
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
return cfg;
}
static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
boolean addReadLatestOnMissingCkpt) {
boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips_copy";
@@ -196,6 +206,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties";
cfg.sourceLimit = 1000;
if (null != schemaProviderClassName) {
cfg.schemaProviderClassName = schemaProviderClassName;
}
List<String> cfgs = new ArrayList<>();
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
@@ -412,7 +425,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Now incrementally pull from the above hudi table and ingest to second table
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT, true);
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT,
true, null);
new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -428,7 +442,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
// with no change in upstream table, no change in downstream too when pulled.
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
HoodieDeltaStreamer.Config downstreamCfg1 =
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath,
Operation.BULK_INSERT, true, DummySchemaProvider.class.getName());
new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -447,7 +464,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Incrementally pull changes in upstream hudi table and apply to downstream table
downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT, false);
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT,
false, null);
downstreamCfg.sourceLimit = 2000;
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
TestHelpers.assertRecordCount(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -467,6 +485,21 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
hiveClient.getLastCommitTimeSynced().get());
}
// Verifies that the delta streamer fails fast when no schema provider class
// is configured (useSchemaProviderClass=false leaves schemaProviderClassName unset).
@Test
public void testNullSchemaProvider() throws Exception {
  String basePath = dfsBasePath + "/test_dataset";
  HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(basePath, Operation.BULK_INSERT,
      SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, false);
  try {
    new HoodieDeltaStreamer(config, jsc, dfs, hiveServer.getHiveConf()).sync();
    fail("Should error out when schema provider is not provided");
  } catch (HoodieException e) {
    // Expected: sync aborts with the validation message from the schema-provider check.
    log.error("Expected error during reading data from source ", e);
    assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
  }
}
@Test
public void testFilterDupes() throws Exception {
String datasetBasePath = dfsBasePath + "/test_dupes_dataset";
@@ -577,4 +610,16 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
}
}
/**
 * Minimal {@link SchemaProvider} for tests that only need a non-null provider
 * to be configured; it reports Avro's NULL schema as the source schema.
 */
public static class DummySchemaProvider extends SchemaProvider {

  public DummySchemaProvider(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
  }

  @Override
  public Schema getSourceSchema() {
    // Placeholder schema — the tests using this class never read real records.
    final Schema nullSchema = Schema.create(Schema.Type.NULL);
    return nullSchema;
  }
}
}