znlgis 博客

GIS开发与技术分享

第十三章:流水线引擎深度解析

GeoPipeAgent 的流水线引擎由 engine/ 目录下 5 个模块组成,本章深入解析每个模块的内部机制,帮助开发者理解框架工作原理、调试问题和进行二次开发。


13.1 引擎整体流程

执行一个 YAML 流水线涉及四个阶段:

YAML 文件
    ↓ parser.py
PipelineDefinition(内存模型)
    ↓ validator.py
校验通过(或抛出 PipelineValidationError)
    ↓ executor.py + context.py
执行所有步骤,收集结果
    ↓ reporter.py
JSON 执行报告

13.2 parser.py:YAML 解析

职责:将 YAML 文件(或字符串)解析为 PipelineDefinition 数据模型。

核心函数

def parse_yaml(source: str | Path) -> PipelineDefinition:
    raw = _load_yaml(source)       # 读取文件或字符串 → dict
    return _build_pipeline(raw)    # 构建 PipelineDefinition

关键解析规则

  1. 顶层 pipeline: 键必须存在:若不存在,抛出 PipelineParseError("Missing 'pipeline' key at the top level.")
  2. steps 必须是非空列表pipeline.steps 为空时报错
  3. 每个步骤必须有 iduse:缺少时报错并指明步骤索引
  4. on_error 默认值为 "fail":解析时自动填充

解析的数据结构

@dataclass
class PipelineDefinition:
    name: str
    steps: list[StepDefinition]
    description: str = ""
    crs: str | None = None
    variables: dict = {}
    outputs: dict = {}

@dataclass
class StepDefinition:
    id: str
    use: str
    params: dict = {}
    when: str | None = None
    on_error: str = "fail"  # "fail" | "skip" | "retry"
    backend: str | None = None

13.3 validator.py:流水线校验

职责:在执行之前检查流水线的语义正确性,尽早发现问题。

