znlgis 博客

GIS开发与技术分享

第15章:数据模型与类型系统

本章详细介绍 GeoPipeAgent 的数据模型体系和参数类型系统。models 包定义了流水线、步骤和执行结果的核心数据结构,参数类型系统则确保 YAML 中声明的参数值能被正确解析和验证。理解这些基础概念有助于编写正确的流水线定义和开发自定义步骤。


15.1 数据模型概述

15.1.1 models 包结构

GeoPipeAgent 的数据模型定义在 models 包中,由三个模块组成:

geopipeagent/models/
├── __init__.py          # 导出所有模型类
├── pipeline.py          # PipelineDefinition, StepDefinition
├── result.py            # StepResult
└── qc.py               # QcIssue

另有 registry.py 模块中定义的 StepInfo,用于存储步骤注册元数据。

15.1.2 模型类清单

类名 模块 用途 创建方式
PipelineDefinition pipeline.py 流水线定义 parser.py 解析生成
StepDefinition pipeline.py 步骤定义 parser.py 解析生成
StepResult result.py 步骤执行结果 步骤函数返回
QcIssue qc.py 质检问题 QC 步骤创建
StepInfo registry.py 步骤注册元数据 @step 装饰器生成

15.1.3 数据模型在流水线中的流转

YAML 文件
    │  parse_yaml()
    ▼
┌──────────────────┐
│ PipelineDefinition│
│  ├─ name         │
│  ├─ steps[]      │──────┐
│  ├─ variables    │      │
│  └─ ...          │      │
└──────────────────┘      │
                          │
                ┌─────────┘
                ▼
        ┌──────────────┐         ┌──────────┐
        │ StepDefinition│────────▶│ StepInfo  │
        │  ├─ id       │  查注册表│ (元数据)  │
        │  ├─ use      │         └──────────┘
        │  ├─ params   │
        │  └─ ...      │
        └──────┬───────┘
               │  execute_step()
               ▼
        ┌──────────────┐
        │  StepResult   │
        │  ├─ output   │──▶ GeoDataFrame / raster_info
        │  ├─ stats    │──▶ dict
        │  ├─ issues   │──▶ list[QcIssue]  (QC 步骤)
        │  └─ metadata │──▶ dict
        └──────────────┘

15.2 PipelineDefinition

15.2.1 类定义

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class PipelineDefinition:
    """流水线定义——YAML 文件的 Python 对象表示"""

    name: str                                   # 流水线名称
    steps: list["StepDefinition"]              # 步骤列表
    description: str = ""                       # 流水线描述
    crs: Optional[str] = None                  # 全局 CRS
    variables: dict = field(default_factory=dict)  # 变量字典
    outputs: dict = field(default_factory=dict)    # 输出配置

15.2.2 字段详解

字段 类型 必需 说明 YAML 对应
name str 流水线名称,用于报告和日志 name:
steps list[StepDefinition] 步骤列表,按定义顺序执行 steps:
description str 流水线描述文本 description:
crs str \| None 全局坐标参考系 crs:
variables dict 变量字典,供 $var.xxx 引用 variables:
outputs dict 输出配置(指定最终输出步骤) outputs:

15.2.3 YAML 映射示例

# YAML 定义
name: "城市绿地分析"            # → name
description: "分析城市绿地覆盖率"  # → description
crs: "EPSG:4326"               # → crs

variables:                      # → variables
  buffer_distance: 500
  min_area: 100

steps:                          # → steps
  - id: read_data
    step: io.read_vector
    params:
      path: "data/greenspace.gpkg"

outputs:                        # → outputs
  result: "$steps.analysis_result"

对应的 Python 对象:

PipelineDefinition(
    name="城市绿地分析",
    description="分析城市绿地覆盖率",
    crs="EPSG:4326",
    variables={
        "buffer_distance": 500,
        "min_area": 100,
    },
    steps=[
        StepDefinition(id="read_data", use="io.read_vector", ...),
    ],
    outputs={
        "result": "$steps.analysis_result",
    },
)

15.3 StepDefinition

