znlgis 博客

GIS开发与技术分享

第19章:自定义步骤开发

本章详细指导如何开发自定义步骤来扩展 GeoPipeAgent 的分析能力,包括 @step 装饰器、参数和输出定义规范、矢量/栅格/QC 步骤的完整开发示例、自动发现机制以及测试和最佳实践。


19.1 自定义步骤概述

19.1.1 为什么需要自定义步骤

GeoPipeAgent 内置了 33 个步骤,覆盖了大部分常见的 GIS 分析需求。但在实际项目中,可能需要:

  • 特定业务逻辑:如城市规划中的容积率计算、交通分析中的可达性评估
  • 专用质检规则:如特定行业标准的数据质检项
  • 第三方库集成:如集成 scikit-learn 的空间聚类、NetworkX 的网络分析
  • 自定义数据源:如读取特定格式的遥感数据或行业数据库

19.1.2 扩展方式

GeoPipeAgent 采用 装饰器 + 自动发现 的扩展方式:

  1. 使用 @step 装饰器定义步骤元数据
  2. 将步骤文件放在指定目录
  3. 框架通过 pkgutil 自动发现并注册

这种方式的优势是 零配置——开发者只需编写步骤代码并放到正确的位置,无需修改任何注册文件。

19.1.3 开发流程概览

1. 确定步骤的类别和功能
   │
2. 创建步骤文件
   │  src/geopipe_agent/steps/<category>/<step_name>.py
   │
3. 使用 @step 装饰器定义元数据
   │  id, name, description, category, params, outputs, backends
   │
4. 实现步骤函数
   │  接收 StepContext → 处理数据 → 返回 StepResult
   │
5. 编写测试
   │  tests/steps/<category>/test_<step_name>.py
   │
6. 验证集成
   │  geopipe-agent list-steps  # 确认步骤已注册
   │  geopipe-agent describe <step_id>  # 查看步骤详情

19.2 步骤开发基础

19.2.1 @step 装饰器

@step 装饰器是定义步骤的核心工具,它将一个普通函数注册为 GeoPipeAgent 步骤:

from geopipe_agent import step, StepContext, StepResult

@step(
    id="custom.my_step",        # 步骤唯一标识
    name="My Custom Step",      # 显示名称
    description="步骤的详细描述", # 描述信息
    category="custom",          # 类别
    params={...},               # 参数定义
    outputs={...},              # 输出定义
    backends=["native_python"], # 支持的后端
)
def my_custom_step(ctx: StepContext) -> StepResult:
    # 步骤实现
    ...

19.2.2 @step 装饰器参数说明

参数 类型 必需 说明
id str 步骤唯一标识,格式:category.action
name str 步骤的人类可读名称
description str 步骤的详细描述
category str 步骤类别(如 vector, raster, qc, custom)
params dict 参数定义字典
outputs dict 输出定义字典
backends list 支持的后端列表

19.2.3 StepContext 对象

StepContext 封装了步骤执行时的上下文环境:

class StepContext:
    """步骤执行上下文"""

    def input(self, name: str):
        """获取输入数据(通过引用 $step_id 传入的数据)"""
        ...

    def param(self, name: str, default=None):
        """获取参数值"""
        ...

    def get_variable(self, name: str):
        """获取流水线变量值"""
        ...

    @property
    def logger(self):
        """获取日志记录器"""
        ...

主要方法:

方法 说明 示例
ctx.input("input") 获取名为 input 的输入数据 gdf = ctx.input("input")
ctx.param("distance") 获取参数值 dist = ctx.param("distance")
ctx.param("cap_style", "round") 获取参数值(带默认值) 参数未提供时返回默认值
ctx.logger 获取日志器 ctx.logger.info("处理中...")

19.2.4 StepResult 对象

StepResult 封装步骤的执行结果:

class StepResult:
    """步骤执行结果"""

    def __init__(self, output=None, stats=None):
        """
        Args:
            output: 主输出数据(GeoDataFrame、栅格数据或文件路径等)
            stats: 统计信息字典,可在后续步骤的 when 条件中引用
        """
        self.output = output
        self.stats = stats or {}
