From c8e19e2def0c33415bc3945ffb81f524c484c924 Mon Sep 17 00:00:00 2001 From: Alexander Filipchik Date: Sat, 19 Sep 2020 11:18:36 -0700 Subject: [PATCH] [HUDI-801] Adding a way to post process schema after it is fetched (#1524) * [HUDI-801] Adding a way to post process schema after it is fetched Co-authored-by: Alex Filipchik Co-authored-by: Balaji Varadarajan --- .../apache/hudi/utilities/UtilHelpers.java | 45 ++++++++++++- .../utilities/deltastreamer/DeltaSync.java | 3 +- .../deltastreamer/HoodieDeltaStreamer.java | 3 +- .../deltastreamer/SourceFormatAdapter.java | 25 +++---- .../schema/DelegatingSchemaProvider.java | 8 +++ .../utilities/schema/SchemaPostProcessor.java | 58 ++++++++++++++++ .../SchemaProviderWithPostProcessor.java | 54 +++++++++++++++ .../hudi/utilities/sources/RowSource.java | 5 +- .../hudi/utilities/DummySchemaProvider.java | 35 ++++++++++ .../utilities/TestSchemaPostProcessor.java | 67 +++++++++++++++++++ .../functional/TestHoodieDeltaStreamer.java | 15 +---- 11 files changed, 286 insertions(+), 32 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 976e8309f..ac4880585 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -35,7 +35,12 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; +import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; +import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; +import org.apache.hudi.utilities.schema.SchemaPostProcessor; +import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config; import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor; import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; import org.apache.hudi.utilities.transform.ChainedTransformer; @@ -99,7 +104,7 @@ public class UtilHelpers { } public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg, - JavaSparkContext jssc) throws IOException { + JavaSparkContext jssc) throws IOException { try { return StringUtils.isNullOrEmpty(schemaProviderClass) ? null : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc); @@ -108,6 +113,13 @@ public class UtilHelpers { } } + public static SchemaPostProcessor createSchemaPostProcessor( + String schemaPostProcessorClass, TypedProperties cfg, JavaSparkContext jssc) { + return schemaPostProcessorClass == null + ? null + : (SchemaPostProcessor) ReflectionUtils.loadClass(schemaPostProcessorClass, cfg, jssc); + } + public static Option createTransformer(List classNames) throws IOException { try { List transformers = new ArrayList<>(); @@ -368,4 +380,35 @@ public class UtilHelpers { throw new IOException("Could not load source selector class " + sourceSelectorClass, e); } } + + public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) { + SchemaProvider originalProvider = schemaProvider; + if (schemaProvider instanceof SchemaProviderWithPostProcessor) { + originalProvider = ((SchemaProviderWithPostProcessor) schemaProvider).getOriginalSchemaProvider(); + } else if (schemaProvider instanceof DelegatingSchemaProvider) { + originalProvider = ((DelegatingSchemaProvider) schemaProvider).getSourceSchemaProvider(); + } + return originalProvider; + } + + public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcessor(SchemaProvider provider, + TypedProperties cfg, JavaSparkContext jssc) { + + if (provider == null) { + return null; + } + + if (provider instanceof SchemaProviderWithPostProcessor) { + return (SchemaProviderWithPostProcessor)provider; + } + String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null); + return new SchemaProviderWithPostProcessor(provider, + Option.ofNullable(createSchemaPostProcessor(schemaPostProcessorClass, cfg, jssc))); + } + + public static SchemaProvider createRowBasedSchemaProvider(StructType structType, + TypedProperties cfg, JavaSparkContext jssc) { + SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType); + return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index be98a6234..2b0de9355 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -49,7 +49,6 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; -import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.transform.Transformer; @@ -323,7 +322,7 @@ public class DeltaSync implements Serializable { transformed .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, jssc, dataAndCheckpoint.getSchemaProvider(), - new RowBasedSchemaProvider(r.schema()))) + UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc))) .orElse(dataAndCheckpoint.getSchemaProvider()); avroRDDOptional = transformed .map(t -> AvroConversionUtils.createRdd( diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 2f6b17e9d..692422dd6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -547,7 +547,8 @@ public class HoodieDeltaStreamer implements Serializable { this.props = properties; LOG.info("Creating delta streamer with configs : " + props.toString()); - this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); + this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor( + UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc); deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, this::onInitializingWriteClient); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java index 7fd23b869..505deed6d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java @@ -20,7 +20,9 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.AvroSource; import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.JsonSource; @@ -65,17 +67,16 @@ public final class SourceFormatAdapter { case ROW: { InputBatch> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit); return new InputBatch<>(Option.ofNullable(r.getBatch().map( - rdd -> ( - (r.getSchemaProvider() instanceof FilebasedSchemaProvider) - // If the source schema is specified through Avro schema, - // pass in the schema for the Row-to-Avro conversion - // to avoid nullability mismatch between Avro schema and Row schema - ? AvroConversionUtils.createRdd( - rdd, r.getSchemaProvider().getSourceSchema(), - HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() - : AvroConversionUtils.createRdd( - rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() - )) + rdd -> { + SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider()); + return (originalProvider instanceof FilebasedSchemaProvider) + // If the source schema is specified through Avro schema, + // pass in the schema for the Row-to-Avro conversion + // to avoid nullability mismatch between Avro schema and Row schema + ? AvroConversionUtils.createRdd(rdd, r.getSchemaProvider().getSourceSchema(), + HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() : AvroConversionUtils.createRdd(rdd, + HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD(); + }) .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider()); } default: @@ -116,4 +117,4 @@ public final class SourceFormatAdapter { throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")"); } } -} \ No newline at end of file +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java index 43c64d0c2..cca335fb0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java @@ -48,4 +48,12 @@ public class DelegatingSchemaProvider extends SchemaProvider { public Schema getTargetSchema() { return targetSchemaProvider.getTargetSchema(); } + + public SchemaProvider getSourceSchemaProvider() { + return sourceSchemaProvider; + } + + public SchemaProvider getTargetSchemaProvider() { + return targetSchemaProvider; + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java new file mode 100644 index 000000000..f0879e055 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.Serializable; + +/** + * Used in {@link SchemaProvider} to modify schema before it is passed to the caller. Can be used to + * add marker fields in records with no fields, make everything optional, ... + */ +public abstract class SchemaPostProcessor implements Serializable { + + /** Configs supported. */ + public static class Config { + public static final String SCHEMA_POST_PROCESSOR_PROP = + "hoodie.deltastreamer.schemaprovider.schema_post_processor"; + } + + private static final long serialVersionUID = 1L; + + protected TypedProperties config; + + protected JavaSparkContext jssc; + + protected SchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) { + this.config = props; + this.jssc = jssc; + } + + /** + * Rewrites schema. + * + * @param schema input schema. + * @return modified schema. + */ + public abstract Schema processSchema(Schema schema); +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java new file mode 100644 index 000000000..bd5bae460 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import org.apache.avro.Schema; +import org.apache.hudi.common.util.Option; + +/** + * A schema provider which applies schema post process hook on schema. + */ +public class SchemaProviderWithPostProcessor extends SchemaProvider { + + private final SchemaProvider schemaProvider; + private final Option schemaPostProcessor; + + public SchemaProviderWithPostProcessor(SchemaProvider schemaProvider, + Option schemaPostProcessor) { + super(null, null); + this.schemaProvider = schemaProvider; + this.schemaPostProcessor = schemaPostProcessor; + } + + @Override + public Schema getSourceSchema() { + return schemaPostProcessor.map(processor -> processor.processSchema(schemaProvider.getSourceSchema())) + .orElse(schemaProvider.getSourceSchema()); + } + + @Override + public Schema getTargetSchema() { + return schemaPostProcessor.map(processor -> processor.processSchema(schemaProvider.getTargetSchema())) + .orElse(schemaProvider.getTargetSchema()); + } + + public SchemaProvider getOriginalSchemaProvider() { + return schemaProvider; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java index 6e3fcd43b..bd29ccae6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java @@ -21,7 +21,7 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; +import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.spark.api.java.JavaSparkContext; @@ -42,7 +42,8 @@ public abstract class RowSource extends Source> { protected final InputBatch> fetchNewData(Option lastCkptStr, long sourceLimit) { Pair>, String> res = fetchNextBatch(lastCkptStr, sourceLimit); return res.getKey().map(dsr -> { - SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(dsr.schema()); + SchemaProvider rowSchemaProvider = + UtilHelpers.createRowBasedSchemaProvider(dsr.schema(), props, sparkContext); return new InputBatch<>(res.getKey(), res.getValue(), rowSchemaProvider); }).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue())); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java new file mode 100644 index 000000000..205540699 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.avro.Schema; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.spark.api.java.JavaSparkContext; + +public class DummySchemaProvider extends SchemaProvider { + public DummySchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + @Override + public Schema getSourceSchema() { + return Schema.create(Schema.Type.NULL); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java new file mode 100644 index 000000000..39b84571e --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.schema.SchemaPostProcessor; +import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.SchemaBuilder; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class TestSchemaPostProcessor extends UtilitiesTestBase { + + private TypedProperties properties = new TypedProperties(); + + @Test + public void testPostProcessor() throws IOException { + properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName()); + SchemaProvider provider = + UtilHelpers.wrapSchemaProviderWithPostProcessor( + UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc), + properties, jsc); + + Schema schema = provider.getSourceSchema(); + assertEquals(schema.getType(), Type.RECORD); + assertEquals(schema.getName(), "test"); + assertNotNull(schema.getField("testString")); + } + + public static class DummySchemaPostProcessor extends SchemaPostProcessor { + + public DummySchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + @Override + public Schema processSchema(Schema schema) { + return SchemaBuilder.record("test").fields().optionalString("testString").endRecord(); + } + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 6577c09ad..5916a5be0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -39,10 +39,10 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveClient; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.utilities.DummySchemaProvider; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; -import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.CsvDFSSource; import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.InputBatch; @@ -54,7 +54,6 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer; import org.apache.hudi.utilities.transform.Transformer; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -1123,16 +1122,4 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema()); } } - - public static class DummySchemaProvider extends SchemaProvider { - - public DummySchemaProvider(TypedProperties props, JavaSparkContext jssc) { - super(props, jssc); - } - - @Override - public Schema getSourceSchema() { - return Schema.create(Schema.Type.NULL); - } - } }