第三章:核心架构与模块设计
3.1 总体架构
GeoPipeAgent 采用分层架构设计,各层职责清晰,模块之间通过明确的接口交互。从用户输入到执行输出,数据流经以下层次:
┌───────────────────┐
│ 用户 / AI Agent │
│ (YAML Pipeline) │
└─────────┬─────────┘
│
┌─────────▼─────────┐
│ CLI 层 │
│ geopipe-agent │
│ (click commands) │
└─────────┬─────────┘
│
┌───────────────▼───────────────┐
│ Engine 层 │
│ ┌────────┐ ┌──────────┐ │
│ │ Parser │→ │ Validator│ │
│ └────────┘ └────┬─────┘ │
│ │ │
│ ┌────────────────▼─────────┐ │
│ │ Executor │ │
│ │ ┌─────────┐ ┌────────┐ │ │
│ │ │Resolver │ │Context │ │ │
│ │ └─────────┘ └────────┘ │ │
│ │ ┌────────┐ │ │
│ │ │Reporter│ │ │
│ │ └────────┘ │ │
│ └──────────────────────────┘ │
└───────────────┬───────────────┘
│
┌───────────────▼───────────────┐
│ Steps 层 │
│ ┌──────────┐ ┌───────────┐ │
│ │ Registry │ │ @step 装饰器│ │
│ └──────────┘ └───────────┘ │
│ ┌────┬────┬────┬────┬─────┐ │
│ │ IO │Vec │Ras │Ana │ Net │ │
│ └────┴────┴────┴────┴─────┘ │
└───────────────┬───────────────┘
│
┌───────────────▼───────────────┐
│ Backend 层 │
│ ┌──────────┐ ┌──────────┐ │
│ │GdalPython│ │ GdalCli │ │
│ └──────────┘ └──────────┘ │
│ ┌──────────┐ │
│ │QgisProcess│ │
│ └──────────┘ │
└───────────────────────────────┘
3.2 数据流详解
一个典型的流水线执行过程如下:
YAML 文件
│
▼
Parser.parse_yaml()
│ 将 YAML 文本解析为 PipelineDefinition 对象
▼
Validator.validate_pipeline()
│ 校验 step_id 格式、唯一性、注册表存在性、引用合法性
▼
Executor.execute_pipeline()
│
├─── 对每个 Step:
│ │
│ ├── 1. 检查 when 条件(条件执行)
│ ├── 2. Resolver.resolve_step_params() → 解析变量和引用
│ ├── 3. Registry.get() → 查找步骤函数
│ ├── 4. BackendManager.get() → 选择后端引擎
│ ├── 5. 构建 StepContext
│ ├── 6. 调用步骤函数 → 返回 StepResult
│ ├── 7. Context.set_output() → 存储结果
│ └── 8. 处理 on_error 策略(skip/retry/fail)
│
▼
Reporter.build_report()
│ 汇总所有步骤的执行状态、耗时、输出
▼
JSON 执行报告
3.3 模块详解
3.3.1 CLI 层(cli.py)
CLI 层是框架的用户入口,基于 Click 框架实现。它提供 8 个命令,负责:
- 解析命令行参数
- 调用 Engine 层的相应功能
- 格式化并输出结果
@click.group()
@click.version_option(package_name="geopipe-agent")
def main():
"""GeoPipeAgent — AI-Native GIS Analysis Pipeline Framework."""
pass
CLI 层的设计原则是 薄层封装——它不包含业务逻辑,仅负责参数解析和调用引擎层。这使得引擎层可以独立于 CLI 使用(例如在 Python 代码中直接调用)。
8 个 CLI 命令一览:
| 命令 | 功能 | 示例 |
|---|---|---|
run |
执行 YAML 流水线 | geopipe-agent run pipeline.yaml |
validate |
验证流水线(不执行) | geopipe-agent validate pipeline.yaml |
list-steps |
列出所有可用步骤 | geopipe-agent list-steps --category vector |
describe |
查看步骤详细信息 | geopipe-agent describe vector.buffer |
info |
查看 GIS 数据文件信息 | geopipe-agent info data/roads.shp |
backends |
列出后端可用状态 | geopipe-agent backends |
generate-skill-doc |
生成步骤参考文档 | geopipe-agent generate-skill-doc |
generate-skill |
生成完整 Skill 文件集 | geopipe-agent generate-skill --output-dir skills/ |
3.3.2 Engine 层
Engine 层是框架的核心,由六个子模块组成:
Parser(解析器,engine/parser.py)
负责将 YAML 文件或字符串解析为 PipelineDefinition 数据模型。
def parse_yaml(source: str | Path) -> PipelineDefinition:
"""Parse a YAML pipeline file or string into a PipelineDefinition."""
解析过程分为两步:
_load_yaml():加载 YAML 内容(支持文件路径和原始字符串)_build_pipeline():将原始 dict 转换为PipelineDefinition对象
解析器会验证基本结构:
- 顶层必须是字典,且包含
pipeline键 pipeline.steps必须是非空列表- 每个步骤必须有
id和use字段
Validator(验证器,engine/validator.py)
负责在执行前验证流水线的合法性:
def validate_pipeline(pipeline: PipelineDefinition) -> list[str]:
"""Validate a parsed pipeline definition. Returns warnings."""
验证内容包括:
- step_id 格式检查:只允许
[a-z0-9_-],不允许包含点号 - step_id 唯一性:每个步骤 ID 必须唯一
- 步骤注册表检查:
use字段指定的步骤类型必须已注册 - 参数引用检查(递归):
$step_id.attr引用的步骤必须在当前步骤之前定义${var_name}引用的变量必须在variables中定义
- on_error 值检查:只允许
fail、skip、retry - 输出引用检查:
outputs中引用的步骤必须存在
Resolver(参数解析器,engine/resolver.py)
负责解析步骤参数中的变量和引用:
def resolve_step_params(pipeline, step_index, context) -> dict:
"""Resolve all parameters for a step."""
Resolver 是一个薄层封装,实际工作委托给 PipelineContext.resolve_params()。
Context(执行上下文,engine/context.py)
上下文管理整个流水线执行过程中的数据流转,是步骤之间传递数据的核心机制。
PipelineContext 提供以下功能:
class PipelineContext:
def set_output(self, step_id: str, result: StepResult) -> None:
"""存储步骤结果"""
def get_output(self, step_id: str) -> StepResult:
"""获取步骤结果"""
def resolve(self, value: Any) -> Any:
"""解析值(变量替换、步骤引用)"""
def resolve_params(self, params: dict) -> dict:
"""批量解析参数字典"""
引用解析规则:
| 格式 | 示例 | 说明 |
|---|---|---|
$step_id.attr |
$read.output |
引用步骤的输出属性 |
${var_name} |
${input_path} |
替换变量值 |
| 嵌入式变量 | data/${name}.shp |
字符串内嵌变量替换 |
| 纯值 | 500 |
原样返回 |
类型保持规则:
- 如果整个值是一个
${var}引用,返回变量的原始类型(数字、列表等) - 如果变量嵌入在字符串中,执行字符串替换
StepContext 是传递给每个步骤函数的上下文:
class StepContext:
def param(self, name: str, default=None) -> Any:
"""获取解析后的参数"""
def input(self, name: str = "input") -> Any:
"""获取输入数据(param('input') 的快捷方式)"""
@property
def backend(self) -> GeoBackend:
"""获取当前后端"""
Executor(执行器,engine/executor.py)
执行器是引擎的核心,负责顺序执行流水线中的每个步骤:
def execute_pipeline(pipeline: PipelineDefinition) -> dict:
"""Execute a validated pipeline and return a JSON report."""
执行流程中的关键特性:
条件执行(when):
if step_def.when:
if not _evaluate_condition(step_def.when, context):
# 跳过此步骤
条件表达式通过 AST 白名单验证确保安全性,只允许比较运算、布尔运算和常量。
错误处理策略:
match step_def.on_error:
case "fail": # 停止流水线(默认)
case "skip": # 跳过失败的步骤,继续执行
case "retry": # 重试最多 3 次,指数退避
重试机制:
当 on_error="retry" 时,步骤最多重试 3 次,每次间隔 0.5 × 尝试次数秒(简单退避策略)。
AI 友好修复建议:
def _suggest_fix(step_use: str, error: Exception) -> str | None:
"""Generate an AI-friendly fix suggestion."""
# 例如:CRS 不匹配时建议添加 reproject 步骤
# 文件不存在时建议检查路径
Reporter(报告生成器,engine/reporter.py)
负责将执行结果汇总为 JSON 格式的报告:
def build_report(pipeline_name, status, duration, step_reports, outputs) -> dict:
"""Build a JSON-serializable execution report."""
报告结构设计为 AI 友好的扁平格式:
{
"pipeline": "流水线名称",
"status": "success | error",
"duration_seconds": 1.234,
"steps": [
{
"id": "step_id",
"step": "step_type",
"status": "success | error | skipped",
"duration": 0.5,
"output_summary": { ... }
}
],
"outputs": { ... }
}
3.3.3 Steps 层
Steps 层包含所有可执行的步骤,采用插件化架构设计。
注册表(StepRegistry)
注册表是一个 单例模式 的全局步骤目录:
class StepRegistry:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._steps = {}
return cls._instance
注册表提供以下操作:
| 方法 | 说明 |
|---|---|
register(info) |
注册步骤 |
get(step_id) |
按 ID 获取步骤信息 |
list_all() |
列出所有步骤 |
list_by_category(cat) |
按类别列出 |
has(step_id) |
检查步骤是否存在 |
categories() |
列出所有类别 |
reset() |
重置注册表(测试用) |
@step 装饰器
@step 装饰器是注册步骤的声明式方法:
@step(
id="vector.buffer",
name="矢量缓冲区分析",
description="...",
category="vector",
params={...},
outputs={...},
backends=["gdal_python", "qgis_process"],
examples=[...],
)
def vector_buffer(ctx: StepContext) -> StepResult:
...
装饰器自动完成:
- 如果未指定
category,从id中推导(如vector.buffer→vector) - 创建
_StepInfo对象 - 注册到
StepRegistry - 将
_StepInfo附加到函数的_step_info属性上
步骤加载
步骤通过 load_builtin_steps() 函数加载:
def load_builtin_steps() -> None:
"""Import all built-in step modules to trigger registration."""
import geopipe_agent.steps.io.read_vector
import geopipe_agent.steps.io.write_vector
# ... 共 23 个 import
每个 import 会触发模块中 @step 装饰器的执行,从而将步骤注册到注册表。
3.3.4 Backend 层
Backend 层提供实际的 GIS 操作能力,采用策略模式设计。
抽象基类(GeoBackend)
class GeoBackend(ABC):
@abstractmethod
def name(self) -> str: ...
@abstractmethod
def is_available(self) -> bool: ...
@abstractmethod
def buffer(self, gdf, distance, **kwargs): ...
@abstractmethod
def clip(self, input_gdf, clip_gdf, **kwargs): ...
@abstractmethod
def reproject(self, gdf, target_crs, **kwargs): ...
@abstractmethod
def dissolve(self, gdf, by=None, **kwargs): ...
@abstractmethod
def simplify(self, gdf, tolerance, **kwargs): ...
@abstractmethod
def overlay(self, gdf1, gdf2, how="intersection", **kwargs): ...
三种后端实现相同的接口,用户可以在步骤中指定使用特定后端:
| 后端 | 技术栈 | 适用场景 |
|---|---|---|
GdalPythonBackend |
GeoPandas + Shapely | 默认,适合中小数据 |
GdalCliBackend |
ogr2ogr CLI | 大文件处理 |
QgisProcessBackend |
qgis_process CLI | 需要 QGIS 算法时 |
BackendManager
后端管理器负责自动检测可用后端和按需选择:
class BackendManager:
def __init__(self):
self.backends = []
self._detect_available() # 自动检测
def get(self, preferred=None) -> GeoBackend:
"""获取后端(指定或默认第一个可用的)"""
3.3.5 Models 层
Models 层定义了框架的核心数据结构,使用 Python 的 dataclass 实现。
PipelineDefinition
@dataclass
class PipelineDefinition:
name: str
steps: list[StepDefinition]
description: str = ""
crs: str | None = None
variables: dict = field(default_factory=dict)
outputs: dict = field(default_factory=dict)
StepDefinition
@dataclass
class StepDefinition:
id: str # 唯一标识符
use: str # 步骤类型(如 "vector.buffer")
params: dict = {} # 参数
when: str | None = None # 条件执行表达式
on_error: str = "fail" # 错误处理策略
backend: str | None = None # 指定后端
StepResult
@dataclass
class StepResult:
output: Any = None # 主要输出(通常是 GeoDataFrame)
stats: dict = {} # 统计信息
metadata: dict = {} # 元数据
StepResult 实现了 __getattr__ 方法,允许通过属性访问 stats 和 metadata 中的键,这是 $step_id.attr 引用机制的基础。
3.3.6 错误处理体系
GeoPipeAgent 设计了完善的异常层次结构:
GeopipeAgentError (基类)
├── PipelineParseError # YAML 解析错误
├── PipelineValidationError # 流水线验证错误
├── StepExecutionError # 步骤执行错误(含 suggestion)
├── BackendNotAvailableError # 后端不可用
├── StepNotFoundError # 步骤未注册
└── VariableResolutionError # 变量/引用解析错误
StepExecutionError 是最关键的异常类,它包含了 AI 友好的属性:
class StepExecutionError(GeopipeAgentError):
def __init__(self, step_id, message, cause=None, suggestion=None):
self.step_id = step_id
self.suggestion = suggestion # AI 修复建议
self.cause = cause
def to_dict(self) -> dict:
"""转为 JSON,便于 AI 解析"""
3.3.7 Utils 层
日志系统(utils/logging.py)
支持两种日志格式:
- 文本格式(默认):适合人类阅读
- JSON 格式(
--json-log):适合机器解析
def setup_logging(level: str = "INFO", json_format: bool = False):
...
CRS 工具(utils/crs.py)
提供 CRS 标准化函数:
def normalize_crs(crs_input: str) -> str:
"""将各种 CRS 表示归一化为 EPSG 代码"""
3.4 设计原则
GeoPipeAgent 的架构设计遵循以下原则:
3.4.1 AI 优先(AI-First)
所有 API 设计都以 AI 的使用习惯为核心:
- YAML 作为输入格式(结构化、AI 易于生成)
- JSON 作为输出格式(AI 易于解析)
- 错误信息包含修复建议(AI 可自动纠错)
- Skill 文件自动生成(AI 可自主学习框架能力)
3.4.2 声明式优于命令式
用户(包括 AI)只需要声明”做什么”,不需要关心”怎么做”:
- 流水线是声明式的 YAML 配置
- 步骤参数是声明式的键值对
- 后端选择可以是自动的
3.4.3 插件化架构
所有步骤通过 @step 装饰器注册,框架不硬编码任何步骤逻辑:
- 添加新步骤只需创建一个新文件并使用装饰器
- 步骤可以按类别组织
- 步骤的元信息可用于自动生成文档
3.4.4 关注点分离
每个模块有明确的单一职责:
- Parser 只负责解析
- Validator 只负责验证
- Executor 只负责执行
- Reporter 只负责报告
3.4.5 安全优先
对于涉及动态代码执行的场景(如 when 条件表达式、raster.calc 波段计算),统一使用 AST 白名单验证 而非简单的 eval():
# 安全的做法:先验证 AST,再执行
tree = ast.parse(expression, mode="eval")
for node in ast.walk(tree):
if not isinstance(node, _SAFE_NODES):
raise ValueError("不安全的表达式")
result = eval(compile(tree, ...))
3.5 模块依赖关系
cli.py
└── engine/
├── parser.py → models/pipeline.py
├── validator.py → steps/registry.py
├── executor.py
│ ├── context.py → models/result.py
│ ├── resolver.py → context.py
│ ├── reporter.py
│ ├── steps/registry.py
│ └── backends/
└── reporter.py
steps/
├── registry.py(单例)
├── decorators.py → registry.py
└── io/ vector/ raster/ analysis/ network/
└── 每个步骤文件 → decorators.py, context.py, result.py
backends/
├── base.py(抽象基类)
├── gdal_python.py → base.py
├── gdal_cli.py → base.py
└── qgis_process.py → base.py
框架没有循环依赖,依赖方向清晰:上层依赖下层,同层之间不互相依赖。