znlgis 博客

GIS开发与技术分享

第五章:数据模型与类型系统

5.1 概述

GeoPipeAgent 的数据模型层定义了框架内部传递的核心数据结构。所有模型使用 Python 的 dataclass 装饰器实现,确保简洁性和可序列化性。

数据模型主要包含以下三个核心类:

models/
├── pipeline.py
│   ├── PipelineDefinition   # 流水线定义
│   └── StepDefinition       # 步骤定义
└── result.py
    └── StepResult           # 步骤执行结果

5.2 PipelineDefinition(流水线定义)

PipelineDefinition 是 YAML 流水线文件解析后的内存表示:

from dataclasses import dataclass, field

@dataclass
class PipelineDefinition:
    name: str                           # 流水线名称
    steps: list[StepDefinition]         # 步骤列表(有序)
    description: str = ""               # 描述
    crs: str | None = None              # 默认 CRS
    variables: dict = field(default_factory=dict)   # 变量
    outputs: dict = field(default_factory=dict)      # 输出声明

5.2.1 字段说明

字段 类型 默认值 说明
name str (必需) 流水线名称,用于报告标识
steps list[StepDefinition] (必需) 步骤列表,按顺序执行
description str "" 流水线描述
crs str \| None None 默认坐标参考系统
variables dict {} 变量映射
outputs dict {} 输出引用声明

5.2.2 从 YAML 到 PipelineDefinition

Parser 将以下 YAML 内容:

pipeline:
  name: "缓冲区分析"
  description: "对道路进行缓冲区分析"
  crs: "EPSG:4326"
  variables:
    dist: 500
  steps:
    - id: read
      use: io.read_vector
      params:
        path: "data/roads.shp"
  outputs:
    result: "$read.output"

解析为:

PipelineDefinition(
    name="缓冲区分析",
    description="对道路进行缓冲区分析",
    crs="EPSG:4326",
    variables={"dist": 500},
    steps=[
        StepDefinition(id="read", use="io.read_vector", params={"path": "data/roads.shp"})
    ],
    outputs={"result": "$read.output"}
)

5.3 StepDefinition(步骤定义)

StepDefinition 描述流水线中的单个步骤:

@dataclass
class StepDefinition:
    id: str                         # 唯一标识符
    use: str                        # 步骤类型(registry ID)
    params: dict = field(default_factory=dict)   # 参数
    when: str | None = None         # 条件执行表达式
    on_error: str = "fail"          # 错误策略:fail/skip/retry
    backend: str | None = None      # 指定后端

    def __post_init__(self):
        if self.params is None:
            self.params = {}

5.3.1 字段说明

字段 类型 默认值 说明
id str (必需) 唯一标识符,格式 [a-z0-9_-]+
use str (必需) 步骤类型,如 vector.buffer
params dict {} 参数字典,值可包含引用
when str \| None None 条件执行表达式
on_error str "fail" 错误处理策略
backend str \| None None 指定使用的后端

5.3.2 post_init 的作用

__post_init__ 方法确保 params 永远不为 None。当 YAML 中步骤没有 params 字段时,解析结果可能是 None,这里统一转换为空字典,避免后续代码需要处理 None 的情况。

5.4 StepResult(步骤结果)

StepResult 是步骤函数的返回值,包含执行结果和元信息:

from typing import Any

@dataclass
class StepResult:
    output: Any = None              # 主要输出数据
    stats: dict = field(default_factory=dict)    # 统计信息
    metadata: dict = field(default_factory=dict) # 元数据

5.4.1 output 字段

output 是步骤的主要输出数据,类型取决于步骤类型:

步骤类别 output 类型 说明
io.read_vector GeoDataFrame 矢量数据
io.write_vector str 输出文件路径
io.read_raster dict 栅格数据信息字典
vector.* GeoDataFrame 处理后的矢量数据
raster.* dict 处理后的栅格数据信息
analysis.* GeoDataFramedict 分析结果
network.* GeoDataFramedict 网络分析结果

5.4.2 stats 字段

stats 存储步骤的统计信息,例如:

StepResult(
    output=result_gdf,
    stats={
        "feature_count": 100,
        "total_area": 12345.67,
    }
)

