znlgis 博客

GIS开发与技术分享

第三章:核心架构与模块设计

3.1 总体架构

GeoPipeAgent 采用分层架构设计,各层职责清晰,模块之间通过明确的接口交互。从用户输入到执行输出,数据流经以下层次:

                    ┌───────────────────┐
                    │   用户 / AI Agent   │
                    │   (YAML Pipeline)  │
                    └─────────┬─────────┘
                              │
                    ┌─────────▼─────────┐
                    │      CLI 层        │
                    │  geopipe-agent     │
                    │  (click commands)  │
                    └─────────┬─────────┘
                              │
              ┌───────────────▼───────────────┐
              │         Engine 层               │
              │  ┌────────┐  ┌──────────┐      │
              │  │ Parser │→ │ Validator│      │
              │  └────────┘  └────┬─────┘      │
              │                   │            │
              │  ┌────────────────▼─────────┐  │
              │  │     Executor             │  │
              │  │  ┌─────────┐ ┌────────┐  │  │
              │  │  │Resolver │ │Context │  │  │
              │  │  └─────────┘ └────────┘  │  │
              │  │         ┌────────┐       │  │
              │  │         │Reporter│       │  │
              │  │         └────────┘       │  │
              │  └──────────────────────────┘  │
              └───────────────┬───────────────┘
                              │
              ┌───────────────▼───────────────┐
              │         Steps 层               │
              │  ┌──────────┐  ┌───────────┐  │
              │  │ Registry │  │ @step 装饰器│ │
              │  └──────────┘  └───────────┘  │
              │  ┌────┬────┬────┬────┬─────┐  │
              │  │ IO │Vec │Ras │Ana │ Net │  │
              │  └────┴────┴────┴────┴─────┘  │
              └───────────────┬───────────────┘
                              │
              ┌───────────────▼───────────────┐
              │       Backend 层               │
              │  ┌──────────┐ ┌──────────┐    │
              │  │GdalPython│ │ GdalCli  │    │
              │  └──────────┘ └──────────┘    │
              │        ┌──────────┐           │
              │        │QgisProcess│          │
              │        └──────────┘           │
              └───────────────────────────────┘

3.2 数据流详解

一个典型的流水线执行过程如下:

YAML 文件
    │
    ▼
Parser.parse_yaml()
    │  将 YAML 文本解析为 PipelineDefinition 对象
    ▼
Validator.validate_pipeline()
    │  校验 step_id 格式、唯一性、注册表存在性、引用合法性
    ▼
Executor.execute_pipeline()
    │
    ├─── 对每个 Step:
    │    │
    │    ├── 1. 检查 when 条件(条件执行)
    │    ├── 2. Resolver.resolve_step_params() → 解析变量和引用
    │    ├── 3. Registry.get() → 查找步骤函数
    │    ├── 4. BackendManager.get() → 选择后端引擎
    │    ├── 5. 构建 StepContext
    │    ├── 6. 调用步骤函数 → 返回 StepResult
    │    ├── 7. Context.set_output() → 存储结果
    │    └── 8. 处理 on_error 策略(skip/retry/fail)
    │
    ▼
Reporter.build_report()
    │  汇总所有步骤的执行状态、耗时、输出
    ▼
JSON 执行报告

3.3 模块详解