属性 类型 说明
output Any 主输出数据,可被后续步骤通过 $step_id 引用
stats dict 统计信息,可通过 $step_id.stats.key 在 when 条件中使用

19.3 参数定义规范

19.3.1 params 字典结构

每个参数在 params 字典中定义为一个子字典:

params={
    "input": {
        "type": "layer",
        "required": True,
        "description": "输入矢量图层",
    },
    "distance": {
        "type": "number",
        "required": True,
        "description": "缓冲距离(地图单位)",
    },
    "resolution": {
        "type": "integer",
        "required": False,
        "default": 16,
        "description": "缓冲区圆弧分辨率",
    },
    "cap_style": {
        "type": "string",
        "required": False,
        "default": "round",
        "description": "端点样式",
        "enum": ["round", "flat", "square"],
    },
}

19.3.2 参数属性说明

属性 类型 必需 说明
type str 参数类型
required bool 是否必需
description str 参数描述
default Any 可选参数必需 默认值
enum list 允许的值列表

19.3.3 支持的参数类型

类型 说明 Python 类型 示例值
layer 矢量或栅格图层(通过 $引用传入) GeoDataFrame / DatasetReader $load_data
string 字符串 str "EPSG:4326"
number 数值(整数或浮点) int / float 500, 0.95
integer 整数 int 16
boolean 布尔值 bool true, false
array 数组/列表 list [116.0, 39.0, 117.0, 40.0]
object 字典/对象 dict {"field1": "name"}

19.3.4 enum 枚举约束

enum 属性限制参数只能取预定义的值:

"join_style": {
    "type": "string",
    "required": False,
    "default": "round",
    "description": "连接样式",
    "enum": ["round", "mitre", "bevel"],
}

在校验阶段,框架会自动检查参数值是否在 enum 列表中。


19.4 输出定义规范

19.4.1 outputs 字典结构

outputs={
    "output": {
        "type": "layer",
        "description": "处理后的矢量图层",
    },
}

19.4.2 输出属性说明

属性 类型 必需 说明
type str 输出类型(layer, string, object 等)
description str 输出描述

19.4.3 多输出步骤

大多数步骤只有一个 output,但某些步骤可以有多个输出:

outputs={
    "output": {
        "type": "layer",
        "description": "主输出图层",
    },
    "report": {
        "type": "object",
        "description": "质检报告",
    },
}

19.5 开发一个矢量步骤:面积计算

19.5.1 需求描述

开发一个步骤,为每个多边形要素计算面积并添加为新列:

  • 输入:多边形矢量图层
  • 参数:面积单位(平方米、平方千米、公顷)、输出列名
  • 输出:添加了面积列的矢量图层

19.5.2 完整实现

# src/geopipe_agent/steps/vector/calculate_area.py

from geopipe_agent import step, StepContext, StepResult

@step(
    id="vector.calculate_area",
    name="Calculate Area",
    description="计算多边形要素的面积并添加为新列",
    category="vector",
    params={
        "input": {
            "type": "layer",
            "required": True,
            "description": "输入多边形矢量图层",
        },
        "unit": {
            "type": "string",
            "required": False,
            "default": "sqm",
            "description": "面积单位",
            "enum": ["sqm", "sqkm", "hectare"],
        },
        "column_name": {
            "type": "string",
            "required": False,
            "default": "area",
            "description": "面积列的列名",
        },
    },
    outputs={
        "output": {
            "type": "layer",
            "description": "添加了面积列的矢量图层",
        },
    },
    backends=["native_python"],
)
def calculate_area(ctx: StepContext) -> StepResult:
    """计算多边形面积"""
    # 获取输入和参数
    gdf = ctx.input("input")
    unit = ctx.param("unit", "sqm")
    column_name = ctx.param("column_name", "area")

    ctx.logger.info(f"计算面积,单位: {unit},输出列: {column_name}")

    # 复制以避免修改原始数据
    result = gdf.copy()

    # 计算面积(以地图单位的平方为基础)
    area = result.geometry.area

    # 单位转换
    UNIT_FACTORS = {
        "sqm": 1.0,           # 平方米
        "sqkm": 1e-6,         # 平方千米
        "hectare": 1e-4,      # 公顷
    }
    factor = UNIT_FACTORS.get(unit, 1.0)
    result[column_name] = area * factor

    # 统计信息
    stats = {
        "feature_count": len(result),
        "min_area": float(result[column_name].min()),
        "max_area": float(result[column_name].max()),
        "mean_area": float(result[column_name].mean()),
        "total_area": float(result[column_name].sum()),
        "unit": unit,
    }

    ctx.logger.info(
        f"面积计算完成: {stats['feature_count']} 个要素, "
        f"总面积: {stats['total_area']:.2f} {unit}"
    )

    return StepResult(output=result, stats=stats)

