第十三章:执行引擎深度解析
13.1 概述
GeoPipeAgent 的执行引擎位于 src/geopipe_agent/engine/ 目录,由 5 个核心模块组成:
| 模块 | 文件 | 职责 |
|---|---|---|
| 解析器 | parser.py |
读取并解析 YAML 文件为 Python 数据模型 |
| 校验器 | validator.py |
验证流水线结构和引用的合法性 |
| 执行器 | executor.py |
按序执行步骤,处理条件/重试/错误 |
| 上下文 | context.py |
维护变量表和步骤结果,解析引用 |
| 报告器 | reporter.py |
生成 JSON 结构化执行报告 |
一次流水线运行的完整生命周期:
YAML 文件
↓ parser.py:parse_yaml()
PipelineDefinition 对象
↓ validator.py:validate_pipeline()
校验通过(或报告警告)
↓ executor.py:execute_pipeline()
逐步执行(每步使用 context.py 解析引用)
↓ reporter.py:build_report()
JSON 执行报告
13.2 解析器(parser.py)
职责
将 YAML 文件内容解析为 PipelineDefinition Pydantic 模型。
核心函数
def parse_yaml(file_path: str) -> PipelineDefinition:
"""读取 YAML 文件,解析并返回 PipelineDefinition。"""
解析流程
- 使用 PyYAML 读取 YAML 文件,转为 Python 字典
- 提取
name、description、crs、variables、steps、outputs字段 - 将字典数据实例化为
PipelineDefinitionPydantic 模型 - Pydantic 自动验证字段类型和必填约束
- 返回结构化的
PipelineDefinition对象
解析后的数据模型
class PipelineDefinition(BaseModel):
name: str # 流水线名称(必填)
description: str | None = None # 描述(可选)
crs: str | None = None # 默认 CRS(可选)
variables: dict = {} # 变量字典
steps: list[StepDefinition] = [] # 步骤列表
outputs: dict[str, str] = {} # 输出映射
class StepDefinition(BaseModel):
id: str # 步骤 ID(必填)
use: str # 步骤类型(必填)
params: dict = {} # 步骤参数
when: str | None = None # 条件表达式
on_error: str = "fail" # 错误策略
backend: str | None = None # 后端指定
错误处理
解析器会捕获并转换以下错误:
FileNotFoundError:YAML 文件不存在yaml.YAMLError:YAML 格式错误(语法问题)pydantic.ValidationError:模型字段类型不匹配
13.3 校验器(validator.py)
职责
在执行之前对 PipelineDefinition 进行语义校验,返回警告列表(非致命问题)或抛出异常(致命问题)。
校验内容
def validate_pipeline(pipeline: PipelineDefinition) -> list[str]:
"""校验流水线,返回警告列表(空列表表示无问题)。"""
| 校验项 | 严重程度 | 说明 |
|---|---|---|
| 步骤 ID 重复 | 错误(抛异常) | 同一流水线不允许重复 ID |
| 步骤 ID 格式 | 警告 | 只允许 [a-z0-9_-]+ |
use 步骤未注册 |
警告 | 可能是自定义步骤,仅警告 |
| 步骤引用前向引用 | 错误(抛异常) | 引用不存在或后定义的步骤 |
when 语法错误 |
警告 | 无法解析的 when 表达式 |
| 循环引用 | 错误(抛异常) | A 引用 B,B 又引用 A |
outputs 中的引用 |
警告 | 引用了不存在的步骤 |
校验结果
- 警告(warnings):非致命问题,打印到 stderr 但继续执行
- 异常(exception):致命问题,立即终止并报错
# validate 命令的输出示例
$ geopipe-agent validate my-pipeline.yaml
{
"status": "valid",
"pipeline": "道路缓冲区分析",
"steps_count": 4,
"steps": [
{"id": "load-roads", "use": "io.read_vector"},
{"id": "reproject", "use": "vector.reproject"},
{"id": "buffer", "use": "vector.buffer"},
{"id": "save-result", "use": "io.write_vector"}
],
"warnings": ["Step 'optional-step' uses 'my.custom_step' which is not registered."]
}
13.4 上下文(context.py)
PipelineContext 类
PipelineContext 是整个流水线执行过程中的”共享状态”对象,存储变量表和所有已执行步骤的结果。
class PipelineContext:
variables: dict # 全局变量表
_step_outputs: dict[str, StepResult] # 步骤结果缓存(key:步骤ID)
def set_output(self, step_id: str, result: StepResult) -> None:
"""记录步骤执行结果。"""
def get_output(self, step_id: str) -> StepResult:
"""获取已执行步骤的结果。"""
def resolve(self, value: Any) -> Any:
"""解析一个可能含有引用的值。"""
def resolve_params(self, params: dict) -> dict:
"""递归解析步骤参数字典中的所有引用。"""
引用解析逻辑
resolve() 方法支持三种形式:
# 1. 步骤引用:$step_id → step_outputs[step_id].output
context.resolve("$load-roads")
# 返回 load-roads 步骤的输出 GeoDataFrame
# 2. 步骤属性引用:$step_id.attr → step_outputs[step_id].attr
context.resolve("$load-roads.crs")
# 返回 load-roads 步骤输出的 crs 属性
# 3. 变量引用:${var_name} → variables[var_name]
context.resolve("${input_path}")
# 返回 variables["input_path"] 的值
# 4. 字符串插值:包含 ${var} 的字符串
context.resolve("output/${run_id}.geojson")
# 返回 "output/20240115.geojson"(假设 run_id=20240115)
# 5. 普通值:直接返回
context.resolve(500) # 返回 500
context.resolve(True) # 返回 True
类型保留机制:当整个字符串就是单个 ${var} 引用时,返回变量的原始类型(int、float、bool 等),而不是字符串。
StepContext 类
StepContext 是传递给每个步骤函数的参数对象:
class StepContext:
_params: dict # 已解析的步骤参数
backend: Any # 步骤指定的后端(如有)
pipeline_context: PipelineContext # 全局上下文引用
def param(self, name: str, default=None) -> Any:
"""获取步骤参数值。"""
def input(self, name: str = "input") -> Any:
"""获取 input 参数(常见模式的快捷方式)。"""
步骤函数通过 StepContext 获取所有需要的参数:
@step(id="vector.buffer", name="缓冲区分析", ...)
def buffer_step(ctx: StepContext) -> StepResult:
gdf = ctx.input() # 获取 input 参数
distance = ctx.param("distance") # 获取 distance 参数
cap_style = ctx.param("cap_style", "round") # 获取可选参数,指定默认值
result = gdf.buffer(distance, cap_style=cap_style)
return StepResult(output=result, stats={"feature_count": len(result)})
13.5 执行器(executor.py)
执行器是引擎的核心,负责按序调度和运行每个步骤。
主函数流程
def execute_pipeline(pipeline: PipelineDefinition) -> dict:
"""执行已校验的流水线,返回 JSON 可序列化的报告。"""
context = PipelineContext(variables=pipeline.variables)
backend_manager = BackendManager.default()
step_reports = []
for step_def in pipeline.steps:
# 1. 条件判断
if step_def.when and not _evaluate_condition(step_def.when, context):
# 跳过步骤
context.set_output(step_def.id, StepResult()) # 空结果,用于透明引用
step_reports.append({"status": "skipped", ...})
continue
try:
# 2. 执行步骤(含重试)
result = _execute_step(step_def, context, backend_manager)
# 3. 记录结果
context.set_output(step_def.id, result)
step_reports.append({"status": "success", ...})
except Exception as e:
if step_def.on_error == "skip":
# 跳过:记录空结果,继续
context.set_output(step_def.id, StepResult())
step_reports.append({"status": "skipped", "error": str(e)})
else:
# 终止:记录错误,抛出异常
raise StepExecutionError(step_def.id, str(e), ...) from e
# 4. 解析 outputs 声明
resolved_outputs = {}
for key, ref in pipeline.outputs.items():
resolved_outputs[key] = _summarize_output(context.resolve(ref))
# 5. 构建报告
return build_report(...)
步骤执行流程
def _execute_step(step_def, context, backend_manager):
"""执行单个步骤(支持重试)。"""
max_attempts = 3 if step_def.on_error == "retry" else 1
def _run():
# 1. 解析参数(替换所有引用)
resolved_params = context.resolve_params(step_def.params)
# 2. 查找步骤注册信息
step_info = registry.get(step_def.use)
# 3. 验证参数(必填字段检查 + 类型转换)
_validate_step_params(step_def.id, resolved_params, step_info)
# 4. 获取后端(如步骤需要)
backend = backend_manager.get(step_def.backend) if step_info.backends else None
# 5. 构建 StepContext 并调用步骤函数
ctx = StepContext(params=resolved_params, backend=backend, pipeline_context=context)
result = step_info.func(ctx)
return result
return _with_retry(_run, max_attempts, step_def.id)
重试机制
def _with_retry(fn, max_attempts: int, step_id: str):
"""指数退避重试。"""
for attempt in range(1, max_attempts + 1):
try:
return fn()
except StepExecutionError:
raise # 步骤执行错误不重试
except Exception as e:
if attempt < max_attempts:
time.sleep(0.5 * attempt) # 指数退避:0.5s, 1.0s, 1.5s
continue
raise
_suggest_fix:AI 友好的错误提示
执行器中有一个 _suggest_fix 函数,通过模式匹配错误信息,自动生成修复建议:
patterns = [
("crs" in msg and "mismatch" in msg → "添加 vector.reproject 步骤"),
("file not found" in msg → "检查文件路径是否正确"),
("self-intersection" in msg → "添加 qc.geometry_validity 步骤修复几何"),
# ...
]
这些建议会包含在 JSON 报告的错误字段中,便于 AI 自动诊断和修复。
13.6 报告器(reporter.py)
职责
将执行器收集的步骤报告和整体状态汇总为结构化 JSON 报告。
报告结构
def build_report(
pipeline_name: str,
status: str,
duration: float,
step_reports: list[dict],
outputs: dict,
) -> dict:
return {
"pipeline": {
"name": pipeline_name,
"status": status, # "success" 或 "error"
"duration": duration, # 总耗时(秒)
"timestamp": "2024-01-15T10:30:00Z",
},
"steps": step_reports, # 每步的执行摘要
"outputs": outputs, # 输出声明的解析结果
}
每个步骤报告的字段:
{
"id": "buffer",
"step": "vector.buffer",
"status": "success",
"duration": 0.05,
"output_summary": {
"type": "GeoDataFrame",
"feature_count": 3,
"crs": "EPSG:3857"
}
}
13.7 步骤注册表(registry.py)
职责
维护所有已注册步骤的全局注册表,提供按 ID 和类别的查询接口。
@step 装饰器
步骤通过 @step 装饰器注册到全局注册表:
from geopipe_agent.steps.registry import step, StepContext
from geopipe_agent.models.result import StepResult
@step(
id="vector.buffer",
name="缓冲区分析",
category="vector",
description="对矢量要素进行缓冲区分析",
params={
"input": {"required": True, "type": "geodataframe", "description": "输入 GeoDataFrame"},
"distance": {"required": True, "type": "number", "description": "缓冲距离"},
"cap_style": {"required": False, "type": "string", "description": "端点样式"},
},
backends=["native_python"],
)
def buffer_step(ctx: StepContext) -> StepResult:
gdf = ctx.input()
distance = ctx.param("distance")
cap_style = ctx.param("cap_style", "round")
result = gdf.buffer(distance, cap_style={"round": 1, "flat": 2, "square": 3}.get(cap_style, 1))
return StepResult(
output=result,
stats={"feature_count": len(result), "distance": distance}
)
自动发现机制
GeoPipeAgent 通过 pkgutil.walk_packages 自动发现并加载 steps/ 目录下的所有模块:
# src/geopipe_agent/__init__.py
import pkgutil
import geopipe_agent.steps as _steps_pkg
for _finder, _name, _ispkg in pkgutil.walk_packages(
_steps_pkg.__path__, prefix=_steps_pkg.__name__ + "."
):
import importlib
importlib.import_module(_name)
# 每个模块被导入时,其中的 @step 装饰器触发注册
这意味着只需将新步骤文件放到正确的目录下,重启时就会自动被注册,无需手动修改注册表。
StepInfo 注册信息
每个步骤注册后,其信息以 StepInfo 对象存储:
class StepInfo:
id: str # 步骤 ID(如 "vector.buffer")
name: str # 步骤名称
category: str # 步骤类别(io/vector/raster/analysis/network/qc)
description: str # 步骤描述
params: dict # 参数规格(required/type/description)
backends: list[str] # 支持的后端列表
func: Callable # 步骤执行函数
13.8 日志系统(utils/logging.py)
结构化日志
GeoPipeAgent 支持两种日志格式:
普通格式(默认):
[INFO] 🚀 开始执行流水线:道路缓冲区分析
[INFO] 执行步骤 [1/4]: load-roads (io.read_vector)
[INFO] Step 'load-roads' (io.read_vector) completed in 0.123s
JSON 格式(--json-log 标志):
{"level": "INFO", "timestamp": "2024-01-15T10:30:00Z", "msg": "Step completed", "step_id": "load-roads", "duration": 0.123}
JSON 格式便于日志聚合系统(ELK、Splunk 等)解析。
日志级别
| 级别 | 命令 | 说明 |
|---|---|---|
| DEBUG | --log-level DEBUG |
打印参数解析、引用解析等详细信息 |
| INFO | --log-level INFO(默认) |
步骤执行进度 |
| WARNING | --log-level WARNING |
仅显示警告和错误 |
| ERROR | --log-level ERROR |
仅显示错误 |
13.9 完整执行流程图
用户执行命令
geopipe-agent run pipeline.yaml --var key=val
│
▼
CLI (cli.py)
解析命令行参数
│
▼
parser.parse_yaml()
YAML → PipelineDefinition
│
▼
变量覆盖 (--var)
│
▼
validator.validate_pipeline()
语义校验
│ 通过(可能有警告)
▼
executor.execute_pipeline()
│
├─ for each step:
│ ├─ context.resolve(when) → 决定是否跳过
│ ├─ context.resolve_params(params) → 解析所有引用
│ ├─ registry.get(use) → 获取步骤函数
│ ├─ backend_manager.get(backend) → 获取后端
│ ├─ step_func(StepContext) → 执行步骤
│ └─ context.set_output(id, result) → 记录结果
│
▼
reporter.build_report()
生成 JSON 报告
│
▼
输出 JSON 到 stdout
13.10 小结
本章深入解析了 GeoPipeAgent 执行引擎的五个核心模块:
- parser.py:YAML → PipelineDefinition 模型
- validator.py:语义校验,检测重复 ID、非法引用等
- executor.py:按序执行步骤,处理
when/on_error/retry - context.py:
PipelineContext维护变量和步骤结果,StepContext向步骤提供参数 - reporter.py:汇总生成结构化 JSON 报告
下一章将详细介绍 GeoPipeAgent 的多后端系统,包括 7 种后端的实现原理和使用场景。