znlgis 博客

GIS开发与技术分享

第十三章:执行引擎深度解析

13.1 概述

GeoPipeAgent 的执行引擎位于 src/geopipe_agent/engine/ 目录,由 5 个核心模块组成:

模块 文件 职责
解析器 parser.py 读取并解析 YAML 文件为 Python 数据模型
校验器 validator.py 验证流水线结构和引用的合法性
执行器 executor.py 按序执行步骤,处理条件/重试/错误
上下文 context.py 维护变量表和步骤结果,解析引用
报告器 reporter.py 生成 JSON 结构化执行报告

一次流水线运行的完整生命周期:

YAML 文件
    ↓ parser.py:parse_yaml()
PipelineDefinition 对象
    ↓ validator.py:validate_pipeline()
校验通过(或报告警告)
    ↓ executor.py:execute_pipeline()
逐步执行(每步使用 context.py 解析引用)
    ↓ reporter.py:build_report()
JSON 执行报告

13.2 解析器(parser.py)

职责

将 YAML 文件内容解析为 PipelineDefinition Pydantic 模型。

核心函数

def parse_yaml(file_path: str) -> PipelineDefinition:
    """读取 YAML 文件,解析并返回 PipelineDefinition。"""

解析流程

  1. 使用 PyYAML 读取 YAML 文件,转为 Python 字典
  2. 提取 namedescriptioncrsvariablesstepsoutputs 字段
  3. 将字典数据实例化为 PipelineDefinition Pydantic 模型
  4. Pydantic 自动验证字段类型和必填约束
  5. 返回结构化的 PipelineDefinition 对象

解析后的数据模型

class PipelineDefinition(BaseModel):
    name: str                           # 流水线名称(必填)
    description: str | None = None      # 描述(可选)
    crs: str | None = None              # 默认 CRS(可选)
    variables: dict = {}                # 变量字典
    steps: list[StepDefinition] = []   # 步骤列表
    outputs: dict[str, str] = {}       # 输出映射

class StepDefinition(BaseModel):
    id: str                             # 步骤 ID(必填)
    use: str                            # 步骤类型(必填)
    params: dict = {}                  # 步骤参数
    when: str | None = None            # 条件表达式
    on_error: str = "fail"             # 错误策略
    backend: str | None = None         # 后端指定

错误处理

解析器会捕获并转换以下错误:

  • FileNotFoundError:YAML 文件不存在
  • yaml.YAMLError:YAML 格式错误(语法问题)
  • pydantic.ValidationError:模型字段类型不匹配

13.3 校验器(validator.py)

职责

在执行之前对 PipelineDefinition 进行语义校验,返回警告列表(非致命问题)或抛出异常(致命问题)。

校验内容

def validate_pipeline(pipeline: PipelineDefinition) -> list[str]:
    """校验流水线,返回警告列表(空列表表示无问题)。"""
校验项 严重程度 说明
步骤 ID 重复 错误(抛异常) 同一流水线不允许重复 ID
步骤 ID 格式 警告 只允许 [a-z0-9_-]+
use 步骤未注册 警告 可能是自定义步骤,仅警告
步骤引用前向引用 错误(抛异常) 引用不存在或后定义的步骤
when 语法错误 警告 无法解析的 when 表达式
循环引用 错误(抛异常) A 引用 B,B 又引用 A
outputs 中的引用 警告 引用了不存在的步骤

校验结果

  • 警告(warnings):非致命问题,打印到 stderr 但继续执行
  • 异常(exception):致命问题,立即终止并报错
# validate 命令的输出示例
$ geopipe-agent validate my-pipeline.yaml
{
  "status": "valid",
  "pipeline": "道路缓冲区分析",
  "steps_count": 4,
  "steps": [
    {"id": "load-roads", "use": "io.read_vector"},
    {"id": "reproject", "use": "vector.reproject"},
    {"id": "buffer", "use": "vector.buffer"},
    {"id": "save-result", "use": "io.write_vector"}
  ],
  "warnings": ["Step 'optional-step' uses 'my.custom_step' which is not registered."]
}

