第十八章:自定义步骤与扩展开发
GeoPipeAgent 的步骤系统完全开放,通过 @step 装饰器可以轻松注册自定义步骤,扩展框架的 GIS 分析能力。本章介绍自定义步骤的开发、注册和使用方法。
18.1 步骤注册系统
@step 装饰器
所有步骤(内置和自定义)都通过 @step 装饰器注册到全局注册表(registry._steps):
from geopipe_agent.steps.registry import step
from geopipe_agent.engine.context import StepContext
from geopipe_agent.models.result import StepResult
@step(
id="my_category.my_action", # 步骤注册 ID(必填)
name="我的自定义步骤", # 显示名称
description="步骤功能描述", # 步骤描述
category="my_category", # 类别(影响 list-steps 分组)
params={ # 参数规范字典
"input": {
"type": "geodataframe",
"required": True,
"description": "输入矢量数据",
},
"my_param": {
"type": "number",
"required": False,
"default": 100,
"description": "自定义参数",
},
},
outputs={ # 输出规范
"output": {"type": "geodataframe", "description": "处理结果"},
},
backends=["native_python"], # 支持的后端(可省略)
examples=[ # 示例(用于 Skill 文档)
{
"description": "基础用法",
"params": {"input": "$load.output", "my_param": 200},
},
],
)
def my_custom_step(ctx: StepContext) -> StepResult:
"""步骤实现函数,接收 StepContext,返回 StepResult"""
gdf = ctx.input("input") # 获取 input 参数(GeoDataFrame)
my_param = ctx.param("my_param", 100) # 获取可选参数
# 执行处理逻辑
result_gdf = gdf.copy()
result_gdf["my_field"] = my_param
return StepResult(
output=result_gdf,
stats={"feature_count": len(result_gdf)},
)
注册 ID 规范
步骤 ID 使用点号分隔的 类别.动作 格式:
| 格式 | 示例 | 说明 |
|---|---|---|
category.action |
vector.buffer |
推荐格式 |
org.category.action |
acme.vector.polygon_merge |
带组织前缀,避免冲突 |
custom.action |
custom.fix_topology |
通用自定义步骤 |
18.2 StepContext 用法
步骤函数通过 StepContext 获取所有已解析(变量和引用已替换)的参数:
def my_step(ctx: StepContext) -> StepResult:
# 获取参数(带默认值)
gdf = ctx.param("input") # 等价于 ctx.input("input")
distance = ctx.param("distance") # 必填参数,None 如不提供
cap = ctx.param("cap_style", "round") # 带默认值
# 简写:ctx.input() 等价于 ctx.param("input")
gdf = ctx.input("input")
# 访问后端(仅当 backends 列表非空时)
backend = ctx.backend # GeoBackend 对象或 None
# 访问完整参数字典
all_params = ctx.params
18.3 完整示例:开发空间连接步骤
以下是一个完整的自定义步骤实现:空间连接(Spatial Join)。
# my_steps/spatial_join.py
from __future__ import annotations
from geopipe_agent.steps.registry import step
from geopipe_agent.engine.context import StepContext
from geopipe_agent.models.result import StepResult
@step(
id="custom.spatial_join",
name="空间连接",
description="将右表的属性通过空间关系连接到左表",
category="custom",
params={
"left": {
"type": "geodataframe",
"required": True,
"description": "左表(要素保留数量与左表一致)",
},
"right": {
"type": "geodataframe",
"required": True,
"description": "右表(属性来源)",
},
"how": {
"type": "string",
"required": False,
"default": "left",
"enum": ["left", "right", "inner"],
"description": "连接方式",
},
"op": {
"type": "string",
"required": False,
"default": "intersects",
"enum": ["intersects", "contains", "within"],
"description": "空间关系谓词",
},
"suffix_left": {
"type": "string",
"required": False,
"default": "_left",
"description": "左表重名字段后缀",
},
"suffix_right": {
"type": "string",
"required": False,
"default": "_right",
"description": "右表重名字段后缀",
},
},
outputs={
"output": {"type": "geodataframe", "description": "空间连接结果"},
},
examples=[
{
"description": "点-面空间连接(将行政区属性连接到 POI 点)",
"params": {
"left": "$load-poi.output",
"right": "$load-districts.output",
"how": "left",
"op": "within",
},
},
],
)
def custom_spatial_join(ctx: StepContext) -> StepResult:
import geopandas as gpd
left_gdf = ctx.param("left")
right_gdf = ctx.param("right")
how = ctx.param("how", "left")
op = ctx.param("op", "intersects")
suffix_left = ctx.param("suffix_left", "_left")
suffix_right = ctx.param("suffix_right", "_right")
# 执行空间连接
result_gdf = gpd.sjoin(
left_gdf,
right_gdf,
how=how,
predicate=op,
lsuffix=suffix_left.lstrip("_"),
rsuffix=suffix_right.lstrip("_"),
)
stats = {
"feature_count": len(result_gdf),
"left_count": len(left_gdf),
"right_count": len(right_gdf),
"join_method": how,
"spatial_predicate": op,
}
return StepResult(output=result_gdf, stats=stats)
18.4 自动发现机制
GeoPipeAgent 使用 pkgutil.walk_packages 在包初始化时自动发现并加载内置步骤:
# src/geopipe_agent/__init__.py
import pkgutil
import importlib
import geopipe_agent.steps as steps_pkg
for finder, name, ispkg in pkgutil.walk_packages(
steps_pkg.__path__, prefix=steps_pkg.__name__ + "."
):
if not name.endswith("._") and not any(n.startswith("_") for n in name.split(".")):
importlib.import_module(name)
import geopipe_agent 时,所有 steps/ 子目录下的模块都被自动导入,@step 装饰器在模块导入时即注册步骤。
如何让自定义步骤也被自动发现:
方案一:直接 import(最简单)
在使用自定义步骤的 Python 脚本中,在 import geopipe_agent 之后导入自定义步骤模块:
import geopipe_agent # 加载内置步骤
# 加载自定义步骤(触发 @step 注册)
import my_steps.spatial_join
import my_steps.polygon_merge
from geopipe_agent.engine.parser import parse_yaml
from geopipe_agent.engine.executor import execute_pipeline
pipeline = parse_yaml("pipeline.yaml")
report = execute_pipeline(pipeline)
方案二:创建插件包(可分发)
创建一个 Python 包,在其 __init__.py 中导入所有自定义步骤,通过 pip install 安装后自动注册:
my-geopipe-steps/
├── pyproject.toml
└── src/
└── my_geopipe_steps/
├── __init__.py # 导入所有步骤模块
├── spatial_join.py
└── polygon_merge.py
__init__.py:
# 导入所有步骤(触发 @step 注册)
from . import spatial_join
from . import polygon_merge
安装后,在 Python 脚本开头导入:
import geopipe_agent
import my_geopipe_steps # 注册自定义步骤
方案三:修改内置步骤(不推荐)
直接将自定义步骤放入 src/geopipe_agent/steps/custom/ 目录,自动发现机制会加载它们。仅适合本地修改,不适合分发。
18.5 开发规范与最佳实践
5.1 错误处理
步骤内的异常会被 executor 捕获并包装为 StepExecutionError,不需要在步骤中手动包装。但可以提供清晰的错误消息:
def my_step(ctx: StepContext) -> StepResult:
gdf = ctx.input("input")
field = ctx.param("field")
if field not in gdf.columns:
raise ValueError(
f"Field '{field}' not found in input data. "
f"Available fields: {list(gdf.columns)}"
)
# ... 处理逻辑
5.2 延迟导入重依赖
将可选依赖放在函数内部导入,避免在步骤注册时(模块 import 时)就引入依赖报错:
@step(id="custom.ml_classify", ...)
def ml_classify(ctx: StepContext) -> StepResult:
# 延迟导入,只在步骤实际执行时才尝试导入
try:
from sklearn.ensemble import RandomForestClassifier
except ImportError:
raise ImportError(
"scikit-learn is required for 'custom.ml_classify'. "
"Install it with: pip install scikit-learn"
)
# ... 使用 sklearn
5.3 stats 字段命名
stats 字典的键会通过 $step.key 语法暴露给后续步骤,使用清晰的名称:
return StepResult(
output=result_gdf,
stats={
"feature_count": len(result_gdf), # 标准命名,与内置步骤一致
"join_count": len(joined_gdf), # 自定义统计
"skip_count": len(skipped), # 跳过数量
}
)
5.4 QC 步骤的特殊模式
自定义 QC 步骤应遵循”检查并透传”模式:
from geopipe_agent.models.qc import QcIssue
from geopipe_agent.steps.qc._helpers import build_issues_gdf
@step(id="custom.qc_area_check", ...)
def custom_area_check(ctx: StepContext) -> StepResult:
gdf = ctx.input("input")
min_area = ctx.param("min_area", 1.0)
issues = []
for idx, row in gdf.iterrows():
area = row.geometry.area
if area < min_area:
issues.append(QcIssue(
rule_id="area_check",
severity="warning",
feature_index=idx,
message=f"Feature {idx}: area {area:.4f} < min {min_area}",
details={"area": area, "min_area": min_area},
))
return StepResult(
output=gdf, # 透传输入数据
stats={
"issues_count": len(issues),
"valid_count": len(gdf) - len(issues),
},
metadata={"issues_gdf": build_issues_gdf(gdf, issues)},
issues=issues,
)
18.6 在 YAML 中使用自定义步骤
自定义步骤注册后,可直接在 YAML 中使用:
pipeline:
name: "使用自定义步骤的流水线"
steps:
- id: load-poi
use: io.read_vector
params: { path: "data/poi.shp" }
- id: load-districts
use: io.read_vector
params: { path: "data/districts.shp" }
# 使用自定义空间连接步骤
- id: join-district-attrs
use: custom.spatial_join
params:
left: "$load-poi"
right: "$load-districts"
how: "left"
op: "within"
- id: save-result
use: io.write_vector
params:
input: "$join-district-attrs"
path: "output/poi_with_district.geojson"
18.7 本章小结
本章介绍了自定义步骤的开发与集成:
@step装饰器:注册步骤,定义参数规范、输出、后端和示例StepContext:ctx.param()、ctx.input()、ctx.backend获取参数和后端StepResult:output、stats、metadata、issues四个字段- 自动发现:
import geopipe_agent自动加载内置步骤,自定义步骤需手动 import 或打包 - 最佳实践:清晰错误消息、延迟导入依赖、QC 步骤透传数据