19.5.3 在流水线中使用

name: 面积计算示例
steps:
  - id: load_parcels
    use: io.read_vector
    params:
      file: data/parcels.gpkg

  - id: reproject
    use: vector.reproject
    params:
      input: $load_parcels
      target_crs: "EPSG:3857"    # 投影坐标系,面积计算更准确

  - id: calc_area
    use: vector.calculate_area
    params:
      input: $reproject
      unit: hectare
      column_name: area_ha

  - id: save
    use: io.write_vector
    params:
      input: $calc_area
      file: output/parcels_with_area.gpkg

19.6 开发一个栅格步骤:栅格重采样

19.6.1 需求描述

开发一个步骤,对栅格数据进行空间分辨率重采样:

  • 输入:栅格数据
  • 参数:目标分辨率、重采样方法
  • 输出:重采样后的栅格数据

19.6.2 完整实现

# src/geopipe_agent/steps/raster/resample.py

import numpy as np
from rasterio.enums import Resampling
from rasterio.transform import from_bounds

from geopipe_agent import step, StepContext, StepResult

@step(
    id="raster.resample",
    name="Raster Resample",
    description="对栅格数据进行空间分辨率重采样",
    category="raster",
    params={
        "input": {
            "type": "layer",
            "required": True,
            "description": "输入栅格数据",
        },
        "resolution": {
            "type": "number",
            "required": True,
            "description": "目标分辨率(地图单位)",
        },
        "method": {
            "type": "string",
            "required": False,
            "default": "bilinear",
            "description": "重采样方法",
            "enum": ["nearest", "bilinear", "cubic", "average", "mode"],
        },
    },
    outputs={
        "output": {
            "type": "layer",
            "description": "重采样后的栅格数据",
        },
    },
    backends=["native_python"],
)
def resample(ctx: StepContext) -> StepResult:
    """栅格重采样"""
    dataset = ctx.input("input")
    target_res = ctx.param("resolution")
    method_name = ctx.param("method", "bilinear")

    ctx.logger.info(
        f"栅格重采样: {dataset.res} -> ({target_res}, {target_res}), "
        f"方法: {method_name}"
    )

    # 选择重采样方法
    METHODS = {
        "nearest": Resampling.nearest,
        "bilinear": Resampling.bilinear,
        "cubic": Resampling.cubic,
        "average": Resampling.average,
        "mode": Resampling.mode,
    }
    resampling = METHODS.get(method_name, Resampling.bilinear)

    # 计算新的尺寸
    bounds = dataset.bounds
    new_width = int((bounds.right - bounds.left) / target_res)
    new_height = int((bounds.top - bounds.bottom) / target_res)

    # 计算新的变换矩阵
    new_transform = from_bounds(
        bounds.left, bounds.bottom, bounds.right, bounds.top,
        new_width, new_height
    )

    # 读取并重采样数据
    data = dataset.read(
        out_shape=(dataset.count, new_height, new_width),
        resampling=resampling,
    )

    # 构建结果元数据
    result_meta = dataset.meta.copy()
    result_meta.update({
        "width": new_width,
        "height": new_height,
        "transform": new_transform,
    })

    stats = {
        "original_resolution": list(dataset.res),
        "target_resolution": [target_res, target_res],
        "original_size": [dataset.width, dataset.height],
        "new_size": [new_width, new_height],
        "method": method_name,
        "bands": dataset.count,
    }

    ctx.logger.info(
        f"重采样完成: {dataset.width}x{dataset.height} -> "
        f"{new_width}x{new_height}"
    )

    return StepResult(
        output={"data": data, "meta": result_meta},
        stats=stats,
    )

19.6.3 在流水线中使用

