第五章:数据模型与类型系统
5.1 概述
GeoPipeAgent 的数据模型层定义了框架内部传递的核心数据结构。所有模型使用 Python 的 dataclass 装饰器实现,确保简洁性和可序列化性。
数据模型主要包含以下三个核心类:
models/
├── pipeline.py
│ ├── PipelineDefinition # 流水线定义
│ └── StepDefinition # 步骤定义
└── result.py
└── StepResult # 步骤执行结果
5.2 PipelineDefinition(流水线定义)
PipelineDefinition 是 YAML 流水线文件解析后的内存表示:
from dataclasses import dataclass, field
@dataclass
class PipelineDefinition:
name: str # 流水线名称
steps: list[StepDefinition] # 步骤列表(有序)
description: str = "" # 描述
crs: str | None = None # 默认 CRS
variables: dict = field(default_factory=dict) # 变量
outputs: dict = field(default_factory=dict) # 输出声明
5.2.1 字段说明
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
name |
str |
(必需) | 流水线名称,用于报告标识 |
steps |
list[StepDefinition] |
(必需) | 步骤列表,按顺序执行 |
description |
str |
"" |
流水线描述 |
crs |
str \| None |
None |
默认坐标参考系统 |
variables |
dict |
{} |
变量映射 |
outputs |
dict |
{} |
输出引用声明 |
5.2.2 从 YAML 到 PipelineDefinition
Parser 将以下 YAML 内容:
pipeline:
name: "缓冲区分析"
description: "对道路进行缓冲区分析"
crs: "EPSG:4326"
variables:
dist: 500
steps:
- id: read
use: io.read_vector
params:
path: "data/roads.shp"
outputs:
result: "$read.output"
解析为:
PipelineDefinition(
name="缓冲区分析",
description="对道路进行缓冲区分析",
crs="EPSG:4326",
variables={"dist": 500},
steps=[
StepDefinition(id="read", use="io.read_vector", params={"path": "data/roads.shp"})
],
outputs={"result": "$read.output"}
)
5.3 StepDefinition(步骤定义)
StepDefinition 描述流水线中的单个步骤:
@dataclass
class StepDefinition:
id: str # 唯一标识符
use: str # 步骤类型(registry ID)
params: dict = field(default_factory=dict) # 参数
when: str | None = None # 条件执行表达式
on_error: str = "fail" # 错误策略:fail/skip/retry
backend: str | None = None # 指定后端
def __post_init__(self):
if self.params is None:
self.params = {}
5.3.1 字段说明
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
id |
str |
(必需) | 唯一标识符,格式 [a-z0-9_-]+ |
use |
str |
(必需) | 步骤类型,如 vector.buffer |
params |
dict |
{} |
参数字典,值可包含引用 |
when |
str \| None |
None |
条件执行表达式 |
on_error |
str |
"fail" |
错误处理策略 |
backend |
str \| None |
None |
指定使用的后端 |
5.3.2 post_init 的作用
__post_init__ 方法确保 params 永远不为 None。当 YAML 中步骤没有 params 字段时,解析结果可能是 None,这里统一转换为空字典,避免后续代码需要处理 None 的情况。
5.4 StepResult(步骤结果)
StepResult 是步骤函数的返回值,包含执行结果和元信息:
from typing import Any
@dataclass
class StepResult:
output: Any = None # 主要输出数据
stats: dict = field(default_factory=dict) # 统计信息
metadata: dict = field(default_factory=dict) # 元数据
5.4.1 output 字段
output 是步骤的主要输出数据,类型取决于步骤类型:
| 步骤类别 | output 类型 | 说明 |
|---|---|---|
io.read_vector |
GeoDataFrame |
矢量数据 |
io.write_vector |
str |
输出文件路径 |
io.read_raster |
dict |
栅格数据信息字典 |
vector.* |
GeoDataFrame |
处理后的矢量数据 |
raster.* |
dict |
处理后的栅格数据信息 |
analysis.* |
GeoDataFrame 或 dict |
分析结果 |
network.* |
GeoDataFrame 或 dict |
网络分析结果 |
5.4.2 stats 字段
stats 存储步骤的统计信息,例如:
StepResult(
output=result_gdf,
stats={
"feature_count": 100,
"total_area": 12345.67,
}
)
常见的 stats 键:
| 键 | 说明 | 出现在 |
|---|---|---|
feature_count |
要素数量 | 大多数矢量步骤 |
total_area |
总面积 | buffer、dissolve |
total_length |
总长度 | simplify |
expression |
使用的表达式 | query、raster.calc |
min/max/mean |
统计值 | raster.stats |
5.4.3 metadata 字段
metadata 存储步骤的元数据信息,如 CRS、数据源等。
5.4.4 属性访问(getattr)
StepResult 实现了自定义的 __getattr__ 方法,允许通过属性语法访问 stats 和 metadata 中的键:
def __getattr__(self, name: str) -> Any:
if name.startswith("_"):
raise AttributeError(name)
if name in self.stats:
return self.stats[name]
if name in self.metadata:
return self.metadata[name]
raise AttributeError(
f"StepResult has no attribute '{name}'. "
f"Available: output, stats={list(self.stats.keys())}, "
f"metadata={list(self.metadata.keys())}"
)
这使得以下引用成为可能:
# 在 YAML 中引用
when: "$buffer.feature_count > 0" # 等价于 buffer_result.stats["feature_count"]
input: "$buffer.output" # 直接访问 output 属性
查找顺序:
- 先在
stats字典中查找 - 然后在
metadata字典中查找 - 都找不到则抛出
AttributeError
5.4.5 summary() 方法
summary() 方法生成用于 JSON 报告的摘要信息:
def summary(self) -> dict:
summary = {}
if self.output is not None:
try:
# GeoDataFrame 摘要
summary["feature_count"] = len(self.output)
if hasattr(self.output, "crs") and self.output.crs:
summary["crs"] = str(self.output.crs)
if hasattr(self.output, "geometry"):
summary["geometry_types"] = list(
self.output.geometry.geom_type.unique()
)
except (TypeError, AttributeError):
summary["type"] = type(self.output).__name__
summary.update(self.stats)
return summary
例如:
result = StepResult(
output=gdf, # 100 个 Point 要素,CRS=EPSG:4326
stats={"total_area": 12345.67}
)
result.summary()
# {
# "feature_count": 100,
# "crs": "EPSG:4326",
# "geometry_types": ["Point"],
# "total_area": 12345.67
# }
5.5 _StepInfo(步骤元信息)
_StepInfo 是注册表中存储的步骤元数据:
class _StepInfo:
def __init__(
self,
id: str, # 步骤 ID(如 "vector.buffer")
func: Callable, # 步骤函数
name: str = "", # 显示名称
description: str = "", # 描述
category: str = "", # 类别
params: dict | None = None, # 参数 schema
outputs: dict | None = None, # 输出 schema
backends: list[str] | None = None, # 支持的后端
examples: list[dict] | None = None, # 使用示例
):
...
5.5.1 params schema 格式
每个参数的 schema 是一个字典:
params={
"input": {
"type": "geodataframe", # 参数类型
"required": True, # 是否必需
"description": "输入矢量数据",
},
"distance": {
"type": "number",
"required": True,
"description": "缓冲区距离",
},
"cap_style": {
"type": "string",
"required": False,
"default": "round",
"enum": ["round", "flat", "square"],
"description": "端点样式",
},
}
支持的参数类型字符串:
| type 值 | 对应的 Python 类型 | 说明 |
|---|---|---|
geodataframe |
GeoDataFrame |
矢量数据 |
raster_info |
dict |
栅格数据信息 |
string |
str |
字符串 |
number |
int \| float |
数值 |
boolean |
bool |
布尔值 |
dict |
dict |
字典 |
list |
list |
列表 |
5.5.2 to_dict() 序列化
_StepInfo 提供 to_dict() 方法用于 JSON 序列化:
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"description": self.description,
"category": self.category,
"params": self.params,
"outputs": self.outputs,
"backends": self.backends,
"examples": self.examples,
}
这在 CLI 的 describe 和 list-steps --format json 命令中使用,也用于 Skill 文件生成。
5.6 PipelineContext 与 StepContext
5.6.1 PipelineContext
PipelineContext 管理流水线执行期间的全局状态:
class PipelineContext:
def __init__(self, variables: dict | None = None):
self.variables: dict = dict(variables or {})
self._step_outputs: dict[str, StepResult] = {}
核心方法:
| 方法 | 说明 |
|---|---|
set_output(step_id, result) |
存储步骤结果 |
get_output(step_id) |
获取步骤结果 |
resolve(value) |
解析单个值(变量替换、步骤引用) |
resolve_params(params) |
批量解析参数字典(支持嵌套) |
resolve_params 递归处理嵌套结构:
def resolve_params(self, params: dict) -> dict:
resolved = {}
for key, value in params.items():
if isinstance(value, dict):
resolved[key] = self.resolve_params(value) # 递归处理字典
elif isinstance(value, list):
resolved[key] = [self.resolve(v) for v in value] # 处理列表
else:
resolved[key] = self.resolve(value) # 处理标量
return resolved
5.6.2 StepContext
StepContext 是传递给每个步骤函数的参数容器:
class StepContext:
def __init__(self, params, backend=None, pipeline_context=None):
self._params = params # 已解析的参数
self.backend = backend # 当前后端实例
self.pipeline_context = pipeline_context
def param(self, name, default=None):
"""获取参数值"""
return self._params.get(name, default)
def input(self, name="input"):
"""获取输入数据(快捷方式)"""
return self._params.get(name)
@property
def params(self):
return self._params
在步骤函数中的典型用法:
@step(id="vector.buffer", ...)
def vector_buffer(ctx: StepContext) -> StepResult:
gdf = ctx.input("input") # 获取输入数据
distance = ctx.param("distance") # 获取参数
cap_style = ctx.param("cap_style", "round") # 获取带默认值的参数
result = ctx.backend.buffer(gdf, distance, cap_style=cap_style)
return StepResult(output=result, stats={...})
5.7 错误类型
所有异常继承自 GeopipeAgentError 基类:
class GeopipeAgentError(Exception):
"""基类异常"""
class PipelineParseError(GeopipeAgentError):
"""YAML 解析错误"""
class PipelineValidationError(GeopipeAgentError):
"""流水线验证错误"""
class StepExecutionError(GeopipeAgentError):
"""步骤执行错误"""
def __init__(self, step_id, message, cause=None, suggestion=None):
self.step_id = step_id # 失败的步骤 ID
self.suggestion = suggestion # AI 修复建议
self.cause = cause # 原始异常
def to_dict(self) -> dict:
"""转为 JSON 格式(AI 友好)"""
result = {
"error": "StepExecutionError",
"step_id": self.step_id,
"message": str(self),
}
if self.suggestion:
result["suggestion"] = self.suggestion
if self.cause:
result["cause"] = str(self.cause)
return result
class BackendNotAvailableError(GeopipeAgentError):
"""后端不可用"""
class StepNotFoundError(GeopipeAgentError):
"""步骤未注册"""
class VariableResolutionError(GeopipeAgentError):
"""变量或引用解析错误"""
5.7.1 AI 友好的错误输出
当执行出错时,CLI 会将错误以 JSON 格式输出到 stderr:
{
"error": "StepExecutionError",
"step_id": "buffer",
"message": "CRS mismatch: input data uses geographic CRS (degrees)",
"suggestion": "Add a vector.reproject step before this step to convert to a projected CRS."
}
这种设计使得 AI Agent 可以解析错误信息并自动修正流水线。
5.8 数据流转总结
YAML 文件
│
▼ parse_yaml()
PipelineDefinition
│
├── name, description, crs
├── variables: dict
├── steps: list[StepDefinition]
│ │
│ ├── id, use, params, when, on_error, backend
│ │
│ ▼ execute_pipeline()
│ PipelineContext
│ │
│ ├── variables: dict(全局变量)
│ └── _step_outputs: dict[str, StepResult]
│ │
│ ├── output: Any(数据)
│ ├── stats: dict(统计)
│ └── metadata: dict(元数据)
│
└── outputs: dict
│
▼ build_report()
JSON 报告