校验规则

  1. 步骤 ID 格式:匹配 [a-z0-9_-],点号(.)不允许
  2. 步骤 ID 唯一性:同一流水线不能有重复 ID
  3. 步骤类型存在use 指定的步骤必须已注册(通过 registry.has()
  4. 参数引用合法性$step-id 引用的步骤必须在当前步骤之前定义;${var} 引用的变量必须在 variables 中定义
  5. on_error 合法值:必须是 fail/skip/retry 之一
  6. outputs 中的引用:引用的步骤必须存在

区分校验与执行时错误

错误类型 触发时机 异常类型
顶层 pipeline: 缺失 解析时 PipelineParseError
步骤 ID 不合法 校验时 PipelineValidationError
引用未定义步骤 校验时 PipelineValidationError
步骤执行异常 执行时 StepExecutionError
变量解析失败 执行时 VariableResolutionError

geopipe-agent validate 命令只运行解析和校验两个阶段,不执行步骤。


13.4 context.py:上下文与引用解析

职责:维护流水线执行状态,提供变量替换和步骤引用解析。

PipelineContext

class PipelineContext:
    variables: dict              # 流水线变量(含 --var 覆盖)
    _step_outputs: dict[str, StepResult]  # 已完成步骤的输出

    def set_output(step_id, result)  # 步骤完成后存储结果
    def get_output(step_id)          # 获取步骤结果
    def resolve(value)               # 解析值(变量替换/步骤引用)
    def resolve_params(params)        # 解析整个 params 字典

引用解析算法

resolve(value) 的处理逻辑:

def resolve(self, value):
    if not isinstance(value, str):
        return value           # 非字符串直接返回

    if value.startswith("$") and not value.startswith("${"):
        return self._resolve_step_ref(value)  # 步骤引用

    if "${" in value:
        return self._substitute_variables(value)  # 变量替换

    return value               # 普通字符串

def _resolve_step_ref(self, ref):
    ref_body = ref[1:]
    if "." not in ref_body:
        step_id, attr = ref_body, "output"   # $step → $step.output
    else:
        step_id, attr = ref_body.split(".", 1)
    result = self._step_outputs[step_id]
    return getattr(result, attr)             # 访问 StepResult 属性

getattr(result, attr) 的工作原理StepResult 实现了 __getattr__,查找顺序为:

  1. 内置属性(outputstatsmetadataissues
  2. stats 字典键(如 feature_countissues_count
  3. metadata 字典键(如 issues_gdftransform
  4. 以上都找不到 → 抛出 AttributeError,转为 VariableResolutionError

StepContext

每个步骤执行时接收 StepContext,提供参数访问:

class StepContext:
    def param(self, name, default=None)  # 获取指定参数值
    def input(self, name="input")        # input("input") 的快捷方式
    @property params                      # 完整参数字典
    backend                              # 后端对象(可能为 None)

13.5 executor.py:步骤执行调度

职责:按顺序执行所有步骤,处理条件跳过、错误策略、重试逻辑。

主执行流程

def execute_pipeline(pipeline) -> dict:
    context = PipelineContext(variables=pipeline.variables)
    backend_manager = BackendManager.default()

    for step_def in pipeline.steps:
        # 1. 条件判断
        if step_def.when and not _evaluate_condition(step_def.when, context):
            context.set_output(step_def.id, StepResult())
            # → 记录 status=skipped,继续

        # 2. 执行步骤(含重试)
        result = _execute_step(step_def, context, backend_manager)
        context.set_output(step_def.id, result)

    # 3. 解析 outputs
    resolved_outputs = {k: context.resolve(v) for k, v in pipeline.outputs.items()}

    # 4. 生成 JSON 报告
    return build_report(...)

when 条件安全求值

_evaluate_condition 函数使用 AST 白名单确保安全:

  1. 用正则替换 ${var}$step.attr 占位符为 Python 字面值
  2. 解析为 AST(ast.parse(mode="eval")
  3. 通过 validate_condition_ast 检查 AST 节点类型(白名单)
  4. 使用空 __builtins__ 求值:eval(tree, {"__builtins__": {}}, {})

若求值失败,返回 False(步骤被跳过),不中断流水线。

重试机制

def _with_retry(fn, max_attempts, step_id):
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except StepExecutionError:
            raise    # 已封装的执行错误不重试
        except Exception as e:
            if attempt < max_attempts:
                time.sleep(0.5 * attempt)  # 递增等待:0.5s, 1s, 1.5s
    raise last_error

智能错误建议

当步骤失败时,_suggest_fix 函数分析错误信息并提供建议:

patterns = [
    ("crs" in msg and "degree" in msg  建议添加 reproject 步骤),
    ("file not found"  检查文件路径),
    ("invalid geometry"  添加 qc.geometry_validity),
    ("keyerror"  检查字段名是否存在),
    ...
]

这些建议会出现在 JSON 报告的 suggestion 字段中,帮助 AI 或用户快速修复。


13.6 reporter.py:JSON 报告生成

职责:将执行结果汇总为标准 JSON 报告结构。

报告格式

{
  "pipeline": "流水线名称",
  "status": "success",              // "success" | "error"
  "duration": 1.234,                // 总耗时(秒)
  "steps": [
    {
      "id": "load-roads",
      "step": "io.read_vector",
      "status": "success",          // "success" | "skipped" | "error"
      "duration": 0.089,
      "output_summary": {           // StepResult.summary() 的输出
        "feature_count": 100,
        "crs": "EPSG:4326",
        "geometry_types": ["LineString"]
      },
      // QC 步骤还会有:
      "issues_count": 5,
      "issues": [{"rule_id": "...", "severity": "error", "message": "..."}]
    },
    {
      "id": "optional-step",
      "step": "vector.simplify",
      "status": "skipped",
      "skip_reason": "condition not met: ${enable_simplify} == true"
    },
    {
      "id": "failed-step",
      "step": "vector.clip",
      "status": "error",
      "error": "CRS mismatch: ...",
      "suggestion": "Add a vector.reproject step..."
    }
  ],
  "outputs": {
    "result": "output/buffer.geojson",
    "feature_count": 100
  }
}

13.7 Python API:在代码中使用引擎

除 CLI 外,也可直接在 Python 代码中调用引擎 API:

import geopipe_agent  # 触发内置步骤的自动注册

from geopipe_agent.engine.parser import parse_yaml
from geopipe_agent.engine.validator import validate_pipeline
from geopipe_agent.engine.executor import execute_pipeline
from geopipe_agent.errors import GeopipeAgentError

# 方式一:从文件加载
pipeline = parse_yaml("my_pipeline.yaml")

# 方式二:从字符串加载
yaml_str = """
pipeline:
  name: "inline pipeline"
  steps:
    - id: load
      use: io.read_vector
      params: { path: "data.shp" }
"""
pipeline = parse_yaml(yaml_str)

# 运行时覆盖变量
pipeline.variables["input_path"] = "data/roads.shp"
pipeline.variables["buffer_dist"] = 500

# 校验(返回警告列表,或抛出异常)
warnings = validate_pipeline(pipeline)

# 执行(返回 JSON 报告字典)
try:
    report = execute_pipeline(pipeline)
    print(f"Status: {report['status']}")
    print(f"Duration: {report['duration']}s")
except GeopipeAgentError as e:
    print(f"Error: {e}")

13.8 本章小结

本章深入解析了流水线引擎的 5 个模块:

  1. parser.py:YAML → PipelineDefinition,强制要求顶层 pipeline:
  2. validator.py:执行前语义校验,检查 ID 格式、步骤存在性、引用合法性
  3. context.py:维护执行上下文,解析 $step.attr${var} 引用
  4. executor.py:按序执行步骤,处理条件跳过、重试、错误策略,生成建议
  5. reporter.py:汇总生成标准化 JSON 报告

导航← 第十二章:数据质检步骤第十四章:多后端系统 →