diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index a2dac3639..7ce8880e1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; @@ -57,14 +58,24 @@ public class HoodieTableFactory implements TableSourceFactory, TableSin Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions()); TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema); - Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> - new ValidationException("Option [path] should be not empty."))); - return new HoodieTableSource( - schema, - path, - context.getTable().getPartitionKeys(), - conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), - conf); + // enclosing the code within a try catch block so that we can log the error message. + // Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory + // to create the source/sink and catches all the exceptions then tries the new factory. + // + // log the error message first so that there is a chance to show the real failure cause. + try { + Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> + new ValidationException("Option [path] should not be empty."))); + return new HoodieTableSource( + schema, + path, + context.getTable().getPartitionKeys(), + conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), + conf); + } catch (Throwable throwable) { + LOG.error("Create table source error", throwable); + throw new HoodieException(throwable); + } } @Override