name: 栅格重采样示例
steps:
  - id: load_dem
    use: io.read_raster
    params:
      file: data/dem_10m.tif

  - id: resample
    use: raster.resample
    params:
      input: $load_dem
      resolution: 30
      method: bilinear

  - id: save
    use: io.write_raster
    params:
      input: $resample
      file: output/dem_30m.tif

19.7 开发一个 QC 步骤:属性编码规范检查

19.7.1 需求描述

开发一个质检步骤,检查矢量数据中特定列的值是否符合预定义的编码规范(如行政区划代码、土地分类代码等)。

19.7.2 QC 步骤的特殊模式

QC 步骤与普通步骤不同,它使用 QcIssue 数据类来记录质检问题,并通过 _helpers 模块提供的辅助函数简化开发:

from dataclasses import dataclass

@dataclass
class QcIssue:
    """质检问题记录"""
    feature_id: int       # 要素 ID
    issue_type: str       # 问题类型
    severity: str         # 严重程度: error, warning, info
    message: str          # 问题描述
    field: str = None     # 相关字段名
    value: str = None     # 问题值

19.7.3 完整实现

# src/geopipe_agent/steps/qc/code_standard.py

from geopipe_agent import step, StepContext, StepResult

@step(
    id="qc.code_standard",
    name="Code Standard Check",
    description="检查属性值是否符合预定义的编码规范",
    category="qc",
    params={
        "input": {
            "type": "layer",
            "required": True,
            "description": "输入矢量图层",
        },
        "column": {
            "type": "string",
            "required": True,
            "description": "要检查的列名",
        },
        "valid_codes": {
            "type": "array",
            "required": True,
            "description": "合法的编码值列表",
        },
        "severity": {
            "type": "string",
            "required": False,
            "default": "error",
            "description": "问题严重程度",
            "enum": ["error", "warning", "info"],
        },
    },
    outputs={
        "output": {
            "type": "layer",
            "description": "原始输入图层(未修改)",
        },
    },
    backends=["native_python"],
)
def code_standard_check(ctx: StepContext) -> StepResult:
    """编码规范检查"""
    gdf = ctx.input("input")
    column = ctx.param("column")
    valid_codes = ctx.param("valid_codes")
    severity = ctx.param("severity", "error")

    ctx.logger.info(f"编码规范检查: 列={column}, 合法编码数={len(valid_codes)}")

    # 检查列是否存在
    if column not in gdf.columns:
        raise ValueError(f"列 '{column}' 不存在于数据中,可用列: {list(gdf.columns)}")

    # 找出不合法的编码
    valid_set = set(valid_codes)
    issues = []
    invalid_count = 0
    null_count = 0

    for idx, row in gdf.iterrows():
        value = row[column]

        # 检查空值
        if value is None or (isinstance(value, float) and str(value) == "nan"):
            null_count += 1
            issues.append({
                "feature_id": int(idx),
                "issue_type": "null_code",
                "severity": severity,
                "message": f"编码为空值",
                "field": column,
                "value": None,
            })
            continue

        # 检查值是否合法
        if str(value) not in valid_set:
            invalid_count += 1
            issues.append({
                "feature_id": int(idx),
                "issue_type": "invalid_code",
                "severity": severity,
                "message": f"编码 '{value}' 不在合法范围内",
                "field": column,
                "value": str(value),
            })

    total = len(gdf)
    pass_count = total - invalid_count - null_count
    pass_rate = pass_count / total if total > 0 else 1.0

    stats = {
        "total_features": total,
        "pass_count": pass_count,
        "invalid_count": invalid_count,
        "null_count": null_count,
        "pass_rate": round(pass_rate, 4),
        "issues": issues,
        "column": column,
    }

    ctx.logger.info(
        f"检查完成: 通过率 {pass_rate:.1%} "
        f"({pass_count}/{total}), "
        f"无效编码: {invalid_count}, 空值: {null_count}"
    )

    return StepResult(output=gdf, stats=stats)

19.7.4 在流水线中使用

name: 土地分类编码质检
vars:
  input_file: data/land_use.gpkg