13.4 上下文(context.py)

PipelineContext 类

PipelineContext 是整个流水线执行过程中的”共享状态”对象,存储变量表和所有已执行步骤的结果。

class PipelineContext:
    variables: dict                        # 全局变量表
    _step_outputs: dict[str, StepResult]  # 步骤结果缓存(key:步骤ID)
    
    def set_output(self, step_id: str, result: StepResult) -> None:
        """记录步骤执行结果。"""
    
    def get_output(self, step_id: str) -> StepResult:
        """获取已执行步骤的结果。"""
    
    def resolve(self, value: Any) -> Any:
        """解析一个可能含有引用的值。"""
    
    def resolve_params(self, params: dict) -> dict:
        """递归解析步骤参数字典中的所有引用。"""

引用解析逻辑

resolve() 方法支持三种形式:

# 1. 步骤引用:$step_id → step_outputs[step_id].output
context.resolve("$load-roads")    
# 返回 load-roads 步骤的输出 GeoDataFrame

# 2. 步骤属性引用:$step_id.attr → step_outputs[step_id].attr
context.resolve("$load-roads.crs")
# 返回 load-roads 步骤输出的 crs 属性

# 3. 变量引用:${var_name} → variables[var_name]
context.resolve("${input_path}")
# 返回 variables["input_path"] 的值

# 4. 字符串插值:包含 ${var} 的字符串
context.resolve("output/${run_id}.geojson")
# 返回 "output/20240115.geojson"(假设 run_id=20240115)

# 5. 普通值:直接返回
context.resolve(500)    # 返回 500
context.resolve(True)   # 返回 True

类型保留机制:当整个字符串就是单个 ${var} 引用时,返回变量的原始类型(int、float、bool 等),而不是字符串。

StepContext 类

StepContext 是传递给每个步骤函数的参数对象:

class StepContext:
    _params: dict          # 已解析的步骤参数
    backend: Any           # 步骤指定的后端(如有)
    pipeline_context: PipelineContext  # 全局上下文引用
    
    def param(self, name: str, default=None) -> Any:
        """获取步骤参数值。"""
    
    def input(self, name: str = "input") -> Any:
        """获取 input 参数(常见模式的快捷方式)。"""

步骤函数通过 StepContext 获取所有需要的参数:

@step(id="vector.buffer", name="缓冲区分析", ...)
def buffer_step(ctx: StepContext) -> StepResult:
    gdf = ctx.input()                           # 获取 input 参数
    distance = ctx.param("distance")            # 获取 distance 参数
    cap_style = ctx.param("cap_style", "round") # 获取可选参数,指定默认值
    
    result = gdf.buffer(distance, cap_style=cap_style)
    return StepResult(output=result, stats={"feature_count": len(result)})

13.5 执行器(executor.py)

执行器是引擎的核心,负责按序调度和运行每个步骤。

主函数流程

def execute_pipeline(pipeline: PipelineDefinition) -> dict:
    """执行已校验的流水线,返回 JSON 可序列化的报告。"""
    context = PipelineContext(variables=pipeline.variables)
    backend_manager = BackendManager.default()
    
    step_reports = []
    
    for step_def in pipeline.steps:
        # 1. 条件判断
        if step_def.when and not _evaluate_condition(step_def.when, context):
            # 跳过步骤
            context.set_output(step_def.id, StepResult())  # 空结果,用于透明引用
            step_reports.append({"status": "skipped", ...})
            continue
        
        try:
            # 2. 执行步骤(含重试)
            result = _execute_step(step_def, context, backend_manager)
            # 3. 记录结果
            context.set_output(step_def.id, result)
            step_reports.append({"status": "success", ...})
        
        except Exception as e:
            if step_def.on_error == "skip":
                # 跳过:记录空结果,继续
                context.set_output(step_def.id, StepResult())
                step_reports.append({"status": "skipped", "error": str(e)})
            else:
                # 终止:记录错误,抛出异常
                raise StepExecutionError(step_def.id, str(e), ...) from e
    
    # 4. 解析 outputs 声明
    resolved_outputs = {}
    for key, ref in pipeline.outputs.items():
        resolved_outputs[key] = _summarize_output(context.resolve(ref))
    
    # 5. 构建报告
    return build_report(...)