3.3.1 CLI 层(cli.py

CLI 层是框架的用户入口,基于 Click 框架实现。它提供 8 个命令,负责:

  • 解析命令行参数
  • 调用 Engine 层的相应功能
  • 格式化并输出结果
@click.group()
@click.version_option(package_name="geopipe-agent")
def main():
    """GeoPipeAgent — AI-Native GIS Analysis Pipeline Framework."""
    pass

CLI 层的设计原则是 薄层封装——它不包含业务逻辑,仅负责参数解析和调用引擎层。这使得引擎层可以独立于 CLI 使用(例如在 Python 代码中直接调用)。

8 个 CLI 命令一览:

命令 功能 示例
run 执行 YAML 流水线 geopipe-agent run pipeline.yaml
validate 验证流水线(不执行) geopipe-agent validate pipeline.yaml
list-steps 列出所有可用步骤 geopipe-agent list-steps --category vector
describe 查看步骤详细信息 geopipe-agent describe vector.buffer
info 查看 GIS 数据文件信息 geopipe-agent info data/roads.shp
backends 列出后端可用状态 geopipe-agent backends
generate-skill-doc 生成步骤参考文档 geopipe-agent generate-skill-doc
generate-skill 生成完整 Skill 文件集 geopipe-agent generate-skill --output-dir skills/

3.3.2 Engine 层

Engine 层是框架的核心,由六个子模块组成:

Parser(解析器,engine/parser.py

负责将 YAML 文件或字符串解析为 PipelineDefinition 数据模型。

def parse_yaml(source: str | Path) -> PipelineDefinition:
    """Parse a YAML pipeline file or string into a PipelineDefinition."""

解析过程分为两步:

  1. _load_yaml():加载 YAML 内容(支持文件路径和原始字符串)
  2. _build_pipeline():将原始 dict 转换为 PipelineDefinition 对象

解析器会验证基本结构:

  • 顶层必须是字典,且包含 pipeline
  • pipeline.steps 必须是非空列表
  • 每个步骤必须有 iduse 字段

Validator(验证器,engine/validator.py

负责在执行前验证流水线的合法性:

def validate_pipeline(pipeline: PipelineDefinition) -> list[str]:
    """Validate a parsed pipeline definition. Returns warnings."""

验证内容包括:

  1. step_id 格式检查:只允许 [a-z0-9_-],不允许包含点号
  2. step_id 唯一性:每个步骤 ID 必须唯一
  3. 步骤注册表检查use 字段指定的步骤类型必须已注册
  4. 参数引用检查(递归):
    • $step_id.attr 引用的步骤必须在当前步骤之前定义
    • ${var_name} 引用的变量必须在 variables 中定义
  5. on_error 值检查:只允许 failskipretry
  6. 输出引用检查outputs 中引用的步骤必须存在

Resolver(参数解析器,engine/resolver.py

负责解析步骤参数中的变量和引用:

def resolve_step_params(pipeline, step_index, context) -> dict:
    """Resolve all parameters for a step."""

Resolver 是一个薄层封装,实际工作委托给 PipelineContext.resolve_params()

Context(执行上下文,engine/context.py

上下文管理整个流水线执行过程中的数据流转,是步骤之间传递数据的核心机制。

PipelineContext 提供以下功能:

class PipelineContext:
    def set_output(self, step_id: str, result: StepResult) -> None:
        """存储步骤结果"""

    def get_output(self, step_id: str) -> StepResult:
        """获取步骤结果"""

    def resolve(self, value: Any) -> Any:
        """解析值(变量替换、步骤引用)"""

    def resolve_params(self, params: dict) -> dict:
        """批量解析参数字典"""

引用解析规则:

格式 示例 说明
$step_id.attr $read.output 引用步骤的输出属性
${var_name} ${input_path} 替换变量值
嵌入式变量 data/${name}.shp 字符串内嵌变量替换
纯值 500 原样返回

类型保持规则:

  • 如果整个值是一个 ${var} 引用,返回变量的原始类型(数字、列表等)
  • 如果变量嵌入在字符串中,执行字符串替换

StepContext 是传递给每个步骤函数的上下文:

class StepContext:
    def param(self, name: str, default=None) -> Any:
        """获取解析后的参数"""

    def input(self, name: str = "input") -> Any:
        """获取输入数据(param('input') 的快捷方式)"""

    @property
    def backend(self) -> GeoBackend:
        """获取当前后端"""

Executor(执行器,engine/executor.py

执行器是引擎的核心,负责顺序执行流水线中的每个步骤:

def execute_pipeline(pipeline: PipelineDefinition) -> dict:
    """Execute a validated pipeline and return a JSON report."""

执行流程中的关键特性:

条件执行(when)

if step_def.when:
    if not _evaluate_condition(step_def.when, context):
        # 跳过此步骤

条件表达式通过 AST 白名单验证确保安全性,只允许比较运算、布尔运算和常量。

错误处理策略

match step_def.on_error:
    case "fail":   # 停止流水线(默认)
    case "skip":   # 跳过失败的步骤,继续执行
    case "retry":  # 重试最多 3 次,指数退避

重试机制

on_error="retry" 时,步骤最多重试 3 次,每次间隔 0.5 × 尝试次数秒(简单退避策略)。

AI 友好修复建议

def _suggest_fix(step_use: str, error: Exception) -> str | None:
    """Generate an AI-friendly fix suggestion."""
    # 例如:CRS 不匹配时建议添加 reproject 步骤
    # 文件不存在时建议检查路径

Reporter(报告生成器,engine/reporter.py

负责将执行结果汇总为 JSON 格式的报告:

def build_report(pipeline_name, status, duration, step_reports, outputs) -> dict:
    """Build a JSON-serializable execution report."""

报告结构设计为 AI 友好的扁平格式:

{
  "pipeline": "流水线名称",
  "status": "success | error",
  "duration_seconds": 1.234,
  "steps": [
    {
      "id": "step_id",
      "step": "step_type",
      "status": "success | error | skipped",
      "duration": 0.5,
      "output_summary": { ... }
    }
  ],
  "outputs": { ... }
}

3.3.3 Steps 层

Steps 层包含所有可执行的步骤,采用插件化架构设计。

注册表(StepRegistry)

注册表是一个 单例模式 的全局步骤目录:

class StepRegistry:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._steps = {}
        return cls._instance

注册表提供以下操作:

方法 说明
register(info) 注册步骤
get(step_id) 按 ID 获取步骤信息
list_all() 列出所有步骤
list_by_category(cat) 按类别列出
has(step_id) 检查步骤是否存在
categories() 列出所有类别
reset() 重置注册表(测试用)

@step 装饰器

@step 装饰器是注册步骤的声明式方法:

@step(
    id="vector.buffer",
    name="矢量缓冲区分析",
    description="...",
    category="vector",
    params={...},
    outputs={...},
    backends=["gdal_python", "qgis_process"],
    examples=[...],
)
def vector_buffer(ctx: StepContext) -> StepResult:
    ...

装饰器自动完成:

  1. 如果未指定 category,从 id 中推导(如 vector.buffervector
  2. 创建 _StepInfo 对象
  3. 注册到 StepRegistry
  4. _StepInfo 附加到函数的 _step_info 属性上

步骤加载

步骤通过 load_builtin_steps() 函数加载:

def load_builtin_steps() -> None:
    """Import all built-in step modules to trigger registration."""
    import geopipe_agent.steps.io.read_vector
    import geopipe_agent.steps.io.write_vector
    # ... 共 23 个 import

每个 import 会触发模块中 @step 装饰器的执行,从而将步骤注册到注册表。

3.3.4 Backend 层

Backend 层提供实际的 GIS 操作能力,采用策略模式设计。

抽象基类(GeoBackend)

class GeoBackend(ABC):
    @abstractmethod
    def name(self) -> str: ...
    @abstractmethod
    def is_available(self) -> bool: ...
    @abstractmethod
    def buffer(self, gdf, distance, **kwargs): ...
    @abstractmethod
    def clip(self, input_gdf, clip_gdf, **kwargs): ...
    @abstractmethod
    def reproject(self, gdf, target_crs, **kwargs): ...
    @abstractmethod
    def dissolve(self, gdf, by=None, **kwargs): ...
    @abstractmethod
    def simplify(self, gdf, tolerance, **kwargs): ...
    @abstractmethod
    def overlay(self, gdf1, gdf2, how="intersection", **kwargs): ...

三种后端实现相同的接口,用户可以在步骤中指定使用特定后端:

后端 技术栈 适用场景
GdalPythonBackend GeoPandas + Shapely 默认,适合中小数据
GdalCliBackend ogr2ogr CLI 大文件处理
QgisProcessBackend qgis_process CLI 需要 QGIS 算法时

BackendManager

后端管理器负责自动检测可用后端和按需选择:

class BackendManager:
    def __init__(self):
        self.backends = []
        self._detect_available()  # 自动检测

    def get(self, preferred=None) -> GeoBackend:
        """获取后端(指定或默认第一个可用的)"""

3.3.5 Models 层

Models 层定义了框架的核心数据结构,使用 Python 的 dataclass 实现。

PipelineDefinition

@dataclass
class PipelineDefinition:
    name: str
    steps: list[StepDefinition]
    description: str = ""
    crs: str | None = None
    variables: dict = field(default_factory=dict)
    outputs: dict = field(default_factory=dict)

StepDefinition

@dataclass
class StepDefinition:
    id: str                     # 唯一标识符
    use: str                    # 步骤类型(如 "vector.buffer")
    params: dict = {}           # 参数
    when: str | None = None     # 条件执行表达式
    on_error: str = "fail"      # 错误处理策略
    backend: str | None = None  # 指定后端

StepResult

@dataclass
class StepResult:
    output: Any = None          # 主要输出(通常是 GeoDataFrame)
    stats: dict = {}            # 统计信息
    metadata: dict = {}         # 元数据

StepResult 实现了 __getattr__ 方法,允许通过属性访问 stats 和 metadata 中的键,这是 $step_id.attr 引用机制的基础。

3.3.6 错误处理体系

GeoPipeAgent 设计了完善的异常层次结构:

GeopipeAgentError (基类)
├── PipelineParseError       # YAML 解析错误
├── PipelineValidationError  # 流水线验证错误
├── StepExecutionError       # 步骤执行错误(含 suggestion)
├── BackendNotAvailableError # 后端不可用
├── StepNotFoundError        # 步骤未注册
└── VariableResolutionError  # 变量/引用解析错误

StepExecutionError 是最关键的异常类,它包含了 AI 友好的属性:

class StepExecutionError(GeopipeAgentError):
    def __init__(self, step_id, message, cause=None, suggestion=None):
        self.step_id = step_id
        self.suggestion = suggestion  # AI 修复建议
        self.cause = cause

    def to_dict(self) -> dict:
        """转为 JSON,便于 AI 解析"""

3.3.7 Utils 层

日志系统(utils/logging.py

支持两种日志格式:

  • 文本格式(默认):适合人类阅读
  • JSON 格式(--json-log):适合机器解析
def setup_logging(level: str = "INFO", json_format: bool = False):
    ...

CRS 工具(utils/crs.py

提供 CRS 标准化函数:

def normalize_crs(crs_input: str) -> str:
    """将各种 CRS 表示归一化为 EPSG 代码"""

3.4 设计原则

GeoPipeAgent 的架构设计遵循以下原则:

3.4.1 AI 优先(AI-First)

所有 API 设计都以 AI 的使用习惯为核心:

  • YAML 作为输入格式(结构化、AI 易于生成)
  • JSON 作为输出格式(AI 易于解析)
  • 错误信息包含修复建议(AI 可自动纠错)
  • Skill 文件自动生成(AI 可自主学习框架能力)

3.4.2 声明式优于命令式

用户(包括 AI)只需要声明”做什么”,不需要关心”怎么做”:

  • 流水线是声明式的 YAML 配置
  • 步骤参数是声明式的键值对
  • 后端选择可以是自动的

3.4.3 插件化架构

所有步骤通过 @step 装饰器注册,框架不硬编码任何步骤逻辑:

  • 添加新步骤只需创建一个新文件并使用装饰器
  • 步骤可以按类别组织
  • 步骤的元信息可用于自动生成文档

3.4.4 关注点分离

每个模块有明确的单一职责:

  • Parser 只负责解析
  • Validator 只负责验证
  • Executor 只负责执行
  • Reporter 只负责报告

3.4.5 安全优先

对于涉及动态代码执行的场景(如 when 条件表达式、raster.calc 波段计算),统一使用 AST 白名单验证 而非简单的 eval()

# 安全的做法:先验证 AST,再执行
tree = ast.parse(expression, mode="eval")
for node in ast.walk(tree):
    if not isinstance(node, _SAFE_NODES):
        raise ValueError("不安全的表达式")
result = eval(compile(tree, ...))

3.5 模块依赖关系

cli.py
  └── engine/
        ├── parser.py → models/pipeline.py
        ├── validator.py → steps/registry.py
        ├── executor.py
        │     ├── context.py → models/result.py
        │     ├── resolver.py → context.py
        │     ├── reporter.py
        │     ├── steps/registry.py
        │     └── backends/
        └── reporter.py

steps/
  ├── registry.py(单例)
  ├── decorators.py → registry.py
  └── io/ vector/ raster/ analysis/ network/
        └── 每个步骤文件 → decorators.py, context.py, result.py

backends/
  ├── base.py(抽象基类)
  ├── gdal_python.py → base.py
  ├── gdal_cli.py → base.py
  └── qgis_process.py → base.py

框架没有循环依赖,依赖方向清晰:上层依赖下层,同层之间不互相依赖。