常见的 stats 键:

说明 出现在
feature_count 要素数量 大多数矢量步骤
total_area 总面积 buffer、dissolve
total_length 总长度 simplify
expression 使用的表达式 query、raster.calc
min/max/mean 统计值 raster.stats

5.4.3 metadata 字段

metadata 存储步骤的元数据信息,如 CRS、数据源等。

5.4.4 属性访问(getattr

StepResult 实现了自定义的 __getattr__ 方法,允许通过属性语法访问 stats 和 metadata 中的键:

def __getattr__(self, name: str) -> Any:
    if name.startswith("_"):
        raise AttributeError(name)
    if name in self.stats:
        return self.stats[name]
    if name in self.metadata:
        return self.metadata[name]
    raise AttributeError(
        f"StepResult has no attribute '{name}'. "
        f"Available: output, stats={list(self.stats.keys())}, "
        f"metadata={list(self.metadata.keys())}"
    )

这使得以下引用成为可能:

# 在 YAML 中引用
when: "$buffer.feature_count > 0"     # 等价于 buffer_result.stats["feature_count"]
input: "$buffer.output"                # 直接访问 output 属性

查找顺序:

  1. 先在 stats 字典中查找
  2. 然后在 metadata 字典中查找
  3. 都找不到则抛出 AttributeError

5.4.5 summary() 方法

summary() 方法生成用于 JSON 报告的摘要信息:

def summary(self) -> dict:
    summary = {}
    if self.output is not None:
        try:
            # GeoDataFrame 摘要
            summary["feature_count"] = len(self.output)
            if hasattr(self.output, "crs") and self.output.crs:
                summary["crs"] = str(self.output.crs)
            if hasattr(self.output, "geometry"):
                summary["geometry_types"] = list(
                    self.output.geometry.geom_type.unique()
                )
        except (TypeError, AttributeError):
            summary["type"] = type(self.output).__name__
    summary.update(self.stats)
    return summary

例如:

result = StepResult(
    output=gdf,  # 100 个 Point 要素,CRS=EPSG:4326
    stats={"total_area": 12345.67}
)

result.summary()
# {
#     "feature_count": 100,
#     "crs": "EPSG:4326",
#     "geometry_types": ["Point"],
#     "total_area": 12345.67
# }

5.5 _StepInfo(步骤元信息)

_StepInfo 是注册表中存储的步骤元数据:

class _StepInfo:
    def __init__(
        self,
        id: str,            # 步骤 ID(如 "vector.buffer")
        func: Callable,     # 步骤函数
        name: str = "",     # 显示名称
        description: str = "",  # 描述
        category: str = "", # 类别
        params: dict | None = None,    # 参数 schema
        outputs: dict | None = None,   # 输出 schema
        backends: list[str] | None = None,  # 支持的后端
        examples: list[dict] | None = None, # 使用示例
    ):
        ...

5.5.1 params schema 格式

每个参数的 schema 是一个字典:

params={
    "input": {
        "type": "geodataframe",    # 参数类型
        "required": True,          # 是否必需
        "description": "输入矢量数据",
    },
    "distance": {
        "type": "number",
        "required": True,
        "description": "缓冲区距离",
    },
    "cap_style": {
        "type": "string",
        "required": False,
        "default": "round",
        "enum": ["round", "flat", "square"],
        "description": "端点样式",
    },
}

支持的参数类型字符串:

type 值 对应的 Python 类型 说明
geodataframe GeoDataFrame 矢量数据
raster_info dict 栅格数据信息
string str 字符串
number int \| float 数值
boolean bool 布尔值
dict dict 字典
list list 列表

5.5.2 to_dict() 序列化

_StepInfo 提供 to_dict() 方法用于 JSON 序列化:

def to_dict(self) -> dict:
    return {
        "id": self.id,
        "name": self.name,
        "description": self.description,
        "category": self.category,
        "params": self.params,
        "outputs": self.outputs,
        "backends": self.backends,
        "examples": self.examples,
    }

这在 CLI 的 describelist-steps --format json 命令中使用,也用于 Skill 文件生成。

5.6 PipelineContext 与 StepContext

5.6.1 PipelineContext

PipelineContext 管理流水线执行期间的全局状态:

class PipelineContext:
    def __init__(self, variables: dict | None = None):
        self.variables: dict = dict(variables or {})
        self._step_outputs: dict[str, StepResult] = {}

核心方法:

方法 说明
set_output(step_id, result) 存储步骤结果
get_output(step_id) 获取步骤结果
resolve(value) 解析单个值(变量替换、步骤引用)
resolve_params(params) 批量解析参数字典(支持嵌套)

resolve_params 递归处理嵌套结构:

def resolve_params(self, params: dict) -> dict:
    resolved = {}
    for key, value in params.items():
        if isinstance(value, dict):
            resolved[key] = self.resolve_params(value)    # 递归处理字典
        elif isinstance(value, list):
            resolved[key] = [self.resolve(v) for v in value]  # 处理列表
        else:
            resolved[key] = self.resolve(value)            # 处理标量
    return resolved

5.6.2 StepContext

StepContext 是传递给每个步骤函数的参数容器:

class StepContext:
    def __init__(self, params, backend=None, pipeline_context=None):
        self._params = params       # 已解析的参数
        self.backend = backend      # 当前后端实例
        self.pipeline_context = pipeline_context

    def param(self, name, default=None):
        """获取参数值"""
        return self._params.get(name, default)

    def input(self, name="input"):
        """获取输入数据(快捷方式)"""
        return self._params.get(name)

    @property
    def params(self):
        return self._params

在步骤函数中的典型用法:

@step(id="vector.buffer", ...)
def vector_buffer(ctx: StepContext) -> StepResult:
    gdf = ctx.input("input")           # 获取输入数据
    distance = ctx.param("distance")    # 获取参数
    cap_style = ctx.param("cap_style", "round")  # 获取带默认值的参数

    result = ctx.backend.buffer(gdf, distance, cap_style=cap_style)

    return StepResult(output=result, stats={...})

5.7 错误类型

所有异常继承自 GeopipeAgentError 基类:

class GeopipeAgentError(Exception):
    """基类异常"""

class PipelineParseError(GeopipeAgentError):
    """YAML 解析错误"""

class PipelineValidationError(GeopipeAgentError):
    """流水线验证错误"""

class StepExecutionError(GeopipeAgentError):
    """步骤执行错误"""
    def __init__(self, step_id, message, cause=None, suggestion=None):
        self.step_id = step_id         # 失败的步骤 ID
        self.suggestion = suggestion   # AI 修复建议
        self.cause = cause             # 原始异常

    def to_dict(self) -> dict:
        """转为 JSON 格式(AI 友好)"""
        result = {
            "error": "StepExecutionError",
            "step_id": self.step_id,
            "message": str(self),
        }
        if self.suggestion:
            result["suggestion"] = self.suggestion
        if self.cause:
            result["cause"] = str(self.cause)
        return result

class BackendNotAvailableError(GeopipeAgentError):
    """后端不可用"""

class StepNotFoundError(GeopipeAgentError):
    """步骤未注册"""

class VariableResolutionError(GeopipeAgentError):
    """变量或引用解析错误"""

5.7.1 AI 友好的错误输出

当执行出错时,CLI 会将错误以 JSON 格式输出到 stderr:

{
  "error": "StepExecutionError",
  "step_id": "buffer",
  "message": "CRS mismatch: input data uses geographic CRS (degrees)",
  "suggestion": "Add a vector.reproject step before this step to convert to a projected CRS."
}

这种设计使得 AI Agent 可以解析错误信息并自动修正流水线。

5.8 数据流转总结

YAML 文件
    │
    ▼ parse_yaml()
PipelineDefinition
    │
    ├── name, description, crs
    ├── variables: dict
    ├── steps: list[StepDefinition]
    │       │
    │       ├── id, use, params, when, on_error, backend
    │       │
    │       ▼ execute_pipeline()
    │   PipelineContext
    │       │
    │       ├── variables: dict(全局变量)
    │       └── _step_outputs: dict[str, StepResult]
    │                               │
    │                               ├── output: Any(数据)
    │                               ├── stats: dict(统计)
    │                               └── metadata: dict(元数据)
    │
    └── outputs: dict
            │
            ▼ build_report()
        JSON 报告