znlgis 博客

GIS开发与技术分享

第十九章:自定义步骤与扩展开发

19.1 概述

GeoPipeAgent 的步骤体系是完全可扩展的。通过 @step 装饰器,任何 Python 函数都可以成为框架中的一个可用步骤,享受变量引用解析、条件执行、重试、后端选择等所有框架特性。

扩展方式有三种:

  1. 添加自定义步骤:实现特定 GIS 分析逻辑的步骤函数
  2. 扩展现有步骤:为现有步骤添加新后端支持
  3. 添加自定义后端:实现新的 GIS 工具链接入

19.2 创建自定义步骤

19.2.1 最简单的自定义步骤

# src/geopipe_agent/steps/custom/my_steps.py

from geopipe_agent.steps.registry import step, StepContext
from geopipe_agent.models.result import StepResult
from geopipe_agent.models.qc import QcIssue

@step(
    id="custom.centroid",
    name="计算要素中心点",
    category="custom",
    description="将多边形/线要素转换为其质心点",
    params={
        "input": {
            "required": True,
            "type": "geodataframe",
            "description": "输入 GeoDataFrame(Polygon 或 LineString)"
        },
        "geometry_only": {
            "required": False,
            "type": "boolean",
            "description": "是否仅保留几何,丢弃属性(默认 False)",
            "default": False
        }
    },
    backends=["native_python"],
)
def centroid_step(ctx: StepContext) -> StepResult:
    """将要素转换为质心点。"""
    gdf = ctx.input()
    geometry_only = ctx.param("geometry_only", False)
    
    # 核心逻辑:计算质心
    result = gdf.copy()
    result["geometry"] = gdf.centroid
    
    if geometry_only:
        result = result[["geometry"]]
    
    return StepResult(
        output=result,
        stats={
            "feature_count": len(result),
            "crs": str(result.crs),
            "geometry_type": "Point"
        }
    )

将文件放入步骤目录:

src/geopipe_agent/steps/custom/
├── __init__.py
└── my_steps.py

确保 __init__.py 存在(可以为空),框架启动时会自动发现并注册 custom.centroid 步骤。

验证注册

geopipe-agent list-steps --category custom
# 应显示:custom.centroid  计算要素中心点  custom  native_python

19.2.2 带质检功能的步骤

@step(
    id="custom.check_area",
    name="面积范围检查",
    category="custom",
    description="检查面要素的面积是否在指定范围内",
    params={
        "input": {"required": True, "type": "geodataframe", "description": "输入面要素"},
        "min_area": {"required": False, "type": "number", "description": "最小面积(CRS 单位²)"},
        "max_area": {"required": False, "type": "number", "description": "最大面积(CRS 单位²)"},
        "area_field": {"required": False, "type": "string", "description": "计算面积的字段名,默认几何面积"}
    },
    backends=["native_python"],
)
def check_area_step(ctx: StepContext) -> StepResult:
    """检查面要素面积是否在合理范围内。"""
    import geopandas as gpd
    
    gdf = ctx.input()
    min_area = ctx.param("min_area", None)
    max_area = ctx.param("max_area", None)
    area_field = ctx.param("area_field", None)
    
    # 计算面积
    if area_field and area_field in gdf.columns:
        areas = gdf[area_field]
    else:
        areas = gdf.geometry.area
    
    issues = []
    
    for idx, area in areas.items():
        if min_area is not None and area < min_area:
            issues.append(QcIssue(
                type="area_too_small",
                message=f"要素 {idx} 面积 {area:.2f} 小于最小值 {min_area}",
                fid=idx,
                severity="error"
            ))
        if max_area is not None and area > max_area:
            issues.append(QcIssue(
                type="area_too_large",
                message=f"要素 {idx} 面积 {area:.2f} 大于最大值 {max_area}",
                fid=idx,
                severity="error"
            ))
    
    return StepResult(
        output=gdf,
        stats={
            "checked_count": len(gdf),
            "issue_count": len(issues),
            "min_area": float(areas.min()),
            "max_area": float(areas.max()),
            "mean_area": float(areas.mean()),
        },
        issues=issues
    )

