第19章:自定义步骤开发
本章详细指导如何开发自定义步骤来扩展 GeoPipeAgent 的分析能力,包括 @step 装饰器、参数和输出定义规范、矢量/栅格/QC 步骤的完整开发示例、自动发现机制以及测试和最佳实践。
19.1 自定义步骤概述
19.1.1 为什么需要自定义步骤
GeoPipeAgent 内置了 33 个步骤,覆盖了大部分常见的 GIS 分析需求。但在实际项目中,可能需要:
- 特定业务逻辑:如城市规划中的容积率计算、交通分析中的可达性评估
- 专用质检规则:如特定行业标准的数据质检项
- 第三方库集成:如集成 scikit-learn 的空间聚类、NetworkX 的网络分析
- 自定义数据源:如读取特定格式的遥感数据或行业数据库
19.1.2 扩展方式
GeoPipeAgent 采用 装饰器 + 自动发现 的扩展方式:
- 使用
@step装饰器定义步骤元数据 - 将步骤文件放在指定目录
- 框架通过
pkgutil自动发现并注册
这种方式的优势是 零配置——开发者只需编写步骤代码并放到正确的位置,无需修改任何注册文件。
19.1.3 开发流程概览
1. 确定步骤的类别和功能
│
2. 创建步骤文件
│ src/geopipe_agent/steps/<category>/<step_name>.py
│
3. 使用 @step 装饰器定义元数据
│ id, name, description, category, params, outputs, backends
│
4. 实现步骤函数
│ 接收 StepContext → 处理数据 → 返回 StepResult
│
5. 编写测试
│ tests/steps/<category>/test_<step_name>.py
│
6. 验证集成
│ geopipe-agent list-steps # 确认步骤已注册
│ geopipe-agent describe <step_id> # 查看步骤详情
19.2 步骤开发基础
19.2.1 @step 装饰器
@step 装饰器是定义步骤的核心工具,它将一个普通函数注册为 GeoPipeAgent 步骤:
from geopipe_agent import step, StepContext, StepResult
@step(
id="custom.my_step", # 步骤唯一标识
name="My Custom Step", # 显示名称
description="步骤的详细描述", # 描述信息
category="custom", # 类别
params={...}, # 参数定义
outputs={...}, # 输出定义
backends=["native_python"], # 支持的后端
)
def my_custom_step(ctx: StepContext) -> StepResult:
# 步骤实现
...
19.2.2 @step 装饰器参数说明
| 参数 | 类型 | 必需 | 说明 |
|---|---|---|---|
id |
str |
✓ | 步骤唯一标识,格式:category.action |
name |
str |
✓ | 步骤的人类可读名称 |
description |
str |
✓ | 步骤的详细描述 |
category |
str |
✓ | 步骤类别(如 vector, raster, qc, custom) |
params |
dict |
✓ | 参数定义字典 |
outputs |
dict |
✓ | 输出定义字典 |
backends |
list |
✓ | 支持的后端列表 |
19.2.3 StepContext 对象
StepContext 封装了步骤执行时的上下文环境:
class StepContext:
"""步骤执行上下文"""
def input(self, name: str):
"""获取输入数据(通过引用 $step_id 传入的数据)"""
...
def param(self, name: str, default=None):
"""获取参数值"""
...
def get_variable(self, name: str):
"""获取流水线变量值"""
...
@property
def logger(self):
"""获取日志记录器"""
...
主要方法:
| 方法 | 说明 | 示例 |
|---|---|---|
ctx.input("input") |
获取名为 input 的输入数据 |
gdf = ctx.input("input") |
ctx.param("distance") |
获取参数值 | dist = ctx.param("distance") |
ctx.param("cap_style", "round") |
获取参数值(带默认值) | 参数未提供时返回默认值 |
ctx.logger |
获取日志器 | ctx.logger.info("处理中...") |
19.2.4 StepResult 对象
StepResult 封装步骤的执行结果:
class StepResult:
"""步骤执行结果"""
def __init__(self, output=None, stats=None):
"""
Args:
output: 主输出数据(GeoDataFrame、栅格数据或文件路径等)
stats: 统计信息字典,可在后续步骤的 when 条件中引用
"""
self.output = output
self.stats = stats or {}
| 属性 | 类型 | 说明 |
|---|---|---|
output |
Any |
主输出数据,可被后续步骤通过 $step_id 引用 |
stats |
dict |
统计信息,可通过 $step_id.stats.key 在 when 条件中使用 |
19.3 参数定义规范
19.3.1 params 字典结构
每个参数在 params 字典中定义为一个子字典:
params={
"input": {
"type": "layer",
"required": True,
"description": "输入矢量图层",
},
"distance": {
"type": "number",
"required": True,
"description": "缓冲距离(地图单位)",
},
"resolution": {
"type": "integer",
"required": False,
"default": 16,
"description": "缓冲区圆弧分辨率",
},
"cap_style": {
"type": "string",
"required": False,
"default": "round",
"description": "端点样式",
"enum": ["round", "flat", "square"],
},
}
19.3.2 参数属性说明
| 属性 | 类型 | 必需 | 说明 |
|---|---|---|---|
type |
str |
✓ | 参数类型 |
required |
bool |
✓ | 是否必需 |
description |
str |
✓ | 参数描述 |
default |
Any |
可选参数必需 | 默认值 |
enum |
list |
— | 允许的值列表 |
19.3.3 支持的参数类型
| 类型 | 说明 | Python 类型 | 示例值 |
|---|---|---|---|
layer |
矢量或栅格图层(通过 $引用传入) | GeoDataFrame / DatasetReader |
$load_data |
string |
字符串 | str |
"EPSG:4326" |
number |
数值(整数或浮点) | int / float |
500, 0.95 |
integer |
整数 | int |
16 |
boolean |
布尔值 | bool |
true, false |
array |
数组/列表 | list |
[116.0, 39.0, 117.0, 40.0] |
object |
字典/对象 | dict |
{"field1": "name"} |
19.3.4 enum 枚举约束
enum 属性限制参数只能取预定义的值:
"join_style": {
"type": "string",
"required": False,
"default": "round",
"description": "连接样式",
"enum": ["round", "mitre", "bevel"],
}
在校验阶段,框架会自动检查参数值是否在 enum 列表中。
19.4 输出定义规范
19.4.1 outputs 字典结构
outputs={
"output": {
"type": "layer",
"description": "处理后的矢量图层",
},
}
19.4.2 输出属性说明
| 属性 | 类型 | 必需 | 说明 |
|---|---|---|---|
type |
str |
✓ | 输出类型(layer, string, object 等) |
description |
str |
✓ | 输出描述 |
19.4.3 多输出步骤
大多数步骤只有一个 output,但某些步骤可以有多个输出:
outputs={
"output": {
"type": "layer",
"description": "主输出图层",
},
"report": {
"type": "object",
"description": "质检报告",
},
}
19.5 开发一个矢量步骤:面积计算
19.5.1 需求描述
开发一个步骤,为每个多边形要素计算面积并添加为新列:
- 输入:多边形矢量图层
- 参数:面积单位(平方米、平方千米、公顷)、输出列名
- 输出:添加了面积列的矢量图层
19.5.2 完整实现
# src/geopipe_agent/steps/vector/calculate_area.py
from geopipe_agent import step, StepContext, StepResult
@step(
id="vector.calculate_area",
name="Calculate Area",
description="计算多边形要素的面积并添加为新列",
category="vector",
params={
"input": {
"type": "layer",
"required": True,
"description": "输入多边形矢量图层",
},
"unit": {
"type": "string",
"required": False,
"default": "sqm",
"description": "面积单位",
"enum": ["sqm", "sqkm", "hectare"],
},
"column_name": {
"type": "string",
"required": False,
"default": "area",
"description": "面积列的列名",
},
},
outputs={
"output": {
"type": "layer",
"description": "添加了面积列的矢量图层",
},
},
backends=["native_python"],
)
def calculate_area(ctx: StepContext) -> StepResult:
"""计算多边形面积"""
# 获取输入和参数
gdf = ctx.input("input")
unit = ctx.param("unit", "sqm")
column_name = ctx.param("column_name", "area")
ctx.logger.info(f"计算面积,单位: {unit},输出列: {column_name}")
# 复制以避免修改原始数据
result = gdf.copy()
# 计算面积(以地图单位的平方为基础)
area = result.geometry.area
# 单位转换
UNIT_FACTORS = {
"sqm": 1.0, # 平方米
"sqkm": 1e-6, # 平方千米
"hectare": 1e-4, # 公顷
}
factor = UNIT_FACTORS.get(unit, 1.0)
result[column_name] = area * factor
# 统计信息
stats = {
"feature_count": len(result),
"min_area": float(result[column_name].min()),
"max_area": float(result[column_name].max()),
"mean_area": float(result[column_name].mean()),
"total_area": float(result[column_name].sum()),
"unit": unit,
}
ctx.logger.info(
f"面积计算完成: {stats['feature_count']} 个要素, "
f"总面积: {stats['total_area']:.2f} {unit}"
)
return StepResult(output=result, stats=stats)
19.5.3 在流水线中使用
name: 面积计算示例
steps:
- id: load_parcels
use: io.read_vector
params:
file: data/parcels.gpkg
- id: reproject
use: vector.reproject
params:
input: $load_parcels
target_crs: "EPSG:3857" # 投影坐标系,面积计算更准确
- id: calc_area
use: vector.calculate_area
params:
input: $reproject
unit: hectare
column_name: area_ha
- id: save
use: io.write_vector
params:
input: $calc_area
file: output/parcels_with_area.gpkg
19.6 开发一个栅格步骤:栅格重采样
19.6.1 需求描述
开发一个步骤,对栅格数据进行空间分辨率重采样:
- 输入:栅格数据
- 参数:目标分辨率、重采样方法
- 输出:重采样后的栅格数据
19.6.2 完整实现
# src/geopipe_agent/steps/raster/resample.py
import numpy as np
from rasterio.enums import Resampling
from rasterio.transform import from_bounds
from geopipe_agent import step, StepContext, StepResult
@step(
id="raster.resample",
name="Raster Resample",
description="对栅格数据进行空间分辨率重采样",
category="raster",
params={
"input": {
"type": "layer",
"required": True,
"description": "输入栅格数据",
},
"resolution": {
"type": "number",
"required": True,
"description": "目标分辨率(地图单位)",
},
"method": {
"type": "string",
"required": False,
"default": "bilinear",
"description": "重采样方法",
"enum": ["nearest", "bilinear", "cubic", "average", "mode"],
},
},
outputs={
"output": {
"type": "layer",
"description": "重采样后的栅格数据",
},
},
backends=["native_python"],
)
def resample(ctx: StepContext) -> StepResult:
"""栅格重采样"""
dataset = ctx.input("input")
target_res = ctx.param("resolution")
method_name = ctx.param("method", "bilinear")
ctx.logger.info(
f"栅格重采样: {dataset.res} -> ({target_res}, {target_res}), "
f"方法: {method_name}"
)
# 选择重采样方法
METHODS = {
"nearest": Resampling.nearest,
"bilinear": Resampling.bilinear,
"cubic": Resampling.cubic,
"average": Resampling.average,
"mode": Resampling.mode,
}
resampling = METHODS.get(method_name, Resampling.bilinear)
# 计算新的尺寸
bounds = dataset.bounds
new_width = int((bounds.right - bounds.left) / target_res)
new_height = int((bounds.top - bounds.bottom) / target_res)
# 计算新的变换矩阵
new_transform = from_bounds(
bounds.left, bounds.bottom, bounds.right, bounds.top,
new_width, new_height
)
# 读取并重采样数据
data = dataset.read(
out_shape=(dataset.count, new_height, new_width),
resampling=resampling,
)
# 构建结果元数据
result_meta = dataset.meta.copy()
result_meta.update({
"width": new_width,
"height": new_height,
"transform": new_transform,
})
stats = {
"original_resolution": list(dataset.res),
"target_resolution": [target_res, target_res],
"original_size": [dataset.width, dataset.height],
"new_size": [new_width, new_height],
"method": method_name,
"bands": dataset.count,
}
ctx.logger.info(
f"重采样完成: {dataset.width}x{dataset.height} -> "
f"{new_width}x{new_height}"
)
return StepResult(
output={"data": data, "meta": result_meta},
stats=stats,
)
19.6.3 在流水线中使用
name: 栅格重采样示例
steps:
- id: load_dem
use: io.read_raster
params:
file: data/dem_10m.tif
- id: resample
use: raster.resample
params:
input: $load_dem
resolution: 30
method: bilinear
- id: save
use: io.write_raster
params:
input: $resample
file: output/dem_30m.tif
19.7 开发一个 QC 步骤:属性编码规范检查
19.7.1 需求描述
开发一个质检步骤,检查矢量数据中特定列的值是否符合预定义的编码规范(如行政区划代码、土地分类代码等)。
19.7.2 QC 步骤的特殊模式
QC 步骤与普通步骤不同,它使用 QcIssue 数据类来记录质检问题,并通过 _helpers 模块提供的辅助函数简化开发:
from dataclasses import dataclass
@dataclass
class QcIssue:
"""质检问题记录"""
feature_id: int # 要素 ID
issue_type: str # 问题类型
severity: str # 严重程度: error, warning, info
message: str # 问题描述
field: str = None # 相关字段名
value: str = None # 问题值
19.7.3 完整实现
# src/geopipe_agent/steps/qc/code_standard.py
from geopipe_agent import step, StepContext, StepResult
@step(
id="qc.code_standard",
name="Code Standard Check",
description="检查属性值是否符合预定义的编码规范",
category="qc",
params={
"input": {
"type": "layer",
"required": True,
"description": "输入矢量图层",
},
"column": {
"type": "string",
"required": True,
"description": "要检查的列名",
},
"valid_codes": {
"type": "array",
"required": True,
"description": "合法的编码值列表",
},
"severity": {
"type": "string",
"required": False,
"default": "error",
"description": "问题严重程度",
"enum": ["error", "warning", "info"],
},
},
outputs={
"output": {
"type": "layer",
"description": "原始输入图层(未修改)",
},
},
backends=["native_python"],
)
def code_standard_check(ctx: StepContext) -> StepResult:
"""编码规范检查"""
gdf = ctx.input("input")
column = ctx.param("column")
valid_codes = ctx.param("valid_codes")
severity = ctx.param("severity", "error")
ctx.logger.info(f"编码规范检查: 列={column}, 合法编码数={len(valid_codes)}")
# 检查列是否存在
if column not in gdf.columns:
raise ValueError(f"列 '{column}' 不存在于数据中,可用列: {list(gdf.columns)}")
# 找出不合法的编码
valid_set = set(valid_codes)
issues = []
invalid_count = 0
null_count = 0
for idx, row in gdf.iterrows():
value = row[column]
# 检查空值
if value is None or (isinstance(value, float) and str(value) == "nan"):
null_count += 1
issues.append({
"feature_id": int(idx),
"issue_type": "null_code",
"severity": severity,
"message": f"编码为空值",
"field": column,
"value": None,
})
continue
# 检查值是否合法
if str(value) not in valid_set:
invalid_count += 1
issues.append({
"feature_id": int(idx),
"issue_type": "invalid_code",
"severity": severity,
"message": f"编码 '{value}' 不在合法范围内",
"field": column,
"value": str(value),
})
total = len(gdf)
pass_count = total - invalid_count - null_count
pass_rate = pass_count / total if total > 0 else 1.0
stats = {
"total_features": total,
"pass_count": pass_count,
"invalid_count": invalid_count,
"null_count": null_count,
"pass_rate": round(pass_rate, 4),
"issues": issues,
"column": column,
}
ctx.logger.info(
f"检查完成: 通过率 {pass_rate:.1%} "
f"({pass_count}/{total}), "
f"无效编码: {invalid_count}, 空值: {null_count}"
)
return StepResult(output=gdf, stats=stats)
19.7.4 在流水线中使用
name: 土地分类编码质检
vars:
input_file: data/land_use.gpkg
steps:
- id: load
use: io.read_vector
params:
file: ${var.input_file}
- id: check_code
use: qc.code_standard
params:
input: $load
column: land_type
valid_codes: ["011", "012", "021", "022", "031", "032", "041", "042"]
severity: error
- id: save_if_passed
use: io.write_vector
when: "$check_code.stats.pass_rate >= 0.95"
params:
input: $load
file: output/land_use_checked.gpkg
19.8 利用后端委托
19.8.1 run_backend_op 复用
对于需要支持多个后端的步骤,可以使用 run_backend_op 函数将具体操作委托给后端:
from geopipe_agent.backends import run_backend_op
@step(
id="custom.multi_backend_step",
name="Multi Backend Step",
description="支持多后端的步骤",
category="custom",
params={...},
outputs={...},
backends=["native_python", "geopandas", "qgis"],
)
def multi_backend_step(ctx: StepContext) -> StepResult:
gdf = ctx.input("input")
# 委托给后端执行
result = run_backend_op(
operation="buffer",
backend=ctx.backend, # 引擎选择的后端
input_data=gdf,
distance=ctx.param("distance"),
)
return StepResult(output=result)
19.8.2 后端选择逻辑
当步骤支持多个后端时,引擎按以下优先级选择:
- 用户在 YAML 中显式指定的
backend - 步骤
backends列表中第一个可用的后端 - 回退到
native_python
19.8.3 何时使用后端委托
| 场景 | 建议 |
|---|---|
| 简单的数据处理逻辑 | 直接在步骤函数中实现 |
| 需要 GDAL/QGIS 加速 | 使用后端委托 |
| 通用的 GIS 操作(buffer/clip 等) | 使用后端委托复用已有实现 |
| 特定的业务逻辑 | 直接实现,无需委托 |
19.9 步骤文件放置与自动发现
19.9.1 目录结构
自定义步骤文件应放置在 src/geopipe_agent/steps/ 目录下,按类别组织:
src/geopipe_agent/steps/
├── __init__.py
├── io/
│ ├── __init__.py
│ ├── read_vector.py
│ ├── write_vector.py
│ ├── read_raster.py
│ └── write_raster.py
├── vector/
│ ├── __init__.py
│ ├── buffer.py
│ ├── reproject.py
│ ├── clip.py
│ ├── overlay.py
│ ├── dissolve.py
│ ├── query.py
│ ├── simplify.py
│ └── calculate_area.py # ← 自定义步骤
├── raster/
│ ├── __init__.py
│ ├── calc.py
│ ├── reproject.py
│ ├── clip.py
│ └── resample.py # ← 自定义步骤
├── spatial/
│ ├── __init__.py
│ └── ...
├── network/
│ ├── __init__.py
│ └── ...
├── qc/
│ ├── __init__.py
│ ├── geometry_validity.py
│ ├── topology.py
│ ├── crs_check.py
│ └── code_standard.py # ← 自定义步骤
└── custom/ # ← 自定义类别
├── __init__.py
└── my_custom_step.py
19.9.2 pkgutil 自动加载机制
GeoPipeAgent 使用 Python 的 pkgutil 模块自动发现和加载步骤:
import pkgutil
import importlib
def discover_steps():
"""自动发现并加载所有步骤模块"""
steps_package = importlib.import_module("geopipe_agent.steps")
for importer, modname, ispkg in pkgutil.walk_packages(
steps_package.__path__,
prefix=steps_package.__name__ + ".",
):
try:
importlib.import_module(modname)
except ImportError as e:
logger.warning(f"无法加载步骤模块 {modname}: {e}")
工作原理:
pkgutil.walk_packages递归遍历steps/目录下的所有 Python 模块importlib.import_module导入每个模块- 模块导入时,
@step装饰器自动将步骤注册到全局注册表
19.9.3 新类别的创建
如果要创建全新的步骤类别(如 custom),需要:
- 创建目录:
src/geopipe_agent/steps/custom/ - 创建
__init__.py(可以为空文件) - 在目录中添加步骤文件
mkdir -p src/geopipe_agent/steps/custom
touch src/geopipe_agent/steps/custom/__init__.py
# 然后创建步骤文件
19.9.4 验证步骤注册
# 查看所有步骤,确认新步骤出现
geopipe-agent list-steps
# 按类别过滤
geopipe-agent list-steps --category custom
# 查看步骤详情
geopipe-agent describe custom.my_step
19.10 测试自定义步骤
19.10.1 测试目录结构
tests/
├── conftest.py # 共享的测试夹具
├── steps/
│ ├── vector/
│ │ ├── test_buffer.py
│ │ └── test_calculate_area.py # 测试自定义步骤
│ ├── raster/
│ │ └── test_resample.py
│ └── qc/
│ └── test_code_standard.py
19.10.2 测试夹具 (conftest.py)
# tests/conftest.py
import pytest
import geopandas as gpd
from shapely.geometry import Point, Polygon, box
@pytest.fixture
def sample_point_gdf():
"""创建示例点数据"""
points = [Point(116.3 + i * 0.01, 39.9 + i * 0.01) for i in range(10)]
return gpd.GeoDataFrame(
{"name": [f"point_{i}" for i in range(10)]},
geometry=points,
crs="EPSG:4326",
)
@pytest.fixture
def sample_polygon_gdf():
"""创建示例多边形数据"""
polygons = [
box(116.0 + i * 0.1, 39.0, 116.1 + i * 0.1, 39.1)
for i in range(5)
]
return gpd.GeoDataFrame(
{"name": [f"poly_{i}" for i in range(5)],
"land_type": ["011", "012", "021", "999", None]},
geometry=polygons,
crs="EPSG:4326",
)
19.10.3 步骤函数测试
# tests/steps/vector/test_calculate_area.py
import pytest
import geopandas as gpd
from shapely.geometry import box
from geopipe_agent.steps.vector.calculate_area import calculate_area
class MockStepContext:
"""模拟 StepContext"""
def __init__(self, input_data, params):
self._input = input_data
self._params = params
self.logger = MockLogger()
def input(self, name):
return self._input
def param(self, name, default=None):
return self._params.get(name, default)
class MockLogger:
def info(self, msg): pass
def debug(self, msg): pass
def warning(self, msg): pass
class TestCalculateArea:
def test_basic_area_calculation(self):
"""测试基本面积计算"""
# 创建已知面积的多边形(投影坐标系)
polygons = [box(0, 0, 100, 100)] # 100x100 = 10000 平方米
gdf = gpd.GeoDataFrame(geometry=polygons, crs="EPSG:3857")
ctx = MockStepContext(
input_data=gdf,
params={"unit": "sqm", "column_name": "area"},
)
result = calculate_area(ctx)
assert "area" in result.output.columns
assert result.stats["feature_count"] == 1
assert result.stats["unit"] == "sqm"
def test_hectare_unit(self):
"""测试公顷单位转换"""
polygons = [box(0, 0, 100, 100)] # 10000 sqm = 1 hectare
gdf = gpd.GeoDataFrame(geometry=polygons, crs="EPSG:3857")
ctx = MockStepContext(
input_data=gdf,
params={"unit": "hectare", "column_name": "area_ha"},
)
result = calculate_area(ctx)
assert "area_ha" in result.output.columns
assert result.stats["unit"] == "hectare"
def test_custom_column_name(self):
"""测试自定义列名"""
polygons = [box(0, 0, 10, 10)]
gdf = gpd.GeoDataFrame(geometry=polygons, crs="EPSG:3857")
ctx = MockStepContext(
input_data=gdf,
params={"unit": "sqm", "column_name": "custom_area"},
)
result = calculate_area(ctx)
assert "custom_area" in result.output.columns
def test_stats_output(self):
"""测试统计信息输出"""
polygons = [box(0, 0, 10, 10), box(0, 0, 20, 20)]
gdf = gpd.GeoDataFrame(geometry=polygons, crs="EPSG:3857")
ctx = MockStepContext(
input_data=gdf,
params={"unit": "sqm", "column_name": "area"},
)
result = calculate_area(ctx)
assert "min_area" in result.stats
assert "max_area" in result.stats
assert "mean_area" in result.stats
assert "total_area" in result.stats
assert result.stats["feature_count"] == 2
19.10.4 运行测试
# 运行所有步骤测试
pytest tests/steps/ -v
# 运行单个步骤测试
pytest tests/steps/vector/test_calculate_area.py -v
# 运行特定测试方法
pytest tests/steps/vector/test_calculate_area.py::TestCalculateArea::test_basic_area_calculation -v
19.11 最佳实践
19.11.1 命名规范
| 元素 | 规范 | 示例 |
|---|---|---|
| Step ID | category.action_name |
vector.calculate_area |
| 文件名 | action_name.py |
calculate_area.py |
| 函数名 | snake_case |
calculate_area |
| 类别名 | 小写单词 | vector, raster, qc, custom |
| 参数名 | snake_case |
column_name, target_crs |
19.11.2 参数设计原则
- 必需参数最小化:只将真正必需的参数设为
required: True - 合理的默认值:为可选参数提供实用的默认值
- 使用 enum 约束:对有限选项的参数使用
enum限制 - 清晰的描述:每个参数都应有明确的
description - 一致的命名:输入图层统一用
input,输出统一用output
19.11.3 错误处理原则
@step(...)
def my_step(ctx: StepContext) -> StepResult:
gdf = ctx.input("input")
# ✓ 在步骤开始时验证输入
if gdf is None or len(gdf) == 0:
raise ValueError("输入数据为空")
column = ctx.param("column")
if column not in gdf.columns:
available = ", ".join(gdf.columns.tolist())
raise ValueError(
f"列 '{column}' 不存在。可用列: {available}"
)
# ✓ 使用日志记录关键信息
ctx.logger.info(f"处理 {len(gdf)} 条要素")
# ✓ 不修改原始数据
result = gdf.copy()
# 处理逻辑...
return StepResult(output=result, stats={...})
19.11.4 统计信息设计
统计信息是步骤间通信的重要机制,尤其在 when 条件中使用。设计统计信息时应注意:
# ✓ 好的统计信息
stats = {
"feature_count": 1024, # 整数类型
"pass_rate": 0.95, # 浮点数 0-1 范围
"total_area": 12345.67, # 数值类型
"has_errors": False, # 布尔类型
"crs": "EPSG:4326", # 字符串类型
}
# ✗ 避免在统计中放入大数据
stats = {
"all_data": gdf.to_dict(), # 不要放完整数据!
}
19.11.5 不修改原始数据
步骤应遵循 不可变输入 原则:
# ✓ 正确:复制后修改
result = gdf.copy()
result["new_col"] = values
# ✗ 错误:直接修改输入
gdf["new_col"] = values # 会影响上游步骤的输出!
19.11.6 步骤开发检查清单
开发自定义步骤前,请确认以下检查项:
- Step ID 遵循
category.action格式 - 所有必需参数已标注
required: True - 可选参数有合理的
default值 description清晰且对 AI 友好- 输入数据被复制而非直接修改
- 错误信息包含上下文(列名、可用值等)
- 统计信息有用且类型正确
- 日志记录了关键操作
- 步骤文件放在正确的目录下
- 已编写单元测试
- 已通过
list-steps确认步骤注册成功
19.12 本章小结
本章全面介绍了如何开发自定义步骤来扩展 GeoPipeAgent 的分析能力,主要内容包括:
- 开发基础:
@step装饰器、StepContext和StepResult的使用方法 - 参数规范:
params字典的 type、required、default、description、enum 定义 - 输出规范:
outputs字典结构 - 矢量步骤示例:面积计算步骤的完整实现
- 栅格步骤示例:栅格重采样步骤的完整实现
- QC 步骤示例:编码规范检查步骤的完整实现
- 后端委托:
run_backend_op的使用场景和方法 - 自动发现:
pkgutil自动加载机制和目录结构规范 - 测试方法:MockStepContext 模式、pytest 测试编写
- 最佳实践:命名规范、参数设计、错误处理、统计信息设计
下一章我们将通过多个完整的实战案例和最佳实践,综合运用前面所学的所有知识。