[HUDI-902] Avoid exception when getSchemaProvider (#1584)
* When no new input data, don't throw exception for null SchemaProvider * Return the newly added NullSchemaProvider instead
This commit is contained in:
@@ -18,10 +18,14 @@
|
|||||||
|
|
||||||
package org.apache.hudi.utilities.sources;
|
package org.apache.hudi.utilities.sources;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
|
||||||
public class InputBatch<T> {
|
public class InputBatch<T> {
|
||||||
|
|
||||||
private final Option<T> batch;
|
private final Option<T> batch;
|
||||||
@@ -49,9 +53,25 @@ public class InputBatch<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public SchemaProvider getSchemaProvider() {
|
public SchemaProvider getSchemaProvider() {
|
||||||
if (schemaProvider == null) {
|
if (batch.isPresent() && schemaProvider == null) {
|
||||||
throw new HoodieException("Please provide a valid schema provider class!");
|
throw new HoodieException("Please provide a valid schema provider class!");
|
||||||
}
|
}
|
||||||
return schemaProvider;
|
return Option.ofNullable(schemaProvider).orElse(new NullSchemaProvider());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class NullSchemaProvider extends SchemaProvider {
|
||||||
|
|
||||||
|
public NullSchemaProvider() {
|
||||||
|
this(null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public NullSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
|
||||||
|
super(props, jssc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Schema getSourceSchema() {
|
||||||
|
return Schema.create(Schema.Type.NULL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package org.apache.hudi.utilities.sources;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
|
||||||
|
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class TestInputBatch {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getSchemaProviderShouldThrowException() {
|
||||||
|
final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), null, null);
|
||||||
|
Throwable t = assertThrows(HoodieException.class, inputBatch::getSchemaProvider);
|
||||||
|
assertEquals("Please provide a valid schema provider class!", t.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getSchemaProviderShouldReturnNullSchemaProvider() {
|
||||||
|
final InputBatch<String> inputBatch = new InputBatch<>(Option.empty(), null, null);
|
||||||
|
SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
|
||||||
|
assertTrue(schemaProvider instanceof InputBatch.NullSchemaProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getSchemaProviderShouldReturnGivenSchemaProvider() {
|
||||||
|
SchemaProvider schemaProvider = new RowBasedSchemaProvider(null);
|
||||||
|
final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), null, schemaProvider);
|
||||||
|
assertSame(schemaProvider, inputBatch.getSchemaProvider());
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user