使用 simpleeval 库替换原有的简单正则匹配,支持复杂的条件表达式评估。新增 ConditionEvaluator 类处理条件逻辑,支持比较运算、逻辑运算、成员测试、数学计算和内置函数,同时保持向后兼容性。
156 lines
4.7 KiB
Python
156 lines
4.7 KiB
Python
"""
|
||
条件评估器模块
|
||
|
||
提供基于 simpleeval 的安全条件表达式评估能力
|
||
"""
|
||
|
||
from simpleeval import (
|
||
EvalWithCompoundTypes,
|
||
NameNotDefined,
|
||
FunctionNotDefined,
|
||
FeatureNotAvailable,
|
||
AttributeDoesNotExist
|
||
)
|
||
from loaders.yaml_loader import YAMLError
|
||
|
||
|
||
class ConditionEvaluator:
|
||
"""条件表达式评估器
|
||
|
||
使用 simpleeval 库安全地评估条件表达式,支持:
|
||
- 比较运算符:==, !=, >, <, >=, <=
|
||
- 逻辑运算符:and, or, not
|
||
- 成员测试:in, not in
|
||
- 列表/元组字面量
|
||
- 数学运算:+, -, *, /, %, **
|
||
- 内置函数:int, float, str, len, bool, abs, min, max
|
||
"""
|
||
|
||
# 表达式最大长度限制(防止过于复杂的表达式)
|
||
MAX_EXPRESSION_LENGTH = 500
|
||
|
||
def __init__(self):
|
||
"""初始化条件评估器"""
|
||
pass
|
||
|
||
def _get_evaluator(self, vars_values):
|
||
"""
|
||
获取配置好的 simpleeval 实例
|
||
|
||
Args:
|
||
vars_values: 变量值字典
|
||
|
||
Returns:
|
||
EvalWithCompoundTypes 实例
|
||
"""
|
||
# 每次评估都创建新实例,避免状态污染
|
||
evaluator = EvalWithCompoundTypes(names=vars_values)
|
||
|
||
# 添加额外的安全函数到白名单
|
||
evaluator.functions.update({
|
||
'len': len,
|
||
'bool': bool,
|
||
'abs': abs,
|
||
'min': min,
|
||
'max': max,
|
||
})
|
||
|
||
return evaluator
|
||
|
||
def _extract_expression(self, condition):
|
||
"""
|
||
从条件字符串中提取表达式
|
||
|
||
Args:
|
||
condition: 条件字符串,如 "{count > 0}"
|
||
|
||
Returns:
|
||
str: 提取的表达式,如 "count > 0"
|
||
"""
|
||
# 去除首尾的 { } 和空白
|
||
expr = condition.strip()
|
||
if expr.startswith('{') and expr.endswith('}'):
|
||
expr = expr[1:-1].strip()
|
||
return expr
|
||
|
||
def evaluate_condition(self, condition, vars_values):
|
||
"""
|
||
评估条件表达式
|
||
|
||
Args:
|
||
condition: 条件字符串,如 "{count > 0 and status == 'active'}"
|
||
vars_values: 变量值字典
|
||
|
||
Returns:
|
||
bool: 条件是否满足
|
||
|
||
Raises:
|
||
YAMLError: 表达式评估失败
|
||
"""
|
||
# 提取 {expression} 中的表达式
|
||
expr = self._extract_expression(condition)
|
||
|
||
# 验证表达式长度
|
||
if len(expr) > self.MAX_EXPRESSION_LENGTH:
|
||
raise YAMLError(
|
||
f"条件表达式过长({len(expr)} 字符,最大 {self.MAX_EXPRESSION_LENGTH}): {condition}"
|
||
)
|
||
|
||
# 评估表达式
|
||
try:
|
||
evaluator = self._get_evaluator(vars_values)
|
||
result = evaluator.eval(expr)
|
||
return bool(result)
|
||
|
||
except NameNotDefined as e:
|
||
# 变量未定义
|
||
var_name = str(e).split("'")[1] if "'" in str(e) else "unknown"
|
||
raise YAMLError(
|
||
f"条件表达式中的变量未定义: {var_name}\n"
|
||
f"表达式: {condition}\n"
|
||
f"可用变量: {', '.join(vars_values.keys())}"
|
||
)
|
||
|
||
except FunctionNotDefined as e:
|
||
# 函数未定义
|
||
func_name = str(e).split("'")[1] if "'" in str(e) else "unknown"
|
||
raise YAMLError(
|
||
f"条件表达式中使用了不支持的函数: {func_name}\n"
|
||
f"表达式: {condition}\n"
|
||
f"提示: 仅支持 int(), float(), str(), len(), bool(), abs(), min(), max() 等基本函数"
|
||
)
|
||
|
||
except FeatureNotAvailable as e:
|
||
# 功能不可用
|
||
raise YAMLError(
|
||
f"条件表达式使用了不支持的语法特性\n"
|
||
f"表达式: {condition}\n"
|
||
f"错误: {str(e)}\n"
|
||
f"提示: 不支持函数定义、导入模块、属性访问等高级特性"
|
||
)
|
||
|
||
except AttributeDoesNotExist as e:
|
||
# 属性不存在(属性访问被禁止)
|
||
raise YAMLError(
|
||
f"条件表达式使用了不支持的语法特性\n"
|
||
f"表达式: {condition}\n"
|
||
f"错误: {str(e)}\n"
|
||
f"提示: 不支持属性访问,请使用字典访问方式"
|
||
)
|
||
|
||
except SyntaxError as e:
|
||
# 语法错误
|
||
raise YAMLError(
|
||
f"条件表达式语法错误\n"
|
||
f"表达式: {condition}\n"
|
||
f"错误: {str(e)}"
|
||
)
|
||
|
||
except Exception as e:
|
||
# 其他错误
|
||
raise YAMLError(
|
||
f"条件表达式评估失败\n"
|
||
f"表达式: {condition}\n"
|
||
f"错误: {type(e).__name__}: {str(e)}"
|
||
)
|