[hotfix] Log the error message for creating table source first (#2711)
This commit is contained in:
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.table;
|
package org.apache.hudi.table;
|
||||||
|
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
||||||
import org.apache.hudi.util.AvroSchemaConverter;
|
import org.apache.hudi.util.AvroSchemaConverter;
|
||||||
|
|
||||||
@@ -57,14 +58,24 @@ public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSin
|
|||||||
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
|
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
|
||||||
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
|
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
|
||||||
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
|
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema);
|
||||||
|
// enclosing the code within a try catch block so that we can log the error message.
|
||||||
|
// Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory
|
||||||
|
// to create the source/sink and catches all the exceptions then tries the new factory.
|
||||||
|
//
|
||||||
|
// log the error message first so that there is a chance to show the real failure cause.
|
||||||
|
try {
|
||||||
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
|
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
|
||||||
new ValidationException("Option [path] should be not empty.")));
|
new ValidationException("Option [path] should not be empty.")));
|
||||||
return new HoodieTableSource(
|
return new HoodieTableSource(
|
||||||
schema,
|
schema,
|
||||||
path,
|
path,
|
||||||
context.getTable().getPartitionKeys(),
|
context.getTable().getPartitionKeys(),
|
||||||
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
|
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
|
||||||
conf);
|
conf);
|
||||||
|
} catch (Throwable throwable) {
|
||||||
|
LOG.error("Create table source error", throwable);
|
||||||
|
throw new HoodieException(throwable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user