15.3.1 类定义

@dataclass
class StepDefinition:
    """步骤定义——单个处理步骤的声明"""

    id: str                         # 步骤唯一标识
    use: str                        # 步骤类型(注册表 ID)
    params: dict = field(default_factory=dict)   # 参数字典
    when: Optional[str] = None      # 条件表达式
    on_error: str = "fail"          # 错误处理策略
    backend: Optional[str] = None   # 指定后端

15.3.2 字段详解

字段 类型 必需 默认值 说明
id str 步骤唯一标识,用于引用和报告
use str 步骤类型,对应注册表中的 step ID
params dict {} 步骤参数,支持 $var$steps 引用
when str \| None None 条件表达式,为 true 时才执行
on_error str "fail" 错误策略:fail / skip / retry
backend str \| None None 指定执行后端,覆盖全局设置

15.3.3 id 与 use 的区别

- id: buffer_roads          # ← id: 用户自定义的步骤实例标识
  step: vector.buffer       # ← use(step): 注册表中的步骤类型

- id: buffer_buildings      # ← 另一个实例,不同的 id
  step: vector.buffer       # ← 同一个步骤类型
id:   步骤的"变量名"     → 在当前流水线中唯一
use:  步骤的"函数名"     → 对应注册表中的步骤类型
                           同一类型可被多次使用

15.3.4 完整 YAML 示例

- id: clip_data
  step: vector.clip          # use 字段(YAML 中可用 step 或 use)
  params:
    input: "$steps.read_data"
    clip_geometry: "$steps.read_boundary"
  when: "$var.enable_clip == true"
  on_error: skip
  backend: gdal_cli

15.4 StepResult 详解

15.4.1 类定义

@dataclass
class StepResult:
    """步骤执行结果"""

    output: Any = None              # 输出数据
    stats: dict = field(default_factory=dict)     # 统计信息
    metadata: dict = field(default_factory=dict)  # 元数据
    issues: list = field(default_factory=list)    # QC 问题列表

    def __getattr__(self, name):
        """魔法方法:允许通过属性访问 stats 和 metadata

        使得 result.feature_count 等价于
        result.stats.get("feature_count")
        """
        if name.startswith("_"):
            raise AttributeError(name)

        # 先查 stats
        if name in self.stats:
            return self.stats[name]

        # 再查 metadata
        if name in self.metadata:
            return self.metadata[name]

        raise AttributeError(
            f"'StepResult' has no attribute '{name}'"
        )

    def summary(self):
        """生成结果摘要字符串"""
        parts = [f"StepResult:"]

        if self.output is not None:
            output_type = type(self.output).__name__
            parts.append(f"  output: {output_type}")

        if self.stats:
            parts.append(f"  stats: {self.stats}")

        if self.issues:
            parts.append(f"  issues: {len(self.issues)} issue(s)")

        return "\n".join(parts)

15.4.2 四个核心字段

字段 类型 说明 示例
output Any 步骤的主要输出数据 GeoDataFrame, raster_info, None
stats dict 统计信息(执行过程的数值指标) {"feature_count": 500, "total_distance": 1234.5}
metadata dict 元数据(描述性信息) {"crs": "EPSG:4326", "format": "GPKG"}
issues list[QcIssue] 质检问题列表(仅 QC 步骤) [QcIssue(...), ...]

15.4.3 getattr 魔法方法

__getattr__ 实现了透明属性访问——可以直接通过 result.xxx 访问 statsmetadata 中的值:

result = StepResult(
    output=gdf,
    stats={"feature_count": 500, "total_area": 12345.6},
    metadata={"crs": "EPSG:4326"},
)

# 直接属性访问(通过 __getattr__)
result.feature_count   # → 500(从 stats 获取)
result.total_area      # → 12345.6(从 stats 获取)
result.crs             # → "EPSG:4326"(从 metadata 获取)

# 等价的显式访问
result.stats["feature_count"]    # → 500
result.metadata["crs"]           # → "EPSG:4326"

属性查找优先级:

result.xxx
    │
    ├── 1. 实例属性(output, stats, metadata, issues)
    │
    ├── 2. stats 字典中查找
    │
    ├── 3. metadata 字典中查找
    │
    └── 4. 抛出 AttributeError

15.4.4 在流水线中引用 StepResult

steps:
  - id: read_data
    step: io.read_vector
    params:
      path: "data/parcels.gpkg"

  # 引用上一步的输出
  - id: buffer_data
    step: vector.buffer
    params:
      input: "$steps.read_data"         # → result.output
      distance: 100

  # 引用上一步的统计信息
  - id: next_step
    step: some.step
    params:
      count: "$steps.read_data.stats.feature_count"
    when: "$steps.read_data.stats.feature_count > 0"

引用解析规则:

引用表达式 解析结果
$steps.xxx results["xxx"].output
$steps.xxx.output results["xxx"].output
$steps.xxx.stats results["xxx"].stats
$steps.xxx.stats.feature_count results["xxx"].stats["feature_count"]
$steps.xxx.issues results["xxx"].issues

15.4.5 summary() 方法

result = StepResult(
    output=gdf,
    stats={"feature_count": 500, "issue_count": 3},
    issues=[QcIssue(...), QcIssue(...), QcIssue(...)],
)

print(result.summary())
# StepResult:
#   output: GeoDataFrame
#   stats: {'feature_count': 500, 'issue_count': 3}
#   issues: 3 issue(s)

15.5 QcIssue 详解

15.5.1 完整类定义

from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class QcIssue:
    """质检问题数据模型

    记录单个质检发现的问题,包含问题位置、
    严重级别和诊断详情。
    """

    rule_id: str                          # 规则标识
    severity: str                         # 严重级别
    feature_index: Optional[int] = None   # 要素索引
    message: str = ""                     # 问题描述
    geometry: Any = None                  # 问题几何
    details: dict = field(default_factory=dict)  # 扩展详情

    def to_dict(self):
        """序列化为字典(JSON 兼容)"""
        d = {
            "rule_id": self.rule_id,
            "severity": self.severity,
            "message": self.message,
        }

        if self.feature_index is not None:
            d["feature_index"] = self.feature_index

        if self.geometry is not None:
            d["geometry"] = self.geometry.wkt

        if self.details:
            d["details"] = self.details

        return d

15.5.2 severity 级别体系

┌─────────────────────────────────────────────────────────────┐
│                    severity 级别层次                          │
├─────────┬───────────────────────────────────────────────────┤
│         │                                                   │
│  error  │  数据严重错误,必须修复后才能使用                      │
│         │  例:自相交多边形、CRS 缺失、必填字段为空              │
│         │  对应行为:可触发 on_error: fail                     │
│         │                                                   │
├─────────┼───────────────────────────────────────────────────┤
│         │                                                   │
│ warning │  数据存在异常,建议人工复核                            │
│         │  例:数值接近范围边界、面积异常小、疑似重复              │
│         │  对应行为:记录但不中断                               │
│         │                                                   │
├─────────┼───────────────────────────────────────────────────┤
│         │                                                   │
│  info   │  信息提示,供参考                                    │
│         │  例:发现可能的重复要素(在某些业务中是合理的)           │
│         │  对应行为:仅记录                                    │
│         │                                                   │
└─────────┴───────────────────────────────────────────────────┘

15.5.3 to_dict() 序列化

to_dict()QcIssue 转换为 JSON 兼容的字典,用于报告输出:

issue = QcIssue(
    rule_id="attribute_completeness",
    severity="error",
    feature_index=42,
    message="Field 'name' is null or empty",
    geometry=Point(116.39, 39.91),
    details={"field": "name", "value": None},
)

issue.to_dict()
# {
#     "rule_id": "attribute_completeness",
#     "severity": "error",
#     "feature_index": 42,
#     "message": "Field 'name' is null or empty",
#     "geometry": "POINT (116.39 39.91)",
#     "details": {"field": "name", "value": null}
# }

关键序列化规则:

字段 序列化方式
rule_id, severity, message 原样输出
feature_index 非 None 时输出
geometry 转换为 WKT 字符串
details 原样输出字典

15.5.4 各 QC 步骤的 rule_id 对照表

QC 步骤 rule_id 常见 details 字段
qc.geometry_validity "geometry_validity" type, reason
qc.crs_check "crs_check" expected, actual
qc.topology "topology" rule, overlap_area
qc.attribute_completeness "attribute_completeness" field, value
qc.attribute_domain "attribute_domain" field, value, allowed/pattern
qc.value_range "value_range" field, value, min/max
qc.duplicate_check "duplicate_check" duplicate_of, key
qc.raster_nodata "raster_nodata" expected, actual, nodata_ratio
qc.raster_resolution "raster_resolution" axis, expected, actual
qc.raster_value_range "raster_value_range" band, actual_min/actual_max

15.6 StepInfo 详解

15.6.1 类定义

StepInfo 存储步骤的注册元数据,由 @step 装饰器自动生成:

@dataclass
class StepInfo:
    """步骤注册元数据"""

    id: str                     # 步骤 ID(如 "vector.buffer")
    name: str                   # 步骤显示名称
    description: str = ""       # 步骤描述
    category: str = ""          # 分类(如 "vector", "io", "qc")
    params: list = field(default_factory=list)    # 参数定义列表
    input_type: str = ""        # 输入类型
    output_type: str = ""       # 输出类型

    def get_param(self, name):
        """根据名称获取参数定义"""
        for p in self.params:
            if p.get("name") == name:
                return p
        return None

    def to_dict(self):
        """序列化为字典"""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "category": self.category,
            "params": self.params,
            "input_type": self.input_type,
            "output_type": self.output_type,
        }

15.6.2 字段详解

字段 类型 说明 示例
id str 步骤的全局唯一 ID "vector.buffer"
name str 人类可读的名称 "缓冲区分析"
description str 功能描述 "为几何要素创建缓冲区"
category str 步骤分类 "vector", "io", "qc"
params list[dict] 参数定义列表 见下表
input_type str 输入数据类型 "geodataframe"
output_type str 输出数据类型 "geodataframe"

15.6.3 参数定义结构

params 列表中每个元素是一个字典:

{
    "name": "distance",        # 参数名
    "type": "number",          # 参数类型
    "required": True,          # 是否必需
    "default": None,           # 默认值
    "description": "缓冲距离",  # 参数描述
}

15.6.4 @step 装饰器生成 StepInfo

from geopipeagent.registry import step

@step(
    id="vector.buffer",
    name="缓冲区分析",
    description="为几何要素创建缓冲区",
    category="vector",
    params=[
        {"name": "input", "type": "geodataframe", "required": True},
        {"name": "distance", "type": "number", "required": True},
        {"name": "resolution", "type": "integer", "required": False,
         "default": 16},
    ],
    input_type="geodataframe",
    output_type="geodataframe",
)
def buffer(ctx):
    ...

# 装饰器自动生成并绑定 StepInfo
buffer.step_info
# StepInfo(
#     id="vector.buffer",
#     name="缓冲区分析",
#     ...
# )

15.6.5 to_dict() 序列化

info = buffer.step_info
info.to_dict()
# {
#     "id": "vector.buffer",
#     "name": "缓冲区分析",
#     "description": "为几何要素创建缓冲区",
#     "category": "vector",
#     "params": [
#         {"name": "input", "type": "geodataframe", "required": true},
#         {"name": "distance", "type": "number", "required": true},
#         {"name": "resolution", "type": "integer", "required": false,
#          "default": 16},
#     ],
#     "input_type": "geodataframe",
#     "output_type": "geodataframe"
# }

to_dict() 的输出可用于:

  • 生成步骤文档
  • API 接口返回步骤元信息
  • 前端 UI 动态生成参数表单

15.7 类型系统

15.7.1 支持的参数类型

GeoPipeAgent 的参数类型系统定义了步骤参数可接受的数据类型:

类型标识 Python 类型 说明 YAML 示例
geodataframe GeoDataFrame 矢量空间数据 "$steps.read_data"
raster_info dict 栅格数据字典 "$steps.read_raster"
string str 字符串 "EPSG:4326"
number float 浮点数 100.5
integer int 整数 16
boolean bool 布尔值 true / false
list list 列表 [1, 2, 3]
dict dict 字典 {key: value}
point Point Shapely 点几何 {type: Point, coordinates: [...]}
crs str\|CRS 坐标参考系 "EPSG:4326"

15.7.2 类型在系统中的使用位置

┌─────────────────────────────────────────────────────┐
│  @step 装饰器                                        │
│  params=[{"name": "distance", "type": "number"}]    │
│                      │                               │
│                      ▼                               │
│  StepInfo.params    存储类型声明                       │
│                      │                               │
│                      ▼                               │
│  _validate_step_params()   执行时类型转换              │
│  "100" (str) → 100.0 (float)                        │
│                      │                               │
│                      ▼                               │
│  StepContext.param()  步骤函数获取已转换的值            │
└─────────────────────────────────────────────────────┘

15.7.3 geodataframe 类型

geodataframe 是最常用的参数类型,表示 GeoPandas GeoDataFrame 对象:

# 步骤定义中
{"name": "input", "type": "geodataframe", "required": True}

# YAML 中的使用
params:
  input: "$steps.read_data"   # 引用另一步骤的 GeoDataFrame 输出

# 步骤函数中的获取
gdf = ctx.input("input")      # → GeoDataFrame 对象

15.7.4 raster_info 类型

raster_info 是一个标准化的字典结构,描述栅格数据:

raster_info = {
    "data": np.ndarray,           # 栅格数据数组 (bands, height, width)
    "transform": Affine,          # 仿射变换矩阵
    "crs": CRS,                   # 坐标参考系
    "profile": {                  # rasterio profile
        "driver": "GTiff",
        "dtype": "float32",
        "width": 1000,
        "height": 1000,
        "count": 1,               # 波段数
        "crs": CRS,
        "transform": Affine,
        "nodata": -9999,
    },
    "path": str | None,           # 源文件路径
}

15.7.5 类型转换矩阵

执行器在参数验证阶段执行自动类型转换:

声明类型 YAML 值 Python 解析值 转换后
number 100 int(100) 100(保持)
number 100.5 float(100.5) 100.5(保持)
number "100" str("100") float(100.0)
integer 42 int(42) 42(保持)
integer "42" str("42") int(42)
boolean true bool(True) True(保持)
boolean "true" str("true") True
boolean "false" str("false") False
string hello str("hello") "hello"(保持)
list [1,2,3] list [1,2,3](保持)
geodataframe "$steps.xxx" GeoDataFrame 由 context 解析

15.7.6 point 类型的 YAML 表示

point 类型在 YAML 中有特殊的表示方式:

# 方式 1:GeoJSON 格式
origin:
  type: Point
  coordinates: [116.3975, 39.9087]

# 方式 2:引用地理编码步骤输出
origin: "$steps.geocode_origin"

# 方式 3:简写列表(部分步骤支持)
origin: [116.3975, 39.9087]

15.8 数据模型关系图

15.8.1 类关系全景