19.2.3 支持多种数据类型的步骤

@step(
    id="custom.add_uuid",
    name="添加 UUID 字段",
    category="custom",
    description="为每个要素添加唯一 UUID 标识字段",
    params={
        "input": {"required": True, "type": "geodataframe", "description": "输入 GeoDataFrame"},
        "field_name": {"required": False, "type": "string", "description": "UUID 字段名,默认 'uuid'"},
        "overwrite": {"required": False, "type": "boolean", "description": "字段已存在时是否覆盖"}
    },
)
def add_uuid_step(ctx: StepContext) -> StepResult:
    """为每个要素添加唯一 UUID 标识。"""
    import uuid
    
    gdf = ctx.input()
    field_name = ctx.param("field_name", "uuid")
    overwrite = ctx.param("overwrite", False)
    
    if field_name in gdf.columns and not overwrite:
        # 字段已存在且不覆盖:直接返回
        return StepResult(
            output=gdf,
            stats={"feature_count": len(gdf), "action": "skipped (field exists)"}
        )
    
    result = gdf.copy()
    result[field_name] = [str(uuid.uuid4()) for _ in range(len(gdf))]
    
    return StepResult(
        output=result,
        stats={"feature_count": len(result), "field_added": field_name}
    )

19.3 @step 装饰器参数完全参考

@step(
    id="category.action_name",     # 必填,格式:category.action
    name="步骤显示名称",             # 必填,UI 和文档中显示的名称
    category="category",           # 必填,步骤类别
    description="步骤描述文字",      # 必填,详细描述
    params={                       # 可选,参数规格字典
        "param_name": {
            "required": True/False,           # 是否必填
            "type": "geodataframe/string/number/boolean/array/dict",  # 参数类型
            "description": "参数描述",         # 参数说明
            "default": default_value,         # 默认值(required=False 时建议设置)
            "enum": ["val1", "val2"],         # 可选:枚举允许值
        }
    },
    backends=["native_python"],    # 可选,支持的后端列表
)
def my_step_function(ctx: StepContext) -> StepResult:
    ...

参数类型标识

类型标识 对应 Python 类型 说明
geodataframe geopandas.GeoDataFrame 矢量数据
raster dict(含 data 和 meta) 栅格数据
string str 字符串
number int / float 数值
boolean bool 布尔值
array list 列表
dict dict 字典

19.4 StepContext API 完全参考

在步骤函数中,通过 ctx 参数访问以下方法和属性:

def my_step(ctx: StepContext) -> StepResult:
    # 获取参数(推荐)
    gdf = ctx.input()                          # 等价于 ctx.param("input")
    distance = ctx.param("distance")           # 获取必填参数
    style = ctx.param("cap_style", "round")   # 获取可选参数(带默认值)
    
    # 获取所有参数字典
    all_params = ctx.params                    # dict,已解析所有引用
    
    # 获取后端(如步骤需要后端支持)
    backend = ctx.backend                      # GeopipeBackend 实例或 None
    
    # 访问全局流水线上下文(高级用法)
    pipeline_ctx = ctx.pipeline_context        # PipelineContext 实例
    variables = pipeline_ctx.variables         # 全局变量字典
    
    return StepResult(...)

19.5 StepResult 构建最佳实践

from geopipe_agent.models.result import StepResult
from geopipe_agent.models.qc import QcIssue

# 基本使用
return StepResult(
    output=processed_gdf,
    stats={
        "feature_count": len(processed_gdf),
        "crs": str(processed_gdf.crs),
        # 可添加任何自定义统计信息
        "min_area": float(processed_gdf.geometry.area.min()),
        "processing_note": "已过滤 5 个无效几何"
    }
)

# 含质检结果
return StepResult(
    output=gdf,
    stats={"checked_count": len(gdf)},
    issues=[
        QcIssue(type="gap", message="要素 42 与要素 43 之间存在缝隙", fid=42),
        QcIssue(type="overlap", message="要素 10 与要素 15 存在重叠", fid=10),
    ]
)