steps:
  - id: load
    use: io.read_vector
    params:
      file: ${var.input_file}

  - id: check_code
    use: qc.code_standard
    params:
      input: $load
      column: land_type
      valid_codes: ["011", "012", "021", "022", "031", "032", "041", "042"]
      severity: error

  - id: save_if_passed
    use: io.write_vector
    when: "$check_code.stats.pass_rate >= 0.95"
    params:
      input: $load
      file: output/land_use_checked.gpkg

19.8 利用后端委托

19.8.1 run_backend_op 复用

对于需要支持多个后端的步骤,可以使用 run_backend_op 函数将具体操作委托给后端:

from geopipe_agent.backends import run_backend_op

@step(
    id="custom.multi_backend_step",
    name="Multi Backend Step",
    description="支持多后端的步骤",
    category="custom",
    params={...},
    outputs={...},
    backends=["native_python", "geopandas", "qgis"],
)
def multi_backend_step(ctx: StepContext) -> StepResult:
    gdf = ctx.input("input")

    # 委托给后端执行
    result = run_backend_op(
        operation="buffer",
        backend=ctx.backend,       # 引擎选择的后端
        input_data=gdf,
        distance=ctx.param("distance"),
    )

    return StepResult(output=result)

19.8.2 后端选择逻辑

当步骤支持多个后端时,引擎按以下优先级选择:

  1. 用户在 YAML 中显式指定的 backend
  2. 步骤 backends 列表中第一个可用的后端
  3. 回退到 native_python

19.8.3 何时使用后端委托

场景 建议
简单的数据处理逻辑 直接在步骤函数中实现
需要 GDAL/QGIS 加速 使用后端委托
通用的 GIS 操作(buffer/clip 等) 使用后端委托复用已有实现
特定的业务逻辑 直接实现,无需委托

19.9 步骤文件放置与自动发现

19.9.1 目录结构

自定义步骤文件应放置在 src/geopipe_agent/steps/ 目录下,按类别组织:

src/geopipe_agent/steps/
├── __init__.py
├── io/
│   ├── __init__.py
│   ├── read_vector.py
│   ├── write_vector.py
│   ├── read_raster.py
│   └── write_raster.py
├── vector/
│   ├── __init__.py
│   ├── buffer.py
│   ├── reproject.py
│   ├── clip.py
│   ├── overlay.py
│   ├── dissolve.py
│   ├── query.py
│   ├── simplify.py
│   └── calculate_area.py      # ← 自定义步骤
├── raster/
│   ├── __init__.py
│   ├── calc.py
│   ├── reproject.py
│   ├── clip.py
│   └── resample.py             # ← 自定义步骤
├── spatial/
│   ├── __init__.py
│   └── ...
├── network/
│   ├── __init__.py
│   └── ...
├── qc/
│   ├── __init__.py
│   ├── geometry_validity.py
│   ├── topology.py
│   ├── crs_check.py
│   └── code_standard.py        # ← 自定义步骤
└── custom/                      # ← 自定义类别
    ├── __init__.py
    └── my_custom_step.py

19.9.2 pkgutil 自动加载机制

GeoPipeAgent 使用 Python 的 pkgutil 模块自动发现和加载步骤:

import pkgutil
import importlib

def discover_steps():
    """自动发现并加载所有步骤模块"""
    steps_package = importlib.import_module("geopipe_agent.steps")

    for importer, modname, ispkg in pkgutil.walk_packages(
        steps_package.__path__,
        prefix=steps_package.__name__ + ".",
    ):
        try:
            importlib.import_module(modname)
        except ImportError as e:
            logger.warning(f"无法加载步骤模块 {modname}: {e}")

工作原理:

  1. pkgutil.walk_packages 递归遍历 steps/ 目录下的所有 Python 模块
  2. importlib.import_module 导入每个模块
  3. 模块导入时,@step 装饰器自动将步骤注册到全局注册表

19.9.3 新类别的创建

如果要创建全新的步骤类别(如 custom),需要:

  1. 创建目录:src/geopipe_agent/steps/custom/
  2. 创建 __init__.py(可以为空文件)
  3. 在目录中添加步骤文件
mkdir -p src/geopipe_agent/steps/custom
touch src/geopipe_agent/steps/custom/__init__.py
# 然后创建步骤文件

19.9.4 验证步骤注册

