第十五章:数据模型与错误体系
15.1 概述
GeoPipeAgent 的数据模型(src/geopipe_agent/models/)和错误体系(src/geopipe_agent/errors.py)定义了框架内部数据流转的数据结构和异常层级,是理解框架行为的基础。
15.2 数据模型
15.2.1 PipelineDefinition(models/pipeline.py)
表示一个已解析的流水线配置:
from pydantic import BaseModel, field_validator
class StepDefinition(BaseModel):
"""单个步骤的配置。"""
id: str # 步骤唯一 ID
use: str # 步骤类型(如 "vector.buffer")
params: dict = {} # 步骤参数(含引用字符串)
when: str | None = None # 条件表达式
on_error: str = "fail" # 错误策略:fail / skip / retry
backend: str | None = None # 指定后端(None 使用默认)
@field_validator("on_error")
@classmethod
def validate_on_error(cls, v):
allowed = {"fail", "skip", "retry"}
if v not in allowed:
raise ValueError(f"on_error 必须是 {allowed} 之一,当前值:{v!r}")
return v
class PipelineDefinition(BaseModel):
"""完整的流水线配置模型。"""
name: str # 流水线名称(必填)
description: str | None = None # 可选描述
crs: str | None = None # 默认 CRS
variables: dict = {} # 全局变量
steps: list[StepDefinition] = [] # 步骤列表
outputs: dict[str, str] = {} # 输出映射 {名称: 步骤引用}
Pydantic 的作用:
- 自动进行类型检查和类型转换
- 在模型实例化时(parse_yaml 阶段)及早发现配置错误
- 提供清晰的校验错误信息
15.2.2 StepResult(models/result.py)
每个步骤执行后返回的结果对象:
from dataclasses import dataclass, field
from typing import Any
@dataclass
class StepResult:
"""步骤执行结果。"""
output: Any = None # 主要输出(GeoDataFrame、raster dict 等)
stats: dict = field(default_factory=dict) # 统计信息
issues: list["QcIssue"] = field(default_factory=list) # 质检问题列表
# 快捷属性(与 output 同步)
@property
def crs(self) -> str | None:
"""获取输出数据的 CRS。"""
if hasattr(self.output, "crs") and self.output is not None:
return str(self.output.crs)
return self.stats.get("crs")
@property
def feature_count(self) -> int:
"""获取输出数据的要素数量。"""
if hasattr(self.output, "__len__"):
return len(self.output)
return self.stats.get("feature_count", 0)
@property
def geometry_type(self) -> str | None:
"""获取几何类型。"""
return self.stats.get("geometry_type")
# 质检结果快捷属性
@property
def issue_count(self) -> int:
return len(self.issues)
@property
def passed(self) -> bool:
return self.issue_count == 0
def summary(self) -> dict:
"""生成 JSON 可序列化的摘要,用于报告。"""
return {
"type": type(self.output).__name__ if self.output is not None else "None",
"feature_count": self.feature_count,
"crs": self.crs,
**self.stats,
}
StepResult 的设计意图:
- 解耦输出与元数据:
output是实际的数据(GeoDataFrame 等),stats是关于该数据的统计摘要。 - 统一质检结果:QC 步骤的问题列表通过
issues字段传递,issue_count和passed快捷属性便于在when表达式中引用。 - 可序列化摘要:
summary()方法返回 JSON 可序列化的摘要,用于报告生成。
15.2.3 QcIssue(models/qc.py)
数据质检步骤报告问题的标准数据结构:
from dataclasses import dataclass
from typing import Any
@dataclass
class QcIssue:
"""质检发现的单个问题。"""
type: str # 问题类型(如 "self_intersection"、"missing_crs")
message: str # 问题描述(人类可读)
fid: Any = None # 相关要素的 ID(矢量质检)
location: Any = None # 问题的地理位置(如有)
severity: str = "error" # 严重程度:error / warning / info
def to_dict(self) -> dict:
"""转为字典,用于 JSON 报告。"""
return {
"type": self.type,
"message": self.message,
"fid": self.fid,
"severity": self.severity,
}
QC 步骤使用 QcIssue 的方式:
@step(id="qc.geometry_validity", ...)
def geometry_validity_step(ctx: StepContext) -> StepResult:
gdf = ctx.input()
issues = []
for idx, row in gdf.iterrows():
if row.geometry is None:
issues.append(QcIssue(
type="empty_geometry",
message=f"要素 {idx} 的几何为空",
fid=idx,
severity="error"
))
elif not row.geometry.is_valid:
issues.append(QcIssue(
type="self_intersection",
message=f"要素 {idx} 存在自相交",
fid=idx,
severity="error"
))
return StepResult(
output=gdf,
stats={"checked_count": len(gdf), "issue_count": len(issues)},
issues=issues
)
15.3 错误体系(errors.py)
错误继承结构
BaseException
└── Exception
└── GeopipeAgentError # 框架基类异常
├── PipelineParseError # YAML 解析/模型构建错误
├── PipelineValidationError # 流水线语义校验错误
├── StepExecutionError # 步骤执行错误(含修复建议)
├── VariableResolutionError # 变量/步骤引用解析错误
└── BackendError # 后端不可用或执行错误
GeopipeAgentError(基类)
class GeopipeAgentError(Exception):
"""GeoPipeAgent 所有自定义异常的基类。"""
def to_dict(self) -> dict:
"""转为 JSON 可序列化的错误字典,用于输出到 stderr。"""
return {
"error": type(self).__name__,
"message": str(self),
}
PipelineParseError
在 parser.py 中抛出,表示 YAML 文件解析失败:
class PipelineParseError(GeopipeAgentError):
pass
# 抛出示例
raise PipelineParseError(
f"无法解析流水线文件 '{file_path}':{yaml_error}"
)
触发场景:
- YAML 语法错误(缩进错误、特殊字符等)
- 必填字段缺失(如
name字段未填写) - 字段类型错误(如
steps不是列表)
PipelineValidationError
在 validator.py 中抛出,表示流水线语义错误:
class PipelineValidationError(GeopipeAgentError):
pass
# 抛出示例
raise PipelineValidationError(
f"步骤 ID '{step_id}' 重复。流水线中每个步骤的 ID 必须唯一。"
)
触发场景:
- 步骤 ID 重复
- 步骤引用不存在或循环引用
StepExecutionError
在 executor.py 中抛出,是最常见的运行时错误:
class StepExecutionError(GeopipeAgentError):
def __init__(
self,
step_id: str,
message: str,
cause: Exception | None = None,
suggestion: str | None = None, # AI 友好的修复建议
):
self.step_id = step_id
self.message = message
self.cause = cause
self.suggestion = suggestion
super().__init__(f"步骤 '{step_id}' 执行失败:{message}")
def to_dict(self) -> dict:
result = {
"error": "StepExecutionError",
"step_id": self.step_id,
"message": self.message,
}
if self.suggestion:
result["suggestion"] = self.suggestion
if self.cause:
result["cause"] = str(self.cause)
return result
to_dict() 输出示例:
{
"error": "StepExecutionError",
"step_id": "buffer",
"message": "步骤 'buffer' 执行失败:CRS mismatch — input data uses geographic CRS (degrees) but distance expects meters",
"suggestion": "Add a vector.reproject step before this step to convert to a projected CRS.",
"cause": "ValueError: distance should be in the same units as CRS"
}
VariableResolutionError
在 context.py 中抛出,表示变量或步骤引用无法解析:
class VariableResolutionError(GeopipeAgentError):
pass
# 抛出示例(变量未定义)
raise VariableResolutionError(
f"变量 '$' 未定义。可用变量:{list(self.variables.keys())}"
)
# 抛出示例(步骤引用不存在)
raise VariableResolutionError(
f"步骤引用 '$load-roads' 失败:步骤 'load-roads' 无输出。"
f"可用步骤输出:{list(self._step_outputs.keys())}"
)
BackendError
在后端模块中抛出,表示后端不可用或后端命令执行失败:
class BackendError(GeopipeAgentError):
def __init__(self, backend_name: str, message: str):
self.backend_name = backend_name
super().__init__(f"后端 '{backend_name}' 错误:{message}")
15.4 错误处理最佳实践
在 CLI 层面
CLI(cli.py)统一捕获框架异常,以 JSON 格式输出到 stderr:
try:
report = execute_pipeline(pipeline)
click.echo(json.dumps(report, indent=2)) # 成功:报告输出到 stdout
except GeopipeAgentError as e:
# 已知框架错误:JSON 格式输出到 stderr
click.echo(json.dumps(e.to_dict(), indent=2), err=True)
sys.exit(1)
except Exception as e:
# 未知错误:包装后输出
click.echo(json.dumps({"error": "UnexpectedError", "message": str(e)}), err=True)
sys.exit(1)
这种设计保证:
- 成功时:stdout 是 JSON 报告,便于脚本/AI 解析
- 失败时:stderr 是 JSON 错误,stdout 为空
AI 友好的错误信息
所有错误信息(特别是 StepExecutionError 的 suggestion 字段)都以 AI 可理解的方式编写,便于 AI 自动诊断和修复:
{
"error": "StepExecutionError",
"step_id": "topology-check",
"message": "topology analysis failed: memory overflow",
"suggestion": "Clip the input to a smaller area first, or increase system memory."
}
AI 可以解析这个错误信息,自动在流水线中添加 vector.clip 步骤来解决问题。
15.5 数据流转图
YAML 文件
│ parse_yaml()
▼
PipelineDefinition (Pydantic 模型)
│ execute_pipeline()
▼
for each StepDefinition:
│ resolve_params() ──────► PipelineContext
│ (variables + step_outputs)
│ step_func(StepContext)
│ ──────► StepResult
│ ├── output: GeoDataFrame/raster
│ ├── stats: dict
│ └── issues: list[QcIssue]
│ set_output(step_id, result)
▼
build_report()
│
▼
JSON 报告(dict)
15.6 小结
本章介绍了 GeoPipeAgent 的数据模型和错误体系:
数据模型:
PipelineDefinition:流水线配置(Pydantic 模型,含StepDefinition列表)StepResult:步骤执行结果(output+stats+issues),提供crs/feature_count/passed等快捷属性QcIssue:质检问题记录(type+message+fid)
错误体系:
GeopipeAgentError:基类,提供to_dict()JSON 序列化PipelineParseError:YAML 解析失败PipelineValidationError:语义校验失败StepExecutionError:步骤运行失败,含suggestion修复建议VariableResolutionError:引用解析失败
下一章将介绍 GeoPipeAgent 的 CLI 命令行工具完全指南。