diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 7543382e1..5464ea3f2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -20,6 +20,7 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.EventTimeAvroPayload; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieValidationException; @@ -53,6 +54,8 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + /** * Hoodie data source/sink factory. */ @@ -81,6 +84,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab @Override public DynamicTableSink createDynamicTableSink(Context context) { Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions()); + checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)), + "Option [path] should not be empty."); ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); sanityCheck(conf, schema); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);