# 查看所有步骤,确认新步骤出现
geopipe-agent list-steps

# 按类别过滤
geopipe-agent list-steps --category custom

# 查看步骤详情
geopipe-agent describe custom.my_step

19.10 测试自定义步骤

19.10.1 测试目录结构

tests/
├── conftest.py                    # 共享的测试夹具
├── steps/
│   ├── vector/
│   │   ├── test_buffer.py
│   │   └── test_calculate_area.py  # 测试自定义步骤
│   ├── raster/
│   │   └── test_resample.py
│   └── qc/
│       └── test_code_standard.py

19.10.2 测试夹具 (conftest.py)

# tests/conftest.py

import pytest
import geopandas as gpd
from shapely.geometry import Point, Polygon, box

@pytest.fixture
def sample_point_gdf():
    """创建示例点数据"""
    points = [Point(116.3 + i * 0.01, 39.9 + i * 0.01) for i in range(10)]
    return gpd.GeoDataFrame(
        {"name": [f"point_{i}" for i in range(10)]},
        geometry=points,
        crs="EPSG:4326",
    )

@pytest.fixture
def sample_polygon_gdf():
    """创建示例多边形数据"""
    polygons = [
        box(116.0 + i * 0.1, 39.0, 116.1 + i * 0.1, 39.1)
        for i in range(5)
    ]
    return gpd.GeoDataFrame(
        {"name": [f"poly_{i}" for i in range(5)],
         "land_type": ["011", "012", "021", "999", None]},
        geometry=polygons,
        crs="EPSG:4326",
    )

19.10.3 步骤函数测试

# tests/steps/vector/test_calculate_area.py

import pytest
import geopandas as gpd
from shapely.geometry import box

from geopipe_agent.steps.vector.calculate_area import calculate_area

class MockStepContext:
    """模拟 StepContext"""

    def __init__(self, input_data, params):
        self._input = input_data
        self._params = params
        self.logger = MockLogger()

    def input(self, name):
        return self._input

    def param(self, name, default=None):
        return self._params.get(name, default)

class MockLogger:
    def info(self, msg): pass
    def debug(self, msg): pass
    def warning(self, msg): pass

class TestCalculateArea:
    def test_basic_area_calculation(self):
        """测试基本面积计算"""
        # 创建已知面积的多边形(投影坐标系)
        polygons = [box(0, 0, 100, 100)]  # 100x100 = 10000 平方米
        gdf = gpd.GeoDataFrame(geometry=polygons, crs="EPSG:3857")

        ctx = MockStepContext(
            input_data=gdf,
            params={"unit": "sqm", "column_name": "area"},
        )

        result = calculate_area(ctx)

        assert "area" in result.output.columns
        assert result.stats["feature_count"] == 1
        assert result.stats["unit"] == "sqm"

    def test_hectare_unit(self):
        """测试公顷单位转换"""
        polygons = [box(0, 0, 100, 100)]  # 10000 sqm = 1 hectare
        gdf = gpd.GeoDataFrame(geometry=polygons, crs="EPSG:3857")

        ctx = MockStepContext(
            input_data=gdf,
            params={"unit": "hectare", "column_name": "area_ha"},
        )

        result = calculate_area(ctx)

        assert "area_ha" in result.output.columns
        assert result.stats["unit"] == "hectare"

    def test_custom_column_name(self):
        """测试自定义列名"""
        polygons = [box(0, 0, 10, 10)]
        gdf = gpd.GeoDataFrame(geometry=polygons, crs="EPSG:3857")

        ctx = MockStepContext(
            input_data=gdf,
            params={"unit": "sqm", "column_name": "custom_area"},
        )

        result = calculate_area(ctx)

        assert "custom_area" in result.output.columns

    def test_stats_output(self):
        """测试统计信息输出"""
        polygons = [box(0, 0, 10, 10), box(0, 0, 20, 20)]
        gdf = gpd.GeoDataFrame(geometry=polygons, crs="EPSG:3857")

        ctx = MockStepContext(
            input_data=gdf,
            params={"unit": "sqm", "column_name": "area"},
        )

        result = calculate_area(ctx)

        assert "min_area" in result.stats
        assert "max_area" in result.stats
        assert "mean_area" in result.stats
        assert "total_area" in result.stats
        assert result.stats["feature_count"] == 2

