znlgis 博客

GIS开发与技术分享

第十八章:自定义步骤与扩展开发

GeoPipeAgent 的步骤系统完全开放,通过 @step 装饰器可以轻松注册自定义步骤,扩展框架的 GIS 分析能力。本章介绍自定义步骤的开发、注册和使用方法。


18.1 步骤注册系统

@step 装饰器

所有步骤(内置和自定义)都通过 @step 装饰器注册到全局注册表(registry._steps):

from geopipe_agent.steps.registry import step
from geopipe_agent.engine.context import StepContext
from geopipe_agent.models.result import StepResult

@step(
    id="my_category.my_action",    # 步骤注册 ID(必填)
    name="我的自定义步骤",           # 显示名称
    description="步骤功能描述",      # 步骤描述
    category="my_category",         # 类别(影响 list-steps 分组)
    params={                         # 参数规范字典
        "input": {
            "type": "geodataframe",
            "required": True,
            "description": "输入矢量数据",
        },
        "my_param": {
            "type": "number",
            "required": False,
            "default": 100,
            "description": "自定义参数",
        },
    },
    outputs={                        # 输出规范
        "output": {"type": "geodataframe", "description": "处理结果"},
    },
    backends=["native_python"],      # 支持的后端(可省略)
    examples=[                       # 示例(用于 Skill 文档)
        {
            "description": "基础用法",
            "params": {"input": "$load.output", "my_param": 200},
        },
    ],
)
def my_custom_step(ctx: StepContext) -> StepResult:
    """步骤实现函数,接收 StepContext,返回 StepResult"""
    gdf = ctx.input("input")          # 获取 input 参数(GeoDataFrame)
    my_param = ctx.param("my_param", 100)  # 获取可选参数

    # 执行处理逻辑
    result_gdf = gdf.copy()
    result_gdf["my_field"] = my_param

    return StepResult(
        output=result_gdf,
        stats={"feature_count": len(result_gdf)},
    )

注册 ID 规范

步骤 ID 使用点号分隔的 类别.动作 格式:

格式 示例 说明
category.action vector.buffer 推荐格式
org.category.action acme.vector.polygon_merge 带组织前缀,避免冲突
custom.action custom.fix_topology 通用自定义步骤

18.2 StepContext 用法

步骤函数通过 StepContext 获取所有已解析(变量和引用已替换)的参数:

def my_step(ctx: StepContext) -> StepResult:
    # 获取参数(带默认值)
    gdf = ctx.param("input")           # 等价于 ctx.input("input")
    distance = ctx.param("distance")    # 必填参数,None 如不提供
    cap = ctx.param("cap_style", "round")  # 带默认值

    # 简写:ctx.input() 等价于 ctx.param("input")
    gdf = ctx.input("input")

    # 访问后端(仅当 backends 列表非空时)
    backend = ctx.backend              # GeoBackend 对象或 None

    # 访问完整参数字典
    all_params = ctx.params

18.3 完整示例:开发空间连接步骤

以下是一个完整的自定义步骤实现:空间连接(Spatial Join)。

# my_steps/spatial_join.py

from __future__ import annotations

from geopipe_agent.steps.registry import step
from geopipe_agent.engine.context import StepContext
from geopipe_agent.models.result import StepResult


@step(
    id="custom.spatial_join",
    name="空间连接",
    description="将右表的属性通过空间关系连接到左表",
    category="custom",
    params={
        "left": {
            "type": "geodataframe",
            "required": True,
            "description": "左表(要素保留数量与左表一致)",
        },
        "right": {
            "type": "geodataframe",
            "required": True,
            "description": "右表(属性来源)",
        },
        "how": {
            "type": "string",
            "required": False,
            "default": "left",
            "enum": ["left", "right", "inner"],
            "description": "连接方式",
        },
        "op": {
            "type": "string",
            "required": False,
            "default": "intersects",
            "enum": ["intersects", "contains", "within"],
            "description": "空间关系谓词",
        },
        "suffix_left": {
            "type": "string",
            "required": False,
            "default": "_left",
            "description": "左表重名字段后缀",
        },
        "suffix_right": {
            "type": "string",
            "required": False,
            "default": "_right",
            "description": "右表重名字段后缀",
        },
    },
    outputs={
        "output": {"type": "geodataframe", "description": "空间连接结果"},
    },
    examples=[
        {
            "description": "点-面空间连接(将行政区属性连接到 POI 点)",
            "params": {
                "left": "$load-poi.output",
                "right": "$load-districts.output",
                "how": "left",
                "op": "within",
            },
        },
    ],
)
def custom_spatial_join(ctx: StepContext) -> StepResult:
    import geopandas as gpd

    left_gdf = ctx.param("left")
    right_gdf = ctx.param("right")
    how = ctx.param("how", "left")
    op = ctx.param("op", "intersects")
    suffix_left = ctx.param("suffix_left", "_left")
    suffix_right = ctx.param("suffix_right", "_right")

    # 执行空间连接
    result_gdf = gpd.sjoin(
        left_gdf,
        right_gdf,
        how=how,
        predicate=op,
        lsuffix=suffix_left.lstrip("_"),
        rsuffix=suffix_right.lstrip("_"),
    )

    stats = {
        "feature_count": len(result_gdf),
        "left_count": len(left_gdf),
        "right_count": len(right_gdf),
        "join_method": how,
        "spatial_predicate": op,
    }

    return StepResult(output=result_gdf, stats=stats)

18.4 自动发现机制

GeoPipeAgent 使用 pkgutil.walk_packages 在包初始化时自动发现并加载内置步骤:

# src/geopipe_agent/__init__.py
import pkgutil
import importlib
import geopipe_agent.steps as steps_pkg

for finder, name, ispkg in pkgutil.walk_packages(
    steps_pkg.__path__, prefix=steps_pkg.__name__ + "."
):
    if not name.endswith("._") and not any(n.startswith("_") for n in name.split(".")):
        importlib.import_module(name)

import geopipe_agent 时,所有 steps/ 子目录下的模块都被自动导入,@step 装饰器在模块导入时即注册步骤。

如何让自定义步骤也被自动发现

方案一:直接 import(最简单)

在使用自定义步骤的 Python 脚本中,在 import geopipe_agent 之后导入自定义步骤模块:

import geopipe_agent  # 加载内置步骤

# 加载自定义步骤(触发 @step 注册)
import my_steps.spatial_join
import my_steps.polygon_merge

from geopipe_agent.engine.parser import parse_yaml
from geopipe_agent.engine.executor import execute_pipeline

pipeline = parse_yaml("pipeline.yaml")
report = execute_pipeline(pipeline)

方案二:创建插件包(可分发)

创建一个 Python 包,在其 __init__.py 中导入所有自定义步骤,通过 pip install 安装后自动注册:

my-geopipe-steps/
├── pyproject.toml
└── src/
    └── my_geopipe_steps/
        ├── __init__.py          # 导入所有步骤模块
        ├── spatial_join.py
        └── polygon_merge.py

__init__.py

# 导入所有步骤(触发 @step 注册)
from . import spatial_join
from . import polygon_merge

安装后,在 Python 脚本开头导入:

import geopipe_agent
import my_geopipe_steps  # 注册自定义步骤

方案三:修改内置步骤(不推荐)

直接将自定义步骤放入 src/geopipe_agent/steps/custom/ 目录,自动发现机制会加载它们。仅适合本地修改,不适合分发。


18.5 开发规范与最佳实践

5.1 错误处理

步骤内的异常会被 executor 捕获并包装为 StepExecutionError,不需要在步骤中手动包装。但可以提供清晰的错误消息:

def my_step(ctx: StepContext) -> StepResult:
    gdf = ctx.input("input")
    field = ctx.param("field")

    if field not in gdf.columns:
        raise ValueError(
            f"Field '{field}' not found in input data. "
            f"Available fields: {list(gdf.columns)}"
        )

    # ... 处理逻辑

5.2 延迟导入重依赖

将可选依赖放在函数内部导入,避免在步骤注册时(模块 import 时)就引入依赖报错:

@step(id="custom.ml_classify", ...)
def ml_classify(ctx: StepContext) -> StepResult:
    # 延迟导入,只在步骤实际执行时才尝试导入
    try:
        from sklearn.ensemble import RandomForestClassifier
    except ImportError:
        raise ImportError(
            "scikit-learn is required for 'custom.ml_classify'. "
            "Install it with: pip install scikit-learn"
        )

    # ... 使用 sklearn

5.3 stats 字段命名

stats 字典的键会通过 $step.key 语法暴露给后续步骤,使用清晰的名称:

return StepResult(
    output=result_gdf,
    stats={
        "feature_count": len(result_gdf),   # 标准命名,与内置步骤一致
        "join_count": len(joined_gdf),       # 自定义统计
        "skip_count": len(skipped),          # 跳过数量
    }
)

5.4 QC 步骤的特殊模式

自定义 QC 步骤应遵循”检查并透传”模式:

from geopipe_agent.models.qc import QcIssue
from geopipe_agent.steps.qc._helpers import build_issues_gdf

@step(id="custom.qc_area_check", ...)
def custom_area_check(ctx: StepContext) -> StepResult:
    gdf = ctx.input("input")
    min_area = ctx.param("min_area", 1.0)

    issues = []
    for idx, row in gdf.iterrows():
        area = row.geometry.area
        if area < min_area:
            issues.append(QcIssue(
                rule_id="area_check",
                severity="warning",
                feature_index=idx,
                message=f"Feature {idx}: area {area:.4f} < min {min_area}",
                details={"area": area, "min_area": min_area},
            ))

    return StepResult(
        output=gdf,                          # 透传输入数据
        stats={
            "issues_count": len(issues),
            "valid_count": len(gdf) - len(issues),
        },
        metadata={"issues_gdf": build_issues_gdf(gdf, issues)},
        issues=issues,
    )

18.6 在 YAML 中使用自定义步骤

自定义步骤注册后,可直接在 YAML 中使用:

pipeline:
  name: "使用自定义步骤的流水线"

  steps:
    - id: load-poi
      use: io.read_vector
      params: { path: "data/poi.shp" }

    - id: load-districts
      use: io.read_vector
      params: { path: "data/districts.shp" }

    # 使用自定义空间连接步骤
    - id: join-district-attrs
      use: custom.spatial_join
      params:
        left: "$load-poi"
        right: "$load-districts"
        how: "left"
        op: "within"

    - id: save-result
      use: io.write_vector
      params:
        input: "$join-district-attrs"
        path: "output/poi_with_district.geojson"

18.7 本章小结

本章介绍了自定义步骤的开发与集成:

  1. @step 装饰器:注册步骤,定义参数规范、输出、后端和示例
  2. StepContextctx.param()ctx.input()ctx.backend 获取参数和后端
  3. StepResultoutputstatsmetadataissues 四个字段
  4. 自动发现import geopipe_agent 自动加载内置步骤,自定义步骤需手动 import 或打包
  5. 最佳实践:清晰错误消息、延迟导入依赖、QC 步骤透传数据

导航← 第十七章:Skill 文件与 AI 集成第十九章:Cookbook 示例精讲 →