diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java index dcf56f32b..f752e0d5a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java @@ -18,10 +18,14 @@ package org.apache.hudi.utilities.sources; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaSparkContext; + public class InputBatch { private final Option batch; @@ -49,9 +53,25 @@ public class InputBatch { } public SchemaProvider getSchemaProvider() { - if (schemaProvider == null) { + if (batch.isPresent() && schemaProvider == null) { throw new HoodieException("Please provide a valid schema provider class!"); } - return schemaProvider; + return Option.ofNullable(schemaProvider).orElse(new NullSchemaProvider()); + } + + public static class NullSchemaProvider extends SchemaProvider { + + public NullSchemaProvider() { + this(null, null); + } + + public NullSchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + @Override + public Schema getSourceSchema() { + return Schema.create(Schema.Type.NULL); + } } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java new file mode 100644 index 000000000..752621da5 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java @@ -0,0 +1,37 @@ +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; +import org.apache.hudi.utilities.schema.SchemaProvider; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestInputBatch { + + @Test + public void getSchemaProviderShouldThrowException() { + final InputBatch inputBatch = new InputBatch<>(Option.of("foo"), null, null); + Throwable t = assertThrows(HoodieException.class, inputBatch::getSchemaProvider); + assertEquals("Please provide a valid schema provider class!", t.getMessage()); + } + + @Test + public void getSchemaProviderShouldReturnNullSchemaProvider() { + final InputBatch inputBatch = new InputBatch<>(Option.empty(), null, null); + SchemaProvider schemaProvider = inputBatch.getSchemaProvider(); + assertTrue(schemaProvider instanceof InputBatch.NullSchemaProvider); + } + + @Test + public void getSchemaProviderShouldReturnGivenSchemaProvider() { + SchemaProvider schemaProvider = new RowBasedSchemaProvider(null); + final InputBatch inputBatch = new InputBatch<>(Option.of("foo"), null, schemaProvider); + assertSame(schemaProvider, inputBatch.getSchemaProvider()); + } +}