znlgis 博客

GIS开发与技术分享

第十五章:数据模型与错误体系

15.1 概述

GeoPipeAgent 的数据模型(src/geopipe_agent/models/)和错误体系(src/geopipe_agent/errors.py)定义了框架内部数据流转的数据结构和异常层级,是理解框架行为的基础。

15.2 数据模型

15.2.1 PipelineDefinition(models/pipeline.py)

表示一个已解析的流水线配置:

from pydantic import BaseModel, field_validator

class StepDefinition(BaseModel):
    """单个步骤的配置。"""
    id: str                         # 步骤唯一 ID
    use: str                        # 步骤类型(如 "vector.buffer")
    params: dict = {}              # 步骤参数(含引用字符串)
    when: str | None = None        # 条件表达式
    on_error: str = "fail"         # 错误策略:fail / skip / retry
    backend: str | None = None     # 指定后端(None 使用默认)

    @field_validator("on_error")
    @classmethod
    def validate_on_error(cls, v):
        allowed = {"fail", "skip", "retry"}
        if v not in allowed:
            raise ValueError(f"on_error 必须是 {allowed} 之一,当前值:{v!r}")
        return v

class PipelineDefinition(BaseModel):
    """完整的流水线配置模型。"""
    name: str                              # 流水线名称(必填)
    description: str | None = None        # 可选描述
    crs: str | None = None                # 默认 CRS
    variables: dict = {}                  # 全局变量
    steps: list[StepDefinition] = []      # 步骤列表
    outputs: dict[str, str] = {}          # 输出映射 {名称: 步骤引用}

Pydantic 的作用

  • 自动进行类型检查和类型转换
  • 在模型实例化时(parse_yaml 阶段)及早发现配置错误
  • 提供清晰的校验错误信息

15.2.2 StepResult(models/result.py)

每个步骤执行后返回的结果对象:

from dataclasses import dataclass, field
from typing import Any

@dataclass
class StepResult:
    """步骤执行结果。"""
    output: Any = None              # 主要输出(GeoDataFrame、raster dict 等)
    stats: dict = field(default_factory=dict)  # 统计信息
    issues: list["QcIssue"] = field(default_factory=list)  # 质检问题列表
    
    # 快捷属性(与 output 同步)
    @property
    def crs(self) -> str | None:
        """获取输出数据的 CRS。"""
        if hasattr(self.output, "crs") and self.output is not None:
            return str(self.output.crs)
        return self.stats.get("crs")
    
    @property
    def feature_count(self) -> int:
        """获取输出数据的要素数量。"""
        if hasattr(self.output, "__len__"):
            return len(self.output)
        return self.stats.get("feature_count", 0)
    
    @property
    def geometry_type(self) -> str | None:
        """获取几何类型。"""
        return self.stats.get("geometry_type")
    
    # 质检结果快捷属性
    @property
    def issue_count(self) -> int:
        return len(self.issues)
    
    @property
    def passed(self) -> bool:
        return self.issue_count == 0
    
    def summary(self) -> dict:
        """生成 JSON 可序列化的摘要,用于报告。"""
        return {
            "type": type(self.output).__name__ if self.output is not None else "None",
            "feature_count": self.feature_count,
            "crs": self.crs,
            **self.stats,
        }

StepResult 的设计意图

  1. 解耦输出与元数据output 是实际的数据(GeoDataFrame 等),stats 是关于该数据的统计摘要。
  2. 统一质检结果:QC 步骤的问题列表通过 issues 字段传递,issue_countpassed 快捷属性便于在 when 表达式中引用。
  3. 可序列化摘要summary() 方法返回 JSON 可序列化的摘要,用于报告生成。

15.2.3 QcIssue(models/qc.py)

数据质检步骤报告问题的标准数据结构:

from dataclasses import dataclass
from typing import Any

@dataclass
class QcIssue:
    """质检发现的单个问题。"""
    type: str           # 问题类型(如 "self_intersection"、"missing_crs")
    message: str        # 问题描述(人类可读)
    fid: Any = None     # 相关要素的 ID(矢量质检)
    location: Any = None  # 问题的地理位置(如有)
    severity: str = "error"   # 严重程度:error / warning / info
    
    def to_dict(self) -> dict:
        """转为字典,用于 JSON 报告。"""
        return {
            "type": self.type,
            "message": self.message,
            "fid": self.fid,
            "severity": self.severity,
        }

QC 步骤使用 QcIssue 的方式:

@step(id="qc.geometry_validity", ...)
def geometry_validity_step(ctx: StepContext) -> StepResult:
    gdf = ctx.input()
    issues = []
    
    for idx, row in gdf.iterrows():
        if row.geometry is None:
            issues.append(QcIssue(
                type="empty_geometry",
                message=f"要素 {idx} 的几何为空",
                fid=idx,
                severity="error"
            ))
        elif not row.geometry.is_valid:
            issues.append(QcIssue(
                type="self_intersection",
                message=f"要素 {idx} 存在自相交",
                fid=idx,
                severity="error"
            ))
    
    return StepResult(
        output=gdf,
        stats={"checked_count": len(gdf), "issue_count": len(issues)},
        issues=issues
    )

15.3 错误体系(errors.py)

错误继承结构

BaseException
└── Exception
    └── GeopipeAgentError          # 框架基类异常
        ├── PipelineParseError     # YAML 解析/模型构建错误
        ├── PipelineValidationError # 流水线语义校验错误
        ├── StepExecutionError      # 步骤执行错误(含修复建议)
        ├── VariableResolutionError # 变量/步骤引用解析错误
        └── BackendError           # 后端不可用或执行错误

GeopipeAgentError(基类)

class GeopipeAgentError(Exception):
    """GeoPipeAgent 所有自定义异常的基类。"""
    
    def to_dict(self) -> dict:
        """转为 JSON 可序列化的错误字典,用于输出到 stderr。"""
        return {
            "error": type(self).__name__,
            "message": str(self),
        }

PipelineParseError

parser.py 中抛出,表示 YAML 文件解析失败:

class PipelineParseError(GeopipeAgentError):
    pass

# 抛出示例
raise PipelineParseError(
    f"无法解析流水线文件 '{file_path}':{yaml_error}"
)

触发场景

  • YAML 语法错误(缩进错误、特殊字符等)
  • 必填字段缺失(如 name 字段未填写)
  • 字段类型错误(如 steps 不是列表)

PipelineValidationError

validator.py 中抛出,表示流水线语义错误:

class PipelineValidationError(GeopipeAgentError):
    pass

# 抛出示例
raise PipelineValidationError(
    f"步骤 ID '{step_id}' 重复。流水线中每个步骤的 ID 必须唯一。"
)

触发场景

  • 步骤 ID 重复
  • 步骤引用不存在或循环引用

StepExecutionError

executor.py 中抛出,是最常见的运行时错误:

class StepExecutionError(GeopipeAgentError):
    def __init__(
        self,
        step_id: str,
        message: str,
        cause: Exception | None = None,
        suggestion: str | None = None,   # AI 友好的修复建议
    ):
        self.step_id = step_id
        self.message = message
        self.cause = cause
        self.suggestion = suggestion
        super().__init__(f"步骤 '{step_id}' 执行失败:{message}")
    
    def to_dict(self) -> dict:
        result = {
            "error": "StepExecutionError",
            "step_id": self.step_id,
            "message": self.message,
        }
        if self.suggestion:
            result["suggestion"] = self.suggestion
        if self.cause:
            result["cause"] = str(self.cause)
        return result

to_dict() 输出示例

{
  "error": "StepExecutionError",
  "step_id": "buffer",
  "message": "步骤 'buffer' 执行失败:CRS mismatch — input data uses geographic CRS (degrees) but distance expects meters",
  "suggestion": "Add a vector.reproject step before this step to convert to a projected CRS.",
  "cause": "ValueError: distance should be in the same units as CRS"
}

VariableResolutionError

context.py 中抛出,表示变量或步骤引用无法解析:

class VariableResolutionError(GeopipeAgentError):
    pass

# 抛出示例(变量未定义)
raise VariableResolutionError(
    f"变量 '$' 未定义。可用变量:{list(self.variables.keys())}"
)

# 抛出示例(步骤引用不存在)
raise VariableResolutionError(
    f"步骤引用 '$load-roads' 失败:步骤 'load-roads' 无输出。"
    f"可用步骤输出:{list(self._step_outputs.keys())}"
)

BackendError

在后端模块中抛出,表示后端不可用或后端命令执行失败:

class BackendError(GeopipeAgentError):
    def __init__(self, backend_name: str, message: str):
        self.backend_name = backend_name
        super().__init__(f"后端 '{backend_name}' 错误:{message}")

15.4 错误处理最佳实践

在 CLI 层面

CLI(cli.py)统一捕获框架异常,以 JSON 格式输出到 stderr:

try:
    report = execute_pipeline(pipeline)
    click.echo(json.dumps(report, indent=2))  # 成功:报告输出到 stdout
except GeopipeAgentError as e:
    # 已知框架错误:JSON 格式输出到 stderr
    click.echo(json.dumps(e.to_dict(), indent=2), err=True)
    sys.exit(1)
except Exception as e:
    # 未知错误:包装后输出
    click.echo(json.dumps({"error": "UnexpectedError", "message": str(e)}), err=True)
    sys.exit(1)

这种设计保证:

  • 成功时:stdout 是 JSON 报告,便于脚本/AI 解析
  • 失败时:stderr 是 JSON 错误,stdout 为空

AI 友好的错误信息

所有错误信息(特别是 StepExecutionErrorsuggestion 字段)都以 AI 可理解的方式编写,便于 AI 自动诊断和修复:

{
  "error": "StepExecutionError",
  "step_id": "topology-check",
  "message": "topology analysis failed: memory overflow",
  "suggestion": "Clip the input to a smaller area first, or increase system memory."
}

AI 可以解析这个错误信息,自动在流水线中添加 vector.clip 步骤来解决问题。

15.5 数据流转图

YAML 文件
    │ parse_yaml()
    ▼
PipelineDefinition (Pydantic 模型)
    │ execute_pipeline()
    ▼
for each StepDefinition:
    │ resolve_params()    ──────► PipelineContext
    │                              (variables + step_outputs)
    │ step_func(StepContext)
    │                    ──────► StepResult
    │                              ├── output: GeoDataFrame/raster
    │                              ├── stats: dict
    │                              └── issues: list[QcIssue]
    │ set_output(step_id, result)
    ▼
build_report()
    │
    ▼
JSON 报告(dict)

15.6 小结

本章介绍了 GeoPipeAgent 的数据模型和错误体系:

数据模型

  • PipelineDefinition:流水线配置(Pydantic 模型,含 StepDefinition 列表)
  • StepResult:步骤执行结果(output + stats + issues),提供 crs/feature_count/passed 等快捷属性
  • QcIssue:质检问题记录(type + message + fid

错误体系

  • GeopipeAgentError:基类,提供 to_dict() JSON 序列化
  • PipelineParseError:YAML 解析失败
  • PipelineValidationError:语义校验失败
  • StepExecutionError:步骤运行失败,含 suggestion 修复建议
  • VariableResolutionError:引用解析失败

下一章将介绍 GeoPipeAgent 的 CLI 命令行工具完全指南。