第十三章:流水线引擎深度解析
GeoPipeAgent 的流水线引擎由 engine/ 目录下 5 个模块组成,本章深入解析每个模块的内部机制,帮助开发者理解框架工作原理、调试问题和进行二次开发。
13.1 引擎整体流程
执行一个 YAML 流水线涉及四个阶段:
YAML 文件
↓ parser.py
PipelineDefinition(内存模型)
↓ validator.py
校验通过(或抛出 PipelineValidationError)
↓ executor.py + context.py
执行所有步骤,收集结果
↓ reporter.py
JSON 执行报告
13.2 parser.py:YAML 解析
职责:将 YAML 文件(或字符串)解析为 PipelineDefinition 数据模型。
核心函数
def parse_yaml(source: str | Path) -> PipelineDefinition:
raw = _load_yaml(source) # 读取文件或字符串 → dict
return _build_pipeline(raw) # 构建 PipelineDefinition
关键解析规则
- 顶层
pipeline:键必须存在:若不存在,抛出PipelineParseError("Missing 'pipeline' key at the top level.") steps必须是非空列表:pipeline.steps为空时报错- 每个步骤必须有
id和use:缺少时报错并指明步骤索引 on_error默认值为"fail":解析时自动填充
解析的数据结构
@dataclass
class PipelineDefinition:
name: str
steps: list[StepDefinition]
description: str = ""
crs: str | None = None
variables: dict = {}
outputs: dict = {}
@dataclass
class StepDefinition:
id: str
use: str
params: dict = {}
when: str | None = None
on_error: str = "fail" # "fail" | "skip" | "retry"
backend: str | None = None
13.3 validator.py:流水线校验
职责:在执行之前检查流水线的语义正确性,尽早发现问题。
校验规则
- 步骤 ID 格式:匹配
[a-z0-9_-],点号(.)不允许 - 步骤 ID 唯一性:同一流水线不能有重复 ID
- 步骤类型存在:
use指定的步骤必须已注册(通过registry.has()) - 参数引用合法性:
$step-id引用的步骤必须在当前步骤之前定义;${var}引用的变量必须在variables中定义 on_error合法值:必须是fail/skip/retry之一outputs中的引用:引用的步骤必须存在
区分校验与执行时错误
| 错误类型 | 触发时机 | 异常类型 |
|---|---|---|
顶层 pipeline: 缺失 |
解析时 | PipelineParseError |
| 步骤 ID 不合法 | 校验时 | PipelineValidationError |
| 引用未定义步骤 | 校验时 | PipelineValidationError |
| 步骤执行异常 | 执行时 | StepExecutionError |
| 变量解析失败 | 执行时 | VariableResolutionError |
geopipe-agent validate 命令只运行解析和校验两个阶段,不执行步骤。
13.4 context.py:上下文与引用解析
职责:维护流水线执行状态,提供变量替换和步骤引用解析。
PipelineContext 类
class PipelineContext:
variables: dict # 流水线变量(含 --var 覆盖)
_step_outputs: dict[str, StepResult] # 已完成步骤的输出
def set_output(step_id, result) # 步骤完成后存储结果
def get_output(step_id) # 获取步骤结果
def resolve(value) # 解析值(变量替换/步骤引用)
def resolve_params(params) # 解析整个 params 字典
引用解析算法
resolve(value) 的处理逻辑:
def resolve(self, value):
if not isinstance(value, str):
return value # 非字符串直接返回
if value.startswith("$") and not value.startswith("${"):
return self._resolve_step_ref(value) # 步骤引用
if "${" in value:
return self._substitute_variables(value) # 变量替换
return value # 普通字符串
def _resolve_step_ref(self, ref):
ref_body = ref[1:]
if "." not in ref_body:
step_id, attr = ref_body, "output" # $step → $step.output
else:
step_id, attr = ref_body.split(".", 1)
result = self._step_outputs[step_id]
return getattr(result, attr) # 访问 StepResult 属性
getattr(result, attr) 的工作原理:StepResult 实现了 __getattr__,查找顺序为:
- 内置属性(
output、stats、metadata、issues) stats字典键(如feature_count、issues_count)metadata字典键(如issues_gdf、transform)- 以上都找不到 → 抛出
AttributeError,转为VariableResolutionError
StepContext 类
每个步骤执行时接收 StepContext,提供参数访问:
class StepContext:
def param(self, name, default=None) # 获取指定参数值
def input(self, name="input") # input("input") 的快捷方式
@property params # 完整参数字典
backend # 后端对象(可能为 None)
13.5 executor.py:步骤执行调度
职责:按顺序执行所有步骤,处理条件跳过、错误策略、重试逻辑。
主执行流程
def execute_pipeline(pipeline) -> dict:
context = PipelineContext(variables=pipeline.variables)
backend_manager = BackendManager.default()
for step_def in pipeline.steps:
# 1. 条件判断
if step_def.when and not _evaluate_condition(step_def.when, context):
context.set_output(step_def.id, StepResult())
# → 记录 status=skipped,继续
# 2. 执行步骤(含重试)
result = _execute_step(step_def, context, backend_manager)
context.set_output(step_def.id, result)
# 3. 解析 outputs
resolved_outputs = {k: context.resolve(v) for k, v in pipeline.outputs.items()}
# 4. 生成 JSON 报告
return build_report(...)
when 条件安全求值
_evaluate_condition 函数使用 AST 白名单确保安全:
- 用正则替换
${var}和$step.attr占位符为 Python 字面值 - 解析为 AST(
ast.parse(mode="eval")) - 通过
validate_condition_ast检查 AST 节点类型(白名单) - 使用空
__builtins__求值:eval(tree, {"__builtins__": {}}, {})
若求值失败,返回 False(步骤被跳过),不中断流水线。
重试机制
def _with_retry(fn, max_attempts, step_id):
for attempt in range(1, max_attempts + 1):
try:
return fn()
except StepExecutionError:
raise # 已封装的执行错误不重试
except Exception as e:
if attempt < max_attempts:
time.sleep(0.5 * attempt) # 递增等待:0.5s, 1s, 1.5s
raise last_error
智能错误建议
当步骤失败时,_suggest_fix 函数分析错误信息并提供建议:
patterns = [
("crs" in msg and "degree" in msg → 建议添加 reproject 步骤),
("file not found" → 检查文件路径),
("invalid geometry" → 添加 qc.geometry_validity),
("keyerror" → 检查字段名是否存在),
...
]
这些建议会出现在 JSON 报告的 suggestion 字段中,帮助 AI 或用户快速修复。
13.6 reporter.py:JSON 报告生成
职责:将执行结果汇总为标准 JSON 报告结构。
报告格式
{
"pipeline": "流水线名称",
"status": "success", // "success" | "error"
"duration": 1.234, // 总耗时(秒)
"steps": [
{
"id": "load-roads",
"step": "io.read_vector",
"status": "success", // "success" | "skipped" | "error"
"duration": 0.089,
"output_summary": { // StepResult.summary() 的输出
"feature_count": 100,
"crs": "EPSG:4326",
"geometry_types": ["LineString"]
},
// QC 步骤还会有:
"issues_count": 5,
"issues": [{"rule_id": "...", "severity": "error", "message": "..."}]
},
{
"id": "optional-step",
"step": "vector.simplify",
"status": "skipped",
"skip_reason": "condition not met: ${enable_simplify} == true"
},
{
"id": "failed-step",
"step": "vector.clip",
"status": "error",
"error": "CRS mismatch: ...",
"suggestion": "Add a vector.reproject step..."
}
],
"outputs": {
"result": "output/buffer.geojson",
"feature_count": 100
}
}
13.7 Python API:在代码中使用引擎
除 CLI 外,也可直接在 Python 代码中调用引擎 API:
import geopipe_agent # 触发内置步骤的自动注册
from geopipe_agent.engine.parser import parse_yaml
from geopipe_agent.engine.validator import validate_pipeline
from geopipe_agent.engine.executor import execute_pipeline
from geopipe_agent.errors import GeopipeAgentError
# 方式一:从文件加载
pipeline = parse_yaml("my_pipeline.yaml")
# 方式二:从字符串加载
yaml_str = """
pipeline:
name: "inline pipeline"
steps:
- id: load
use: io.read_vector
params: { path: "data.shp" }
"""
pipeline = parse_yaml(yaml_str)
# 运行时覆盖变量
pipeline.variables["input_path"] = "data/roads.shp"
pipeline.variables["buffer_dist"] = 500
# 校验(返回警告列表,或抛出异常)
warnings = validate_pipeline(pipeline)
# 执行(返回 JSON 报告字典)
try:
report = execute_pipeline(pipeline)
print(f"Status: {report['status']}")
print(f"Duration: {report['duration']}s")
except GeopipeAgentError as e:
print(f"Error: {e}")
13.8 本章小结
本章深入解析了流水线引擎的 5 个模块:
parser.py:YAML → PipelineDefinition,强制要求顶层pipeline:键validator.py:执行前语义校验,检查 ID 格式、步骤存在性、引用合法性context.py:维护执行上下文,解析$step.attr和${var}引用executor.py:按序执行步骤,处理条件跳过、重试、错误策略,生成建议reporter.py:汇总生成标准化 JSON 报告