步骤执行流程

def _execute_step(step_def, context, backend_manager):
    """执行单个步骤(支持重试)。"""
    max_attempts = 3 if step_def.on_error == "retry" else 1
    
    def _run():
        # 1. 解析参数(替换所有引用)
        resolved_params = context.resolve_params(step_def.params)
        
        # 2. 查找步骤注册信息
        step_info = registry.get(step_def.use)
        
        # 3. 验证参数(必填字段检查 + 类型转换)
        _validate_step_params(step_def.id, resolved_params, step_info)
        
        # 4. 获取后端(如步骤需要)
        backend = backend_manager.get(step_def.backend) if step_info.backends else None
        
        # 5. 构建 StepContext 并调用步骤函数
        ctx = StepContext(params=resolved_params, backend=backend, pipeline_context=context)
        result = step_info.func(ctx)
        
        return result
    
    return _with_retry(_run, max_attempts, step_def.id)

重试机制

def _with_retry(fn, max_attempts: int, step_id: str):
    """指数退避重试。"""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except StepExecutionError:
            raise  # 步骤执行错误不重试
        except Exception as e:
            if attempt < max_attempts:
                time.sleep(0.5 * attempt)  # 指数退避:0.5s, 1.0s, 1.5s
                continue
            raise

_suggest_fix:AI 友好的错误提示

执行器中有一个 _suggest_fix 函数,通过模式匹配错误信息,自动生成修复建议:

patterns = [
    ("crs" in msg and "mismatch" in msg  "添加 vector.reproject 步骤"),
    ("file not found" in msg  "检查文件路径是否正确"),
    ("self-intersection" in msg  "添加 qc.geometry_validity 步骤修复几何"),
    # ...
]

这些建议会包含在 JSON 报告的错误字段中,便于 AI 自动诊断和修复。

13.6 报告器(reporter.py)

职责

将执行器收集的步骤报告和整体状态汇总为结构化 JSON 报告。

报告结构

def build_report(
    pipeline_name: str,
    status: str,
    duration: float,
    step_reports: list[dict],
    outputs: dict,
) -> dict:
    return {
        "pipeline": {
            "name": pipeline_name,
            "status": status,           # "success" 或 "error"
            "duration": duration,       # 总耗时(秒)
            "timestamp": "2024-01-15T10:30:00Z",
        },
        "steps": step_reports,         # 每步的执行摘要
        "outputs": outputs,            # 输出声明的解析结果
    }

每个步骤报告的字段:

{
  "id": "buffer",
  "step": "vector.buffer",
  "status": "success",
  "duration": 0.05,
  "output_summary": {
    "type": "GeoDataFrame",
    "feature_count": 3,
    "crs": "EPSG:3857"
  }
}

13.7 步骤注册表(registry.py)

职责

维护所有已注册步骤的全局注册表,提供按 ID 和类别的查询接口。

@step 装饰器

步骤通过 @step 装饰器注册到全局注册表:

from geopipe_agent.steps.registry import step, StepContext
from geopipe_agent.models.result import StepResult

@step(
    id="vector.buffer",
    name="缓冲区分析",
    category="vector",
    description="对矢量要素进行缓冲区分析",
    params={
        "input": {"required": True, "type": "geodataframe", "description": "输入 GeoDataFrame"},
        "distance": {"required": True, "type": "number", "description": "缓冲距离"},
        "cap_style": {"required": False, "type": "string", "description": "端点样式"},
    },
    backends=["native_python"],
)
def buffer_step(ctx: StepContext) -> StepResult:
    gdf = ctx.input()
    distance = ctx.param("distance")
    cap_style = ctx.param("cap_style", "round")
    result = gdf.buffer(distance, cap_style={"round": 1, "flat": 2, "square": 3}.get(cap_style, 1))
    return StepResult(
        output=result,
        stats={"feature_count": len(result), "distance": distance}
    )