# 纯统计(不修改数据)
return StepResult(
    output=input_gdf,        # 原样返回输入
    stats={
        "feature_count": len(input_gdf),
        "total_length": float(input_gdf.geometry.length.sum()),
        "total_area": float(input_gdf.geometry.area.sum()),
    }
)

19.6 完整自定义步骤示例:建筑物面积分级

以下是一个完整的自定义步骤示例,展示了实际 GIS 分析逻辑:

# src/geopipe_agent/steps/custom/building_steps.py

import geopandas as gpd
import pandas as pd
from geopipe_agent.steps.registry import step, StepContext
from geopipe_agent.models.result import StepResult

@step(
    id="custom.building_classify",
    name="建筑物面积分级",
    category="custom",
    description="根据建筑物占地面积对建筑物进行分级分类",
    params={
        "input": {
            "required": True,
            "type": "geodataframe",
            "description": "输入建筑物多边形 GeoDataFrame"
        },
        "breaks": {
            "required": False,
            "type": "array",
            "description": "面积分级断点(平方米),如 [100, 500, 2000, 10000]",
            "default": [100, 500, 2000, 10000]
        },
        "labels": {
            "required": False,
            "type": "array",
            "description": "各级别标签,数量应为 breaks 长度加 1",
            "default": ["微型", "小型", "中型", "大型", "超大型"]
        },
        "output_field": {
            "required": False,
            "type": "string",
            "description": "分级结果字段名",
            "default": "size_class"
        }
    },
    backends=["native_python"],
)
def building_classify_step(ctx: StepContext) -> StepResult:
    """按建筑物占地面积进行分级。"""
    gdf = ctx.input()
    breaks = ctx.param("breaks", [100, 500, 2000, 10000])
    labels = ctx.param("labels", ["微型", "小型", "中型", "大型", "超大型"])
    output_field = ctx.param("output_field", "size_class")
    
    # 参数验证
    if len(labels) != len(breaks) + 1:
        raise ValueError(
            f"labels 数量 ({len(labels)}) 应为 breaks 数量 ({len(breaks)}) 加 1"
        )
    
    # 计算面积(使用当前 CRS 的面积单位)
    result = gdf.copy()
    areas = gdf.geometry.area
    
    # 使用 pd.cut 进行分级
    bins = [float('-inf')] + list(breaks) + [float('inf')]
    result[output_field] = pd.cut(
        areas,
        bins=bins,
        labels=labels,
        right=True
    ).astype(str)
    
    # 统计各级别数量
    class_counts = result[output_field].value_counts().to_dict()
    
    return StepResult(
        output=result,
        stats={
            "feature_count": len(result),
            "class_distribution": class_counts,
            "min_area": float(areas.min()),
            "max_area": float(areas.max()),
            "mean_area": float(areas.mean()),
            "output_field": output_field
        }
    )

在 YAML 中使用自定义步骤

name: 建筑物面积分析
steps:
  - id: load
    use: io.read_vector
    params:
      path: "data/buildings.shp"

  - id: reproject
    use: vector.reproject
    params:
      input: $load
      target_crs: "EPSG:3857"

  - id: classify
    use: custom.building_classify    # 使用自定义步骤
    params:
      input: $reproject
      breaks: [50, 200, 1000, 5000]
      labels: ["小房", "普通", "大楼", "商业楼", "超高层"]
      output_field: "building_size"

  - id: save
    use: io.write_vector
    params:
      input: $classify
      path: "output/buildings_classified.gpkg"

19.7 自定义后端

如果需要接入新的 GIS 工具(如 PostGIS、ArcGIS Pro Python API 等),可以创建自定义后端:

# src/geopipe_agent/backends/postgis_backend.py

from geopipe_agent.backends.base import GeopipeBackend

class PostgisBackend(GeopipeBackend):
    def name(self) -> str:
        return "postgis"
    
    def is_available(self) -> bool:
        try:
            import psycopg2
            return True
        except ImportError:
            return False
    
    def execute_sql(self, connection_string: str, sql: str):
        """执行 PostGIS SQL 查询。"""
        import psycopg2
        import geopandas as gpd
        from sqlalchemy import create_engine
        
        engine = create_engine(connection_string)
        return gpd.read_postgis(sql, engine)

