From 29b79c99b02d66ef9b087b56223e74c0d1f99e94 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 24 Mar 2021 18:25:37 +0800 Subject: [PATCH] [hotfix] Log the error message for creating table source first (#2711) --- .../apache/hudi/table/HoodieTableFactory.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index a2dac3639..7ce8880e1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; @@ -57,14 +58,24 @@ public class HoodieTableFactory implements TableSourceFactory, TableSin Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions()); TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getTable(), schema); - Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> - new ValidationException("Option [path] should be not empty."))); - return new HoodieTableSource( - schema, - path, - context.getTable().getPartitionKeys(), - conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), - conf); + // enclosing the code within a try catch block so that we can log the error message. + // Flink 1.11 did a bad compatibility for the old table factory, it uses the old factory + // to create the source/sink and catches all the exceptions then tries the new factory. + // + // log the error message first so that there is a chance to show the real failure cause. + try { + Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> + new ValidationException("Option [path] should not be empty."))); + return new HoodieTableSource( + schema, + path, + context.getTable().getPartitionKeys(), + conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), + conf); + } catch (Throwable throwable) { + LOG.error("Create table source error", throwable); + throw new HoodieException(throwable); + } } @Override