第十九章:自定义步骤与扩展开发
19.1 概述
GeoPipeAgent 的步骤体系是完全可扩展的。通过 @step 装饰器,任何 Python 函数都可以成为框架中的一个可用步骤,享受变量引用解析、条件执行、重试、后端选择等所有框架特性。
扩展方式有三种:
- 添加自定义步骤:实现特定 GIS 分析逻辑的步骤函数
- 扩展现有步骤:为现有步骤添加新后端支持
- 添加自定义后端:实现新的 GIS 工具链接入
19.2 创建自定义步骤
19.2.1 最简单的自定义步骤
# src/geopipe_agent/steps/custom/my_steps.py
from geopipe_agent.steps.registry import step, StepContext
from geopipe_agent.models.result import StepResult
from geopipe_agent.models.qc import QcIssue
@step(
id="custom.centroid",
name="计算要素中心点",
category="custom",
description="将多边形/线要素转换为其质心点",
params={
"input": {
"required": True,
"type": "geodataframe",
"description": "输入 GeoDataFrame(Polygon 或 LineString)"
},
"geometry_only": {
"required": False,
"type": "boolean",
"description": "是否仅保留几何,丢弃属性(默认 False)",
"default": False
}
},
backends=["native_python"],
)
def centroid_step(ctx: StepContext) -> StepResult:
"""将要素转换为质心点。"""
gdf = ctx.input()
geometry_only = ctx.param("geometry_only", False)
# 核心逻辑:计算质心
result = gdf.copy()
result["geometry"] = gdf.centroid
if geometry_only:
result = result[["geometry"]]
return StepResult(
output=result,
stats={
"feature_count": len(result),
"crs": str(result.crs),
"geometry_type": "Point"
}
)
将文件放入步骤目录:
src/geopipe_agent/steps/custom/
├── __init__.py
└── my_steps.py
确保 __init__.py 存在(可以为空),框架启动时会自动发现并注册 custom.centroid 步骤。
验证注册:
geopipe-agent list-steps --category custom
# 应显示:custom.centroid 计算要素中心点 custom native_python
19.2.2 带质检功能的步骤
@step(
id="custom.check_area",
name="面积范围检查",
category="custom",
description="检查面要素的面积是否在指定范围内",
params={
"input": {"required": True, "type": "geodataframe", "description": "输入面要素"},
"min_area": {"required": False, "type": "number", "description": "最小面积(CRS 单位²)"},
"max_area": {"required": False, "type": "number", "description": "最大面积(CRS 单位²)"},
"area_field": {"required": False, "type": "string", "description": "计算面积的字段名,默认几何面积"}
},
backends=["native_python"],
)
def check_area_step(ctx: StepContext) -> StepResult:
"""检查面要素面积是否在合理范围内。"""
import geopandas as gpd
gdf = ctx.input()
min_area = ctx.param("min_area", None)
max_area = ctx.param("max_area", None)
area_field = ctx.param("area_field", None)
# 计算面积
if area_field and area_field in gdf.columns:
areas = gdf[area_field]
else:
areas = gdf.geometry.area
issues = []
for idx, area in areas.items():
if min_area is not None and area < min_area:
issues.append(QcIssue(
type="area_too_small",
message=f"要素 {idx} 面积 {area:.2f} 小于最小值 {min_area}",
fid=idx,
severity="error"
))
if max_area is not None and area > max_area:
issues.append(QcIssue(
type="area_too_large",
message=f"要素 {idx} 面积 {area:.2f} 大于最大值 {max_area}",
fid=idx,
severity="error"
))
return StepResult(
output=gdf,
stats={
"checked_count": len(gdf),
"issue_count": len(issues),
"min_area": float(areas.min()),
"max_area": float(areas.max()),
"mean_area": float(areas.mean()),
},
issues=issues
)
19.2.3 支持多种数据类型的步骤
@step(
id="custom.add_uuid",
name="添加 UUID 字段",
category="custom",
description="为每个要素添加唯一 UUID 标识字段",
params={
"input": {"required": True, "type": "geodataframe", "description": "输入 GeoDataFrame"},
"field_name": {"required": False, "type": "string", "description": "UUID 字段名,默认 'uuid'"},
"overwrite": {"required": False, "type": "boolean", "description": "字段已存在时是否覆盖"}
},
)
def add_uuid_step(ctx: StepContext) -> StepResult:
"""为每个要素添加唯一 UUID 标识。"""
import uuid
gdf = ctx.input()
field_name = ctx.param("field_name", "uuid")
overwrite = ctx.param("overwrite", False)
if field_name in gdf.columns and not overwrite:
# 字段已存在且不覆盖:直接返回
return StepResult(
output=gdf,
stats={"feature_count": len(gdf), "action": "skipped (field exists)"}
)
result = gdf.copy()
result[field_name] = [str(uuid.uuid4()) for _ in range(len(gdf))]
return StepResult(
output=result,
stats={"feature_count": len(result), "field_added": field_name}
)
19.3 @step 装饰器参数完全参考
@step(
id="category.action_name", # 必填,格式:category.action
name="步骤显示名称", # 必填,UI 和文档中显示的名称
category="category", # 必填,步骤类别
description="步骤描述文字", # 必填,详细描述
params={ # 可选,参数规格字典
"param_name": {
"required": True/False, # 是否必填
"type": "geodataframe/string/number/boolean/array/dict", # 参数类型
"description": "参数描述", # 参数说明
"default": default_value, # 默认值(required=False 时建议设置)
"enum": ["val1", "val2"], # 可选:枚举允许值
}
},
backends=["native_python"], # 可选,支持的后端列表
)
def my_step_function(ctx: StepContext) -> StepResult:
...
参数类型标识:
| 类型标识 | 对应 Python 类型 | 说明 |
|---|---|---|
geodataframe |
geopandas.GeoDataFrame |
矢量数据 |
raster |
dict(含 data 和 meta) | 栅格数据 |
string |
str |
字符串 |
number |
int / float |
数值 |
boolean |
bool |
布尔值 |
array |
list |
列表 |
dict |
dict |
字典 |
19.4 StepContext API 完全参考
在步骤函数中,通过 ctx 参数访问以下方法和属性:
def my_step(ctx: StepContext) -> StepResult:
# 获取参数(推荐)
gdf = ctx.input() # 等价于 ctx.param("input")
distance = ctx.param("distance") # 获取必填参数
style = ctx.param("cap_style", "round") # 获取可选参数(带默认值)
# 获取所有参数字典
all_params = ctx.params # dict,已解析所有引用
# 获取后端(如步骤需要后端支持)
backend = ctx.backend # GeopipeBackend 实例或 None
# 访问全局流水线上下文(高级用法)
pipeline_ctx = ctx.pipeline_context # PipelineContext 实例
variables = pipeline_ctx.variables # 全局变量字典
return StepResult(...)
19.5 StepResult 构建最佳实践
from geopipe_agent.models.result import StepResult
from geopipe_agent.models.qc import QcIssue
# 基本使用
return StepResult(
output=processed_gdf,
stats={
"feature_count": len(processed_gdf),
"crs": str(processed_gdf.crs),
# 可添加任何自定义统计信息
"min_area": float(processed_gdf.geometry.area.min()),
"processing_note": "已过滤 5 个无效几何"
}
)
# 含质检结果
return StepResult(
output=gdf,
stats={"checked_count": len(gdf)},
issues=[
QcIssue(type="gap", message="要素 42 与要素 43 之间存在缝隙", fid=42),
QcIssue(type="overlap", message="要素 10 与要素 15 存在重叠", fid=10),
]
)
# 纯统计(不修改数据)
return StepResult(
output=input_gdf, # 原样返回输入
stats={
"feature_count": len(input_gdf),
"total_length": float(input_gdf.geometry.length.sum()),
"total_area": float(input_gdf.geometry.area.sum()),
}
)
19.6 完整自定义步骤示例:建筑物面积分级
以下是一个完整的自定义步骤示例,展示了实际 GIS 分析逻辑:
# src/geopipe_agent/steps/custom/building_steps.py
import geopandas as gpd
import pandas as pd
from geopipe_agent.steps.registry import step, StepContext
from geopipe_agent.models.result import StepResult
@step(
id="custom.building_classify",
name="建筑物面积分级",
category="custom",
description="根据建筑物占地面积对建筑物进行分级分类",
params={
"input": {
"required": True,
"type": "geodataframe",
"description": "输入建筑物多边形 GeoDataFrame"
},
"breaks": {
"required": False,
"type": "array",
"description": "面积分级断点(平方米),如 [100, 500, 2000, 10000]",
"default": [100, 500, 2000, 10000]
},
"labels": {
"required": False,
"type": "array",
"description": "各级别标签,数量应为 breaks 长度加 1",
"default": ["微型", "小型", "中型", "大型", "超大型"]
},
"output_field": {
"required": False,
"type": "string",
"description": "分级结果字段名",
"default": "size_class"
}
},
backends=["native_python"],
)
def building_classify_step(ctx: StepContext) -> StepResult:
"""按建筑物占地面积进行分级。"""
gdf = ctx.input()
breaks = ctx.param("breaks", [100, 500, 2000, 10000])
labels = ctx.param("labels", ["微型", "小型", "中型", "大型", "超大型"])
output_field = ctx.param("output_field", "size_class")
# 参数验证
if len(labels) != len(breaks) + 1:
raise ValueError(
f"labels 数量 ({len(labels)}) 应为 breaks 数量 ({len(breaks)}) 加 1"
)
# 计算面积(使用当前 CRS 的面积单位)
result = gdf.copy()
areas = gdf.geometry.area
# 使用 pd.cut 进行分级
bins = [float('-inf')] + list(breaks) + [float('inf')]
result[output_field] = pd.cut(
areas,
bins=bins,
labels=labels,
right=True
).astype(str)
# 统计各级别数量
class_counts = result[output_field].value_counts().to_dict()
return StepResult(
output=result,
stats={
"feature_count": len(result),
"class_distribution": class_counts,
"min_area": float(areas.min()),
"max_area": float(areas.max()),
"mean_area": float(areas.mean()),
"output_field": output_field
}
)
在 YAML 中使用自定义步骤
name: 建筑物面积分析
steps:
- id: load
use: io.read_vector
params:
path: "data/buildings.shp"
- id: reproject
use: vector.reproject
params:
input: $load
target_crs: "EPSG:3857"
- id: classify
use: custom.building_classify # 使用自定义步骤
params:
input: $reproject
breaks: [50, 200, 1000, 5000]
labels: ["小房", "普通", "大楼", "商业楼", "超高层"]
output_field: "building_size"
- id: save
use: io.write_vector
params:
input: $classify
path: "output/buildings_classified.gpkg"
19.7 自定义后端
如果需要接入新的 GIS 工具(如 PostGIS、ArcGIS Pro Python API 等),可以创建自定义后端:
# src/geopipe_agent/backends/postgis_backend.py
from geopipe_agent.backends.base import GeopipeBackend
class PostgisBackend(GeopipeBackend):
def name(self) -> str:
return "postgis"
def is_available(self) -> bool:
try:
import psycopg2
return True
except ImportError:
return False
def execute_sql(self, connection_string: str, sql: str):
"""执行 PostGIS SQL 查询。"""
import psycopg2
import geopandas as gpd
from sqlalchemy import create_engine
engine = create_engine(connection_string)
return gpd.read_postgis(sql, engine)
注册自定义后端:
# 在 backends/__init__.py 中添加
from geopipe_agent.backends.postgis_backend import PostgisBackend
_BACKEND_CLASSES.append(PostgisBackend)
19.8 开发规范
步骤函数命名
- 文件名:
{category}_steps.py(如custom_steps.py) - 函数名:
{action}_step(如centroid_step) - 步骤 ID:
{category}.{action}(如custom.centroid)
错误处理
步骤函数中应不捕获异常,让框架统一处理:
# ✅ 正确:让异常向上传播,框架根据 on_error 策略处理
def my_step(ctx: StepContext) -> StepResult:
gdf = ctx.input()
result = gdf.buffer(ctx.param("distance")) # 出错会自动传播
return StepResult(output=result)
# ❌ 错误:过度捕获异常,导致框架无法正确处理
def my_step(ctx: StepContext) -> StepResult:
try:
gdf = ctx.input()
result = gdf.buffer(ctx.param("distance"))
return StepResult(output=result)
except Exception as e:
print(f"Error: {e}") # 吃掉异常,框架看不到错误
return StepResult() # 返回空结果,后续步骤出错更难排查
参数默认值
通过 ctx.param("name", default_value) 提供默认值,而不是在 @step 装饰器的 params 中定义逻辑:
# ✅ 推荐
def my_step(ctx: StepContext) -> StepResult:
cap_style = ctx.param("cap_style", "round") # 代码级默认值
输出统计
始终在 stats 中提供 feature_count(矢量步骤),便于报告解析和 when 表达式引用:
return StepResult(
output=result,
stats={
"feature_count": len(result), # 必须包含
"crs": str(result.crs), # 推荐包含
# 其他自定义统计...
}
)
19.9 运行测试
对自定义步骤编写单元测试:
# tests/test_custom_steps.py
import pytest
import geopandas as gpd
from shapely.geometry import Polygon
from geopipe_agent.steps.custom.building_steps import building_classify_step
from geopipe_agent.engine.context import StepContext, PipelineContext
def create_test_gdf():
"""创建测试用 GeoDataFrame。"""
geoms = [Polygon([(0,0), (1,0), (1,1), (0,1)]) for _ in range(5)]
return gpd.GeoDataFrame({"name": [f"bldg_{i}" for i in range(5)]},
geometry=geoms, crs="EPSG:3857")
def make_ctx(params: dict) -> StepContext:
"""创建测试用 StepContext。"""
ctx = StepContext(params=params, pipeline_context=PipelineContext())
return ctx
def test_building_classify_basic():
gdf = create_test_gdf()
ctx = make_ctx({"input": gdf, "breaks": [0.5, 1.5], "labels": ["小", "中", "大"]})
result = building_classify_step(ctx)
assert result.output is not None
assert "size_class" in result.output.columns
assert result.stats["feature_count"] == 5
def test_building_classify_invalid_labels():
gdf = create_test_gdf()
ctx = make_ctx({"input": gdf, "breaks": [0.5, 1.5], "labels": ["小", "大"]}) # labels 数量错误
with pytest.raises(ValueError, match="labels 数量"):
building_classify_step(ctx)
运行测试:
pip install -e ".[dev]"
pytest tests/test_custom_steps.py -v
19.10 小结
本章介绍了 GeoPipeAgent 的自定义步骤与扩展开发:
@step装饰器:将普通 Python 函数注册为框架步骤,自动享受所有框架特性- 自动发现:步骤文件放到正确目录即自动注册,无需手动修改注册表
StepContext:通过ctx.param()/ctx.input()获取已解析的参数StepResult:包含output(数据)+stats(统计)+issues(质检问题)- 最佳实践:不捕获异常(让框架处理)、提供
feature_count(便于引用) - 测试:通过
StepContext构造函数直接单元测试步骤函数
下一章将介绍 Docker 部署、运行测试和最佳实践。