注册自定义后端:

# 在 backends/__init__.py 中添加
from geopipe_agent.backends.postgis_backend import PostgisBackend
_BACKEND_CLASSES.append(PostgisBackend)

19.8 开发规范

步骤函数命名

  • 文件名:{category}_steps.py(如 custom_steps.py
  • 函数名:{action}_step(如 centroid_step
  • 步骤 ID:{category}.{action}(如 custom.centroid

错误处理

步骤函数中应不捕获异常,让框架统一处理:

# ✅ 正确:让异常向上传播,框架根据 on_error 策略处理
def my_step(ctx: StepContext) -> StepResult:
    gdf = ctx.input()
    result = gdf.buffer(ctx.param("distance"))  # 出错会自动传播
    return StepResult(output=result)

# ❌ 错误:过度捕获异常,导致框架无法正确处理
def my_step(ctx: StepContext) -> StepResult:
    try:
        gdf = ctx.input()
        result = gdf.buffer(ctx.param("distance"))
        return StepResult(output=result)
    except Exception as e:
        print(f"Error: {e}")  # 吃掉异常,框架看不到错误
        return StepResult()   # 返回空结果,后续步骤出错更难排查

参数默认值

通过 ctx.param("name", default_value) 提供默认值,而不是在 @step 装饰器的 params 中定义逻辑:

# ✅ 推荐
def my_step(ctx: StepContext) -> StepResult:
    cap_style = ctx.param("cap_style", "round")  # 代码级默认值

输出统计

始终在 stats 中提供 feature_count(矢量步骤),便于报告解析和 when 表达式引用:

return StepResult(
    output=result,
    stats={
        "feature_count": len(result),    # 必须包含
        "crs": str(result.crs),          # 推荐包含
        # 其他自定义统计...
    }
)

19.9 运行测试

对自定义步骤编写单元测试:

# tests/test_custom_steps.py
import pytest
import geopandas as gpd
from shapely.geometry import Polygon

from geopipe_agent.steps.custom.building_steps import building_classify_step
from geopipe_agent.engine.context import StepContext, PipelineContext

def create_test_gdf():
    """创建测试用 GeoDataFrame。"""
    geoms = [Polygon([(0,0), (1,0), (1,1), (0,1)]) for _ in range(5)]
    return gpd.GeoDataFrame({"name": [f"bldg_{i}" for i in range(5)]}, 
                            geometry=geoms, crs="EPSG:3857")

def make_ctx(params: dict) -> StepContext:
    """创建测试用 StepContext。"""
    ctx = StepContext(params=params, pipeline_context=PipelineContext())
    return ctx

def test_building_classify_basic():
    gdf = create_test_gdf()
    ctx = make_ctx({"input": gdf, "breaks": [0.5, 1.5], "labels": ["小", "中", "大"]})
    
    result = building_classify_step(ctx)
    
    assert result.output is not None
    assert "size_class" in result.output.columns
    assert result.stats["feature_count"] == 5

def test_building_classify_invalid_labels():
    gdf = create_test_gdf()
    ctx = make_ctx({"input": gdf, "breaks": [0.5, 1.5], "labels": ["小", "大"]})  # labels 数量错误
    
    with pytest.raises(ValueError, match="labels 数量"):
        building_classify_step(ctx)

运行测试:

pip install -e ".[dev]"
pytest tests/test_custom_steps.py -v

19.10 小结

本章介绍了 GeoPipeAgent 的自定义步骤与扩展开发:

  • @step 装饰器:将普通 Python 函数注册为框架步骤,自动享受所有框架特性
  • 自动发现:步骤文件放到正确目录即自动注册,无需手动修改注册表
  • StepContext:通过 ctx.param()/ctx.input() 获取已解析的参数
  • StepResult:包含 output(数据)+ stats(统计)+ issues(质检问题)
  • 最佳实践:不捕获异常(让框架处理)、提供 feature_count(便于引用)
  • 测试:通过 StepContext 构造函数直接单元测试步骤函数

下一章将介绍 Docker 部署、运行测试和最佳实践。