[HUDI-3946] Validate option path in flink hudi sink (#5397)
This commit is contained in:
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
|
|||||||
|
|
||||||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
|
||||||
import org.apache.hudi.common.model.EventTimeAvroPayload;
|
import org.apache.hudi.common.model.EventTimeAvroPayload;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.configuration.OptionsResolver;
|
import org.apache.hudi.configuration.OptionsResolver;
|
||||||
import org.apache.hudi.exception.HoodieValidationException;
|
import org.apache.hudi.exception.HoodieValidationException;
|
||||||
@@ -53,6 +54,8 @@ import java.util.Collections;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hoodie data source/sink factory.
|
* Hoodie data source/sink factory.
|
||||||
*/
|
*/
|
||||||
@@ -81,6 +84,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
@Override
|
@Override
|
||||||
public DynamicTableSink createDynamicTableSink(Context context) {
|
public DynamicTableSink createDynamicTableSink(Context context) {
|
||||||
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
|
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
|
||||||
|
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
|
||||||
|
"Option [path] should not be empty.");
|
||||||
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
|
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
|
||||||
sanityCheck(conf, schema);
|
sanityCheck(conf, schema);
|
||||||
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
|
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
|
||||||
|
|||||||
Reference in New Issue
Block a user