自动发现机制

GeoPipeAgent 通过 pkgutil.walk_packages 自动发现并加载 steps/ 目录下的所有模块:

# src/geopipe_agent/__init__.py
import pkgutil
import geopipe_agent.steps as _steps_pkg

for _finder, _name, _ispkg in pkgutil.walk_packages(
    _steps_pkg.__path__, prefix=_steps_pkg.__name__ + "."
):
    import importlib
    importlib.import_module(_name)
    # 每个模块被导入时,其中的 @step 装饰器触发注册

这意味着只需将新步骤文件放到正确的目录下,重启时就会自动被注册,无需手动修改注册表。

StepInfo 注册信息

每个步骤注册后,其信息以 StepInfo 对象存储:

class StepInfo:
    id: str              # 步骤 ID(如 "vector.buffer")
    name: str            # 步骤名称
    category: str        # 步骤类别(io/vector/raster/analysis/network/qc)
    description: str     # 步骤描述
    params: dict         # 参数规格(required/type/description)
    backends: list[str]  # 支持的后端列表
    func: Callable       # 步骤执行函数

13.8 日志系统(utils/logging.py)

结构化日志

GeoPipeAgent 支持两种日志格式:

普通格式(默认):

[INFO] 🚀 开始执行流水线:道路缓冲区分析
[INFO] 执行步骤 [1/4]: load-roads (io.read_vector)
[INFO] Step 'load-roads' (io.read_vector) completed in 0.123s

JSON 格式--json-log 标志):

{"level": "INFO", "timestamp": "2024-01-15T10:30:00Z", "msg": "Step completed", "step_id": "load-roads", "duration": 0.123}

JSON 格式便于日志聚合系统(ELK、Splunk 等)解析。

日志级别

级别 命令 说明
DEBUG --log-level DEBUG 打印参数解析、引用解析等详细信息
INFO --log-level INFO(默认) 步骤执行进度
WARNING --log-level WARNING 仅显示警告和错误
ERROR --log-level ERROR 仅显示错误

13.9 完整执行流程图

用户执行命令
geopipe-agent run pipeline.yaml --var key=val
         │
         ▼
   CLI (cli.py)
   解析命令行参数
         │
         ▼
   parser.parse_yaml()
   YAML → PipelineDefinition
         │
         ▼
   变量覆盖 (--var)
         │
         ▼
   validator.validate_pipeline()
   语义校验
         │ 通过(可能有警告)
         ▼
   executor.execute_pipeline()
         │
         ├─ for each step:
         │    ├─ context.resolve(when) → 决定是否跳过
         │    ├─ context.resolve_params(params) → 解析所有引用
         │    ├─ registry.get(use) → 获取步骤函数
         │    ├─ backend_manager.get(backend) → 获取后端
         │    ├─ step_func(StepContext) → 执行步骤
         │    └─ context.set_output(id, result) → 记录结果
         │
         ▼
   reporter.build_report()
   生成 JSON 报告
         │
         ▼
   输出 JSON 到 stdout

13.10 小结

本章深入解析了 GeoPipeAgent 执行引擎的五个核心模块:

  • parser.py:YAML → PipelineDefinition 模型
  • validator.py:语义校验,检测重复 ID、非法引用等
  • executor.py:按序执行步骤,处理 when/on_error/retry
  • context.pyPipelineContext 维护变量和步骤结果,StepContext 向步骤提供参数
  • reporter.py:汇总生成结构化 JSON 报告

下一章将详细介绍 GeoPipeAgent 的多后端系统,包括 7 种后端的实现原理和使用场景。