第15章:数据模型与类型系统
本章详细介绍 GeoPipeAgent 的数据模型体系和参数类型系统。models 包定义了流水线、步骤和执行结果的核心数据结构,参数类型系统则确保 YAML 中声明的参数值能被正确解析和验证。理解这些基础概念有助于编写正确的流水线定义和开发自定义步骤。
15.1 数据模型概述
15.1.1 models 包结构
GeoPipeAgent 的数据模型定义在 models 包中,由三个模块组成:
geopipeagent/models/
├── __init__.py # 导出所有模型类
├── pipeline.py # PipelineDefinition, StepDefinition
├── result.py # StepResult
└── qc.py # QcIssue
另有 registry.py 模块中定义的 StepInfo,用于存储步骤注册元数据。
15.1.2 模型类清单
| 类名 | 模块 | 用途 | 创建方式 |
|---|---|---|---|
PipelineDefinition |
pipeline.py |
流水线定义 | parser.py 解析生成 |
StepDefinition |
pipeline.py |
步骤定义 | parser.py 解析生成 |
StepResult |
result.py |
步骤执行结果 | 步骤函数返回 |
QcIssue |
qc.py |
质检问题 | QC 步骤创建 |
StepInfo |
registry.py |
步骤注册元数据 | @step 装饰器生成 |
15.1.3 数据模型在流水线中的流转
YAML 文件
│ parse_yaml()
▼
┌──────────────────┐
│ PipelineDefinition│
│ ├─ name │
│ ├─ steps[] │──────┐
│ ├─ variables │ │
│ └─ ... │ │
└──────────────────┘ │
│
┌─────────┘
▼
┌──────────────┐ ┌──────────┐
│ StepDefinition│────────▶│ StepInfo │
│ ├─ id │ 查注册表│ (元数据) │
│ ├─ use │ └──────────┘
│ ├─ params │
│ └─ ... │
└──────┬───────┘
│ execute_step()
▼
┌──────────────┐
│ StepResult │
│ ├─ output │──▶ GeoDataFrame / raster_info
│ ├─ stats │──▶ dict
│ ├─ issues │──▶ list[QcIssue] (QC 步骤)
│ └─ metadata │──▶ dict
└──────────────┘
15.2 PipelineDefinition
15.2.1 类定义
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class PipelineDefinition:
"""流水线定义——YAML 文件的 Python 对象表示"""
name: str # 流水线名称
steps: list["StepDefinition"] # 步骤列表
description: str = "" # 流水线描述
crs: Optional[str] = None # 全局 CRS
variables: dict = field(default_factory=dict) # 变量字典
outputs: dict = field(default_factory=dict) # 输出配置
15.2.2 字段详解
| 字段 | 类型 | 必需 | 说明 | YAML 对应 |
|---|---|---|---|---|
name |
str |
✅ | 流水线名称,用于报告和日志 | name: |
steps |
list[StepDefinition] |
✅ | 步骤列表,按定义顺序执行 | steps: |
description |
str |
❌ | 流水线描述文本 | description: |
crs |
str \| None |
❌ | 全局坐标参考系 | crs: |
variables |
dict |
❌ | 变量字典,供 $var.xxx 引用 |
variables: |
outputs |
dict |
❌ | 输出配置(指定最终输出步骤) | outputs: |
15.2.3 YAML 映射示例
# YAML 定义
name: "城市绿地分析" # → name
description: "分析城市绿地覆盖率" # → description
crs: "EPSG:4326" # → crs
variables: # → variables
buffer_distance: 500
min_area: 100
steps: # → steps
- id: read_data
step: io.read_vector
params:
path: "data/greenspace.gpkg"
outputs: # → outputs
result: "$steps.analysis_result"
对应的 Python 对象:
PipelineDefinition(
name="城市绿地分析",
description="分析城市绿地覆盖率",
crs="EPSG:4326",
variables={
"buffer_distance": 500,
"min_area": 100,
},
steps=[
StepDefinition(id="read_data", use="io.read_vector", ...),
],
outputs={
"result": "$steps.analysis_result",
},
)
15.3 StepDefinition
15.3.1 类定义
@dataclass
class StepDefinition:
"""步骤定义——单个处理步骤的声明"""
id: str # 步骤唯一标识
use: str # 步骤类型(注册表 ID)
params: dict = field(default_factory=dict) # 参数字典
when: Optional[str] = None # 条件表达式
on_error: str = "fail" # 错误处理策略
backend: Optional[str] = None # 指定后端
15.3.2 字段详解
| 字段 | 类型 | 必需 | 默认值 | 说明 |
|---|---|---|---|---|
id |
str |
✅ | — | 步骤唯一标识,用于引用和报告 |
use |
str |
✅ | — | 步骤类型,对应注册表中的 step ID |
params |
dict |
❌ | {} |
步骤参数,支持 $var 和 $steps 引用 |
when |
str \| None |
❌ | None |
条件表达式,为 true 时才执行 |
on_error |
str |
❌ | "fail" |
错误策略:fail / skip / retry |
backend |
str \| None |
❌ | None |
指定执行后端,覆盖全局设置 |
15.3.3 id 与 use 的区别
- id: buffer_roads # ← id: 用户自定义的步骤实例标识
step: vector.buffer # ← use(step): 注册表中的步骤类型
- id: buffer_buildings # ← 另一个实例,不同的 id
step: vector.buffer # ← 同一个步骤类型
id: 步骤的"变量名" → 在当前流水线中唯一
use: 步骤的"函数名" → 对应注册表中的步骤类型
同一类型可被多次使用
15.3.4 完整 YAML 示例
- id: clip_data
step: vector.clip # use 字段(YAML 中可用 step 或 use)
params:
input: "$steps.read_data"
clip_geometry: "$steps.read_boundary"
when: "$var.enable_clip == true"
on_error: skip
backend: gdal_cli
15.4 StepResult 详解
15.4.1 类定义
@dataclass
class StepResult:
"""步骤执行结果"""
output: Any = None # 输出数据
stats: dict = field(default_factory=dict) # 统计信息
metadata: dict = field(default_factory=dict) # 元数据
issues: list = field(default_factory=list) # QC 问题列表
def __getattr__(self, name):
"""魔法方法:允许通过属性访问 stats 和 metadata
使得 result.feature_count 等价于
result.stats.get("feature_count")
"""
if name.startswith("_"):
raise AttributeError(name)
# 先查 stats
if name in self.stats:
return self.stats[name]
# 再查 metadata
if name in self.metadata:
return self.metadata[name]
raise AttributeError(
f"'StepResult' has no attribute '{name}'"
)
def summary(self):
"""生成结果摘要字符串"""
parts = [f"StepResult:"]
if self.output is not None:
output_type = type(self.output).__name__
parts.append(f" output: {output_type}")
if self.stats:
parts.append(f" stats: {self.stats}")
if self.issues:
parts.append(f" issues: {len(self.issues)} issue(s)")
return "\n".join(parts)
15.4.2 四个核心字段
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
output |
Any |
步骤的主要输出数据 | GeoDataFrame, raster_info, None |
stats |
dict |
统计信息(执行过程的数值指标) | {"feature_count": 500, "total_distance": 1234.5} |
metadata |
dict |
元数据(描述性信息) | {"crs": "EPSG:4326", "format": "GPKG"} |
issues |
list[QcIssue] |
质检问题列表(仅 QC 步骤) | [QcIssue(...), ...] |
15.4.3 getattr 魔法方法
__getattr__ 实现了透明属性访问——可以直接通过 result.xxx 访问 stats 和 metadata 中的值:
result = StepResult(
output=gdf,
stats={"feature_count": 500, "total_area": 12345.6},
metadata={"crs": "EPSG:4326"},
)
# 直接属性访问(通过 __getattr__)
result.feature_count # → 500(从 stats 获取)
result.total_area # → 12345.6(从 stats 获取)
result.crs # → "EPSG:4326"(从 metadata 获取)
# 等价的显式访问
result.stats["feature_count"] # → 500
result.metadata["crs"] # → "EPSG:4326"
属性查找优先级:
result.xxx
│
├── 1. 实例属性(output, stats, metadata, issues)
│
├── 2. stats 字典中查找
│
├── 3. metadata 字典中查找
│
└── 4. 抛出 AttributeError
15.4.4 在流水线中引用 StepResult
steps:
- id: read_data
step: io.read_vector
params:
path: "data/parcels.gpkg"
# 引用上一步的输出
- id: buffer_data
step: vector.buffer
params:
input: "$steps.read_data" # → result.output
distance: 100
# 引用上一步的统计信息
- id: next_step
step: some.step
params:
count: "$steps.read_data.stats.feature_count"
when: "$steps.read_data.stats.feature_count > 0"
引用解析规则:
| 引用表达式 | 解析结果 |
|---|---|
$steps.xxx |
results["xxx"].output |
$steps.xxx.output |
results["xxx"].output |
$steps.xxx.stats |
results["xxx"].stats |
$steps.xxx.stats.feature_count |
results["xxx"].stats["feature_count"] |
$steps.xxx.issues |
results["xxx"].issues |
15.4.5 summary() 方法
result = StepResult(
output=gdf,
stats={"feature_count": 500, "issue_count": 3},
issues=[QcIssue(...), QcIssue(...), QcIssue(...)],
)
print(result.summary())
# StepResult:
# output: GeoDataFrame
# stats: {'feature_count': 500, 'issue_count': 3}
# issues: 3 issue(s)
15.5 QcIssue 详解
15.5.1 完整类定义
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class QcIssue:
"""质检问题数据模型
记录单个质检发现的问题,包含问题位置、
严重级别和诊断详情。
"""
rule_id: str # 规则标识
severity: str # 严重级别
feature_index: Optional[int] = None # 要素索引
message: str = "" # 问题描述
geometry: Any = None # 问题几何
details: dict = field(default_factory=dict) # 扩展详情
def to_dict(self):
"""序列化为字典(JSON 兼容)"""
d = {
"rule_id": self.rule_id,
"severity": self.severity,
"message": self.message,
}
if self.feature_index is not None:
d["feature_index"] = self.feature_index
if self.geometry is not None:
d["geometry"] = self.geometry.wkt
if self.details:
d["details"] = self.details
return d
15.5.2 severity 级别体系
┌─────────────────────────────────────────────────────────────┐
│ severity 级别层次 │
├─────────┬───────────────────────────────────────────────────┤
│ │ │
│ error │ 数据严重错误,必须修复后才能使用 │
│ │ 例:自相交多边形、CRS 缺失、必填字段为空 │
│ │ 对应行为:可触发 on_error: fail │
│ │ │
├─────────┼───────────────────────────────────────────────────┤
│ │ │
│ warning │ 数据存在异常,建议人工复核 │
│ │ 例:数值接近范围边界、面积异常小、疑似重复 │
│ │ 对应行为:记录但不中断 │
│ │ │
├─────────┼───────────────────────────────────────────────────┤
│ │ │
│ info │ 信息提示,供参考 │
│ │ 例:发现可能的重复要素(在某些业务中是合理的) │
│ │ 对应行为:仅记录 │
│ │ │
└─────────┴───────────────────────────────────────────────────┘
15.5.3 to_dict() 序列化
to_dict() 将 QcIssue 转换为 JSON 兼容的字典,用于报告输出:
issue = QcIssue(
rule_id="attribute_completeness",
severity="error",
feature_index=42,
message="Field 'name' is null or empty",
geometry=Point(116.39, 39.91),
details={"field": "name", "value": None},
)
issue.to_dict()
# {
# "rule_id": "attribute_completeness",
# "severity": "error",
# "feature_index": 42,
# "message": "Field 'name' is null or empty",
# "geometry": "POINT (116.39 39.91)",
# "details": {"field": "name", "value": null}
# }
关键序列化规则:
| 字段 | 序列化方式 |
|---|---|
rule_id, severity, message |
原样输出 |
feature_index |
非 None 时输出 |
geometry |
转换为 WKT 字符串 |
details |
原样输出字典 |
15.5.4 各 QC 步骤的 rule_id 对照表
| QC 步骤 | rule_id | 常见 details 字段 |
|---|---|---|
qc.geometry_validity |
"geometry_validity" |
type, reason |
qc.crs_check |
"crs_check" |
expected, actual |
qc.topology |
"topology" |
rule, overlap_area |
qc.attribute_completeness |
"attribute_completeness" |
field, value |
qc.attribute_domain |
"attribute_domain" |
field, value, allowed/pattern |
qc.value_range |
"value_range" |
field, value, min/max |
qc.duplicate_check |
"duplicate_check" |
duplicate_of, key |
qc.raster_nodata |
"raster_nodata" |
expected, actual, nodata_ratio |
qc.raster_resolution |
"raster_resolution" |
axis, expected, actual |
qc.raster_value_range |
"raster_value_range" |
band, actual_min/actual_max |
15.6 StepInfo 详解
15.6.1 类定义
StepInfo 存储步骤的注册元数据,由 @step 装饰器自动生成:
@dataclass
class StepInfo:
"""步骤注册元数据"""
id: str # 步骤 ID(如 "vector.buffer")
name: str # 步骤显示名称
description: str = "" # 步骤描述
category: str = "" # 分类(如 "vector", "io", "qc")
params: list = field(default_factory=list) # 参数定义列表
input_type: str = "" # 输入类型
output_type: str = "" # 输出类型
def get_param(self, name):
"""根据名称获取参数定义"""
for p in self.params:
if p.get("name") == name:
return p
return None
def to_dict(self):
"""序列化为字典"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"category": self.category,
"params": self.params,
"input_type": self.input_type,
"output_type": self.output_type,
}
15.6.2 字段详解
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
id |
str |
步骤的全局唯一 ID | "vector.buffer" |
name |
str |
人类可读的名称 | "缓冲区分析" |
description |
str |
功能描述 | "为几何要素创建缓冲区" |
category |
str |
步骤分类 | "vector", "io", "qc" |
params |
list[dict] |
参数定义列表 | 见下表 |
input_type |
str |
输入数据类型 | "geodataframe" |
output_type |
str |
输出数据类型 | "geodataframe" |
15.6.3 参数定义结构
params 列表中每个元素是一个字典:
{
"name": "distance", # 参数名
"type": "number", # 参数类型
"required": True, # 是否必需
"default": None, # 默认值
"description": "缓冲距离", # 参数描述
}
15.6.4 @step 装饰器生成 StepInfo
from geopipeagent.registry import step
@step(
id="vector.buffer",
name="缓冲区分析",
description="为几何要素创建缓冲区",
category="vector",
params=[
{"name": "input", "type": "geodataframe", "required": True},
{"name": "distance", "type": "number", "required": True},
{"name": "resolution", "type": "integer", "required": False,
"default": 16},
],
input_type="geodataframe",
output_type="geodataframe",
)
def buffer(ctx):
...
# 装饰器自动生成并绑定 StepInfo
buffer.step_info
# StepInfo(
# id="vector.buffer",
# name="缓冲区分析",
# ...
# )
15.6.5 to_dict() 序列化
info = buffer.step_info
info.to_dict()
# {
# "id": "vector.buffer",
# "name": "缓冲区分析",
# "description": "为几何要素创建缓冲区",
# "category": "vector",
# "params": [
# {"name": "input", "type": "geodataframe", "required": true},
# {"name": "distance", "type": "number", "required": true},
# {"name": "resolution", "type": "integer", "required": false,
# "default": 16},
# ],
# "input_type": "geodataframe",
# "output_type": "geodataframe"
# }
to_dict() 的输出可用于:
- 生成步骤文档
- API 接口返回步骤元信息
- 前端 UI 动态生成参数表单
15.7 类型系统
15.7.1 支持的参数类型
GeoPipeAgent 的参数类型系统定义了步骤参数可接受的数据类型:
| 类型标识 | Python 类型 | 说明 | YAML 示例 |
|---|---|---|---|
geodataframe |
GeoDataFrame |
矢量空间数据 | "$steps.read_data" |
raster_info |
dict |
栅格数据字典 | "$steps.read_raster" |
string |
str |
字符串 | "EPSG:4326" |
number |
float |
浮点数 | 100.5 |
integer |
int |
整数 | 16 |
boolean |
bool |
布尔值 | true / false |
list |
list |
列表 | [1, 2, 3] |
dict |
dict |
字典 | {key: value} |
point |
Point |
Shapely 点几何 | {type: Point, coordinates: [...]} |
crs |
str\|CRS |
坐标参考系 | "EPSG:4326" |
15.7.2 类型在系统中的使用位置
┌─────────────────────────────────────────────────────┐
│ @step 装饰器 │
│ params=[{"name": "distance", "type": "number"}] │
│ │ │
│ ▼ │
│ StepInfo.params 存储类型声明 │
│ │ │
│ ▼ │
│ _validate_step_params() 执行时类型转换 │
│ "100" (str) → 100.0 (float) │
│ │ │
│ ▼ │
│ StepContext.param() 步骤函数获取已转换的值 │
└─────────────────────────────────────────────────────┘
15.7.3 geodataframe 类型
geodataframe 是最常用的参数类型,表示 GeoPandas GeoDataFrame 对象:
# 步骤定义中
{"name": "input", "type": "geodataframe", "required": True}
# YAML 中的使用
params:
input: "$steps.read_data" # 引用另一步骤的 GeoDataFrame 输出
# 步骤函数中的获取
gdf = ctx.input("input") # → GeoDataFrame 对象
15.7.4 raster_info 类型
raster_info 是一个标准化的字典结构,描述栅格数据:
raster_info = {
"data": np.ndarray, # 栅格数据数组 (bands, height, width)
"transform": Affine, # 仿射变换矩阵
"crs": CRS, # 坐标参考系
"profile": { # rasterio profile
"driver": "GTiff",
"dtype": "float32",
"width": 1000,
"height": 1000,
"count": 1, # 波段数
"crs": CRS,
"transform": Affine,
"nodata": -9999,
},
"path": str | None, # 源文件路径
}
15.7.5 类型转换矩阵
执行器在参数验证阶段执行自动类型转换:
| 声明类型 | YAML 值 | Python 解析值 | 转换后 |
|---|---|---|---|
number |
100 |
int(100) |
100(保持) |
number |
100.5 |
float(100.5) |
100.5(保持) |
number |
"100" |
str("100") |
float(100.0) |
integer |
42 |
int(42) |
42(保持) |
integer |
"42" |
str("42") |
int(42) |
boolean |
true |
bool(True) |
True(保持) |
boolean |
"true" |
str("true") |
True |
boolean |
"false" |
str("false") |
False |
string |
hello |
str("hello") |
"hello"(保持) |
list |
[1,2,3] |
list |
[1,2,3](保持) |
geodataframe |
"$steps.xxx" |
GeoDataFrame |
由 context 解析 |
15.7.6 point 类型的 YAML 表示
point 类型在 YAML 中有特殊的表示方式:
# 方式 1:GeoJSON 格式
origin:
type: Point
coordinates: [116.3975, 39.9087]
# 方式 2:引用地理编码步骤输出
origin: "$steps.geocode_origin"
# 方式 3:简写列表(部分步骤支持)
origin: [116.3975, 39.9087]
15.8 数据模型关系图
15.8.1 类关系全景
┌─────────────────────────────────────────────────────────────────┐
│ models 包 │
│ │
│ ┌─────────────────────┐ ┌──────────────────────┐ │
│ │ PipelineDefinition │ 1──────* │ StepDefinition │ │
│ ├─────────────────────┤ ├──────────────────────┤ │
│ │ name: str │ │ id: str │ │
│ │ steps: list │────────▶│ use: str ────────────┼──┐ │
│ │ description: str │ │ params: dict │ │ │
│ │ crs: str? │ │ when: str? │ │ │
│ │ variables: dict │ │ on_error: str │ │ │
│ │ outputs: dict │ │ backend: str? │ │ │
│ └─────────────────────┘ └──────────────────────┘ │ │
│ │ │
│ ┌─────────────────────┐ ┌──────────────────────┐ │ │
│ │ StepResult │ 0──────* │ QcIssue │ │ │
│ ├─────────────────────┤ ├──────────────────────┤ │ │
│ │ output: Any │ │ rule_id: str │ │ │
│ │ stats: dict │ │ severity: str │ │ │
│ │ metadata: dict │ │ feature_index: int? │ │ │
│ │ issues: list ───────┼────────▶│ message: str │ │ │
│ ├─────────────────────┤ │ geometry: Any? │ │ │
│ │ __getattr__() │ │ details: dict │ │ │
│ │ summary() │ ├──────────────────────┤ │ │
│ └─────────────────────┘ │ to_dict() │ │ │
│ └──────────────────────┘ │ │
│ │ │
│ ┌──────────────────────────────────────────────────────┐ │ │
│ │ registry 模块 │ │ │
│ │ │ │ │
│ │ ┌──────────────────────┐ │ │ │
│ │ │ StepInfo │◀───────────────────────────┘ │ │
│ │ ├──────────────────────┤ use → registry[use] │ │
│ │ │ id: str │ │ │
│ │ │ name: str │ │ │
│ │ │ description: str │ │ │
│ │ │ category: str │ │ │
│ │ │ params: list[dict] │ │ │
│ │ │ input_type: str │ │ │
│ │ │ output_type: str │ │ │
│ │ ├──────────────────────┤ │ │
│ │ │ get_param(name) │ │ │
│ │ │ to_dict() │ │ │
│ │ └──────────────────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │ │
│ │ │
└─────────────────────────────────────────────────────────────────┘
15.8.2 关系说明
| 关系 | 说明 |
|---|---|
| PipelineDefinition → StepDefinition | 一对多:一个流水线包含多个步骤 |
| StepDefinition → StepInfo | 多对一:多个步骤实例可使用同一步骤类型 |
| StepResult → QcIssue | 一对多:一个步骤结果可含多个质检问题 |
| StepDefinition.use → StepInfo.id | 通过注册表查找关联 |
15.8.3 生命周期
阶段 创建的对象 创建者
──── ────────── ──────
解析阶段 PipelineDefinition parser.py
StepDefinition parser.py
注册阶段 StepInfo @step 装饰器
(应用启动时完成)
执行阶段 StepResult 步骤函数
QcIssue QC 步骤函数
报告阶段 JSON report dict reporter.py
(消费所有模型对象)
15.9 本章小结
本章详细介绍了 GeoPipeAgent 的数据模型体系和参数类型系统:
核心数据模型(4 个类):
| 类 | 职责 | 关键字段 |
|---|---|---|
PipelineDefinition |
流水线定义 | name, steps, variables, crs, outputs |
StepDefinition |
步骤定义 | id, use, params, when, on_error, backend |
StepResult |
步骤执行结果 | output, stats, metadata, issues |
QcIssue |
质检问题 | rule_id, severity, feature_index, message, geometry, details |
注册表元数据(1 个类):
| 类 | 职责 | 关键字段 |
|---|---|---|
StepInfo |
步骤注册信息 | id, name, params, input_type, output_type |
关键设计特征:
StepResult.__getattr__:魔法方法实现透明属性访问,result.feature_count自动从stats字典获取QcIssue.to_dict():将几何对象序列化为 WKT 字符串,确保 JSON 兼容StepInfo.get_param():根据参数名获取类型声明,支持执行器的类型转换
参数类型系统:
- 支持 10 种参数类型:
geodataframe、raster_info、string、number、integer、boolean、list、dict、point、crs - 执行器在运行时自动进行类型转换(如 YAML 字符串
"100"→ Python 浮点数100.0) $steps和$var引用由上下文管理器在执行阶段解析
这些数据模型构成了 GeoPipeAgent 的基础类型骨架,贯穿于 YAML 解析、流水线校验、步骤执行和报告生成的完整生命周期。