19.10.4 运行测试

# 运行所有步骤测试
pytest tests/steps/ -v

# 运行单个步骤测试
pytest tests/steps/vector/test_calculate_area.py -v

# 运行特定测试方法
pytest tests/steps/vector/test_calculate_area.py::TestCalculateArea::test_basic_area_calculation -v

19.11 最佳实践

19.11.1 命名规范

元素 规范 示例
Step ID category.action_name vector.calculate_area
文件名 action_name.py calculate_area.py
函数名 snake_case calculate_area
类别名 小写单词 vector, raster, qc, custom
参数名 snake_case column_name, target_crs

19.11.2 参数设计原则

  1. 必需参数最小化:只将真正必需的参数设为 required: True
  2. 合理的默认值:为可选参数提供实用的默认值
  3. 使用 enum 约束:对有限选项的参数使用 enum 限制
  4. 清晰的描述:每个参数都应有明确的 description
  5. 一致的命名:输入图层统一用 input,输出统一用 output

19.11.3 错误处理原则

@step(...)
def my_step(ctx: StepContext) -> StepResult:
    gdf = ctx.input("input")

    # ✓ 在步骤开始时验证输入
    if gdf is None or len(gdf) == 0:
        raise ValueError("输入数据为空")

    column = ctx.param("column")
    if column not in gdf.columns:
        available = ", ".join(gdf.columns.tolist())
        raise ValueError(
            f"列 '{column}' 不存在。可用列: {available}"
        )

    # ✓ 使用日志记录关键信息
    ctx.logger.info(f"处理 {len(gdf)} 条要素")

    # ✓ 不修改原始数据
    result = gdf.copy()
    # 处理逻辑...

    return StepResult(output=result, stats={...})

19.11.4 统计信息设计

统计信息是步骤间通信的重要机制,尤其在 when 条件中使用。设计统计信息时应注意:

# ✓ 好的统计信息
stats = {
    "feature_count": 1024,      # 整数类型
    "pass_rate": 0.95,          # 浮点数 0-1 范围
    "total_area": 12345.67,     # 数值类型
    "has_errors": False,        # 布尔类型
    "crs": "EPSG:4326",        # 字符串类型
}

# ✗ 避免在统计中放入大数据
stats = {
    "all_data": gdf.to_dict(),  # 不要放完整数据!
}

19.11.5 不修改原始数据

步骤应遵循 不可变输入 原则:

# ✓ 正确:复制后修改
result = gdf.copy()
result["new_col"] = values

# ✗ 错误:直接修改输入
gdf["new_col"] = values  # 会影响上游步骤的输出!

19.11.6 步骤开发检查清单

开发自定义步骤前,请确认以下检查项:

  • Step ID 遵循 category.action 格式
  • 所有必需参数已标注 required: True
  • 可选参数有合理的 default
  • description 清晰且对 AI 友好
  • 输入数据被复制而非直接修改
  • 错误信息包含上下文(列名、可用值等)
  • 统计信息有用且类型正确
  • 日志记录了关键操作
  • 步骤文件放在正确的目录下
  • 已编写单元测试
  • 已通过 list-steps 确认步骤注册成功

19.12 本章小结

本章全面介绍了如何开发自定义步骤来扩展 GeoPipeAgent 的分析能力,主要内容包括:

  1. 开发基础@step 装饰器、StepContextStepResult 的使用方法
  2. 参数规范params 字典的 type、required、default、description、enum 定义
  3. 输出规范outputs 字典结构
  4. 矢量步骤示例:面积计算步骤的完整实现
  5. 栅格步骤示例:栅格重采样步骤的完整实现
  6. QC 步骤示例:编码规范检查步骤的完整实现
  7. 后端委托run_backend_op 的使用场景和方法
  8. 自动发现pkgutil 自动加载机制和目录结构规范
  9. 测试方法:MockStepContext 模式、pytest 测试编写
  10. 最佳实践:命名规范、参数设计、错误处理、统计信息设计

下一章我们将通过多个完整的实战案例和最佳实践,综合运用前面所学的所有知识。


下一章:实战案例与最佳实践 →