┌─────────────────────────────────────────────────────────────────┐
│                        models 包                                │
│                                                                 │
│  ┌─────────────────────┐         ┌──────────────────────┐      │
│  │  PipelineDefinition │ 1──────* │   StepDefinition     │      │
│  ├─────────────────────┤         ├──────────────────────┤      │
│  │ name: str           │         │ id: str              │      │
│  │ steps: list         │────────▶│ use: str ────────────┼──┐   │
│  │ description: str    │         │ params: dict         │  │   │
│  │ crs: str?           │         │ when: str?           │  │   │
│  │ variables: dict     │         │ on_error: str        │  │   │
│  │ outputs: dict       │         │ backend: str?        │  │   │
│  └─────────────────────┘         └──────────────────────┘  │   │
│                                                             │   │
│  ┌─────────────────────┐         ┌──────────────────────┐  │   │
│  │    StepResult       │ 0──────* │     QcIssue          │  │   │
│  ├─────────────────────┤         ├──────────────────────┤  │   │
│  │ output: Any         │         │ rule_id: str         │  │   │
│  │ stats: dict         │         │ severity: str        │  │   │
│  │ metadata: dict      │         │ feature_index: int?  │  │   │
│  │ issues: list ───────┼────────▶│ message: str         │  │   │
│  ├─────────────────────┤         │ geometry: Any?       │  │   │
│  │ __getattr__()       │         │ details: dict        │  │   │
│  │ summary()           │         ├──────────────────────┤  │   │
│  └─────────────────────┘         │ to_dict()            │  │   │
│                                  └──────────────────────┘  │   │
│                                                             │   │
│  ┌──────────────────────────────────────────────────────┐   │   │
│  │                  registry 模块                        │   │   │
│  │                                                      │   │   │
│  │  ┌──────────────────────┐                            │   │   │
│  │  │      StepInfo        │◀───────────────────────────┘   │   │
│  │  ├──────────────────────┤    use → registry[use]         │   │
│  │  │ id: str              │                                │   │
│  │  │ name: str            │                                │   │
│  │  │ description: str     │                                │   │
│  │  │ category: str        │                                │   │
│  │  │ params: list[dict]   │                                │   │
│  │  │ input_type: str      │                                │   │
│  │  │ output_type: str     │                                │   │
│  │  ├──────────────────────┤                                │   │
│  │  │ get_param(name)      │                                │   │
│  │  │ to_dict()            │                                │   │
│  │  └──────────────────────┘                                │   │
│  └──────────────────────────────────────────────────────┘   │   │
│                                                             │   │
└─────────────────────────────────────────────────────────────────┘

15.8.2 关系说明

关系 说明
PipelineDefinition → StepDefinition 一对多:一个流水线包含多个步骤
StepDefinition → StepInfo 多对一:多个步骤实例可使用同一步骤类型
StepResult → QcIssue 一对多:一个步骤结果可含多个质检问题
StepDefinition.use → StepInfo.id 通过注册表查找关联

15.8.3 生命周期

阶段           创建的对象                     创建者
────           ──────────                     ──────
解析阶段       PipelineDefinition            parser.py
               StepDefinition                parser.py

注册阶段       StepInfo                      @step 装饰器
               (应用启动时完成)

执行阶段       StepResult                    步骤函数
               QcIssue                       QC 步骤函数

报告阶段       JSON report dict              reporter.py
               (消费所有模型对象)

15.9 本章小结

本章详细介绍了 GeoPipeAgent 的数据模型体系和参数类型系统:

核心数据模型(4 个类):

职责 关键字段
PipelineDefinition 流水线定义 name, steps, variables, crs, outputs
StepDefinition 步骤定义 id, use, params, when, on_error, backend
StepResult 步骤执行结果 output, stats, metadata, issues
QcIssue 质检问题 rule_id, severity, feature_index, message, geometry, details

注册表元数据(1 个类):

职责 关键字段
StepInfo 步骤注册信息 id, name, params, input_type, output_type

关键设计特征:

  1. StepResult.__getattr__:魔法方法实现透明属性访问,result.feature_count 自动从 stats 字典获取
  2. QcIssue.to_dict():将几何对象序列化为 WKT 字符串,确保 JSON 兼容
  3. StepInfo.get_param():根据参数名获取类型声明,支持执行器的类型转换

参数类型系统:

  • 支持 10 种参数类型:geodataframeraster_infostringnumberintegerbooleanlistdictpointcrs
  • 执行器在运行时自动进行类型转换(如 YAML 字符串 "100" → Python 浮点数 100.0
  • $steps$var 引用由上下文管理器在执行阶段解析

这些数据模型构成了 GeoPipeAgent 的基础类型骨架,贯穿于 YAML 解析、流水线校验、步骤执行和报告生成的完整生命周期。


下一章:CLI 命令行工具 →