znlgis 博客

GIS开发与技术分享

第5章:变量系统与步骤引用

本章深入解析 GeoPipeAgent 的变量系统和步骤引用机制,包括变量定义、引用解析、嵌套解析、CLI 覆盖以及相关源码分析。


5.1 变量系统概述

5.1.1 为什么需要变量

在 GIS 分析流水线中,许多参数需要在不同步骤之间共享,或者在不同运行时使用不同的值。如果每个步骤都硬编码参数值,会带来以下问题:

  • 修改困难:更改缓冲距离需要找到并修改所有使用该值的步骤
  • 无法复用:同一流水线无法用于不同的分析场景
  • 不利于 AI 生成:AI 需要了解哪些值是可配置的

没有变量系统的情况

steps:
  - id: buffer_roads
    use: vector.buffer
    params:
      input: $load_roads
      distance: 500            # 硬编码
  - id: buffer_rivers
    use: vector.buffer
    params:
      input: $load_rivers
      distance: 500            # 重复硬编码
  - id: save
    use: io.write_file
    params:
      input: $buffer_roads
      path: "output/result_500m.shp"  # 硬编码

使用变量系统后

variables:
  buffer_dist: 500

steps:
  - id: buffer_roads
    use: vector.buffer
    params:
      input: $load_roads
      distance: ${buffer_dist}       # 引用变量
  - id: buffer_rivers
    use: vector.buffer
    params:
      input: $load_rivers
      distance: ${buffer_dist}       # 引用同一变量
  - id: save
    use: io.write_file
    params:
      input: $buffer_roads
      path: "output/result_${buffer_dist}m.shp"  # 变量嵌入字符串

5.1.2 变量系统的设计目标

目标 实现方式
参数集中管理 variables 节点统一定义
运行时可配置 CLI --var 参数覆盖
字符串嵌入 ${var} 语法支持嵌入字符串中
类型灵活 支持字符串、数字、布尔等类型
安全可控 变量解析在安全沙箱中进行

5.2 变量定义与使用

5.2.1 在 variables 节点定义变量

变量在流水线的 variables 节点中以键值对形式定义:

pipeline:
  name: "变量示例"
  variables:
    # 字符串变量
    input_path: "data/roads.shp"
    output_dir: "output"
    target_crs: "EPSG:4326"

    # 数字变量
    buffer_distance: 500
    simplify_tolerance: 0.001

    # 布尔变量
    run_qc: true
    save_intermediate: false

5.2.2 ${var} 引用语法

在步骤参数中使用 ${变量名} 语法引用变量:

pipeline:
  variables:
    buffer_dist: 500
    target_crs: "EPSG:4326"
    output_format: "geojson"

  steps:
    - id: load
      use: io.read_file
      params:
        path: "${input_path}"         # 整个值是变量

    - id: reproject
      use: vector.reproject
      params:
        input: $load
        crs: ${target_crs}            # 不需要引号包裹

    - id: buffer
      use: vector.buffer
      params:
        input: $reproject
        distance: ${buffer_dist}      # 数字类型变量

    - id: save
      use: io.write_file
      params:
        input: $buffer
        path: "output/result.${output_format}"  # 嵌入字符串中

5.2.3 变量引用的位置

变量可以在以下位置使用:

位置 示例
步骤参数值 distance: ${buffer_dist}
参数字符串中 path: "output/data_${date}.shp"
when 条件 when: "${run_qc} == true"
输出声明 result: "$step_${suffix}.output"

5.2.4 变量名命名规范

推荐的变量名格式:[a-z][a-z0-9_]*
示例 推荐 说明
buffer_distance 下划线分隔,清晰明了
outputDir ⚠️ 可用,但建议使用下划线
CRS ⚠️ 可用,但建议使用小写
input path 不允许空格
123var 不建议以数字开头

5.3 变量的类型推断与转换

5.3.1 YAML 原生类型

YAML 解析器(pyyaml)会自动将变量值转换为 Python 类型:

variables:
  # 字符串
  name: "道路数据"         # → str: "道路数据"
  path: data/roads.shp     # → str: "data/roads.shp"
  crs: "EPSG:4326"         # → str: "EPSG:4326"

  # 数字
  distance: 500            # → int: 500
  tolerance: 0.001         # → float: 0.001
  scale: 1e6               # → float: 1000000.0

  # 布尔
  enabled: true            # → bool: True
  debug: false             # → bool: False

  # 空值
  optional: null           # → None
  also_null: ~             # → None

5.3.2 字符串到数字的自动转换

当变量通过 ${var} 语法嵌入字符串中时,解析后的结果是字符串。但当变量值用作独立参数时,PipelineContext.resolve() 会尝试保持原始类型:

variables:
  buffer_dist: 500          # int: 500

steps:
  - id: buffer
    use: vector.buffer
    params:
      distance: ${buffer_dist}     # 独立使用 → 保持 int: 500
      label: "缓冲${buffer_dist}米"  # 嵌入字符串 → str: "缓冲500米"

类型保持规则

┌─────────────────────────────────────────────────┐
│           变量类型解析规则                         │
│                                                 │
│  变量值类型    引用方式           结果类型         │
│  ────────────────────────────────────────────── │
│  int: 500     ${var} 独立使用    int: 500       │
│  int: 500     "text${var}"       str: "text500" │
│  float: 0.1   ${var} 独立使用    float: 0.1     │
│  str: "abc"   ${var} 独立使用    str: "abc"     │
│  bool: true   ${var} 独立使用    bool: True     │
│  bool: true   "is ${var}"        str: "is True" │
└─────────────────────────────────────────────────┘

5.3.3 YAML 类型陷阱

注意 YAML 的自动类型推断可能带来意外:

variables:
  # ⚠️ 注意以下陷阱

  # 这会被解析为布尔值 True,而非字符串 "yes"
  flag: yes           # → bool: True

  # 版本号可能被解析为浮点数
  version: 1.0        # → float: 1.0 (不是 "1.0")

  # 使用引号确保是字符串
  version: "1.0"      # → str: "1.0"
  flag: "yes"         # → str: "yes"

  # EPSG 代码不加引号可能出问题
  crs: EPSG:4326      # → str: "EPSG:4326" (pyyaml 处理为字符串)

最佳实践:当值可能被误解析时,使用引号包裹。


5.4 步骤引用详解

5.4.1 引用的两种形式

GeoPipeAgent 支持两种步骤引用语法:

语法 含义 返回值
$step_id 简写形式 步骤的 output 属性
$step_id.attr 完整形式 步骤的指定属性

5.4.2 $step_id 简写

简写形式是最常用的引用方式,它等价于 $step_id.output

steps:
  - id: load
    use: io.read_file
    params:
      path: "data.shp"
  - id: buffer
    use: vector.buffer
    params:
      input: $load              # 等价于 $load.output
      distance: 100

解析逻辑

# PipelineContext 的引用解析
def _resolve_step_ref(self, value: str):
    if value.startswith("$") and "." not in value[1:]:
        # $step_id 简写 → 返回 step_result.output
        step_id = value[1:]
        result = self.get_output(step_id)
        return result.output

5.4.3 $step_id.output

显式引用步骤的输出数据:

params:
  input: $load.output          # 显式引用 output 属性

返回步骤的主要输出数据,通常是一个 GeoDataFrame 或其他数据对象。

5.4.4 $step_id.stats

引用步骤的统计信息:

outputs:
  statistics: "$buffer.stats"  # 引用缓冲区步骤的统计信息

stats 是一个字典,包含步骤执行的统计数据,如要素计数、面积总计等。

5.4.5 $step_id.metadata

引用步骤的元数据:

outputs:
  meta: "$load.metadata"       # 引用加载步骤的元数据

metadata 包含步骤执行的附加信息,如文件路径、CRS、执行时间等。

5.4.6 $step_id.issues

引用步骤的 QC 问题列表:

outputs:
  problems: "$check_geom.issues"  # 引用质检步骤的问题列表

issues 是一个 QcIssue 列表,每个 QcIssue 包含 rule_idseverityfeature_indexmessagegeometrydetails

5.4.7 StepResult 数据结构

所有步骤引用都基于 StepResult 数据类(models/result.py):

@dataclass
class StepResult:
    output: any = None              # 主要输出数据
    stats: dict = field(default_factory=dict)     # 统计信息
    metadata: dict = field(default_factory=dict)  # 元数据
    issues: list[QcIssue] = field(default_factory=list)  # QC 问题

# QcIssue 数据类 (models/qc.py)
@dataclass
class QcIssue:
    rule_id: str                    # 规则 ID,如 "invalid_geometry"
    severity: str                   # 严重等级:error/warning/info
    feature_index: int | None       # 问题要素的索引
    message: str                    # 问题描述
    geometry: any = None            # 问题几何
    details: dict = field(default_factory=dict)  # 详细信息

5.4.8 引用解析总结

引用语法                    返回值
──────────────────────────────────────────────────
$step_id                  → StepResult.output
$step_id.output           → StepResult.output
$step_id.stats            → StepResult.stats
$step_id.metadata         → StepResult.metadata
$step_id.issues           → StepResult.issues
${var_name}               → variables[var_name]
──────────────────────────────────────────────────

5.5 引用解析源码分析

5.5.1 PipelineContext.resolve()

resolve() 是引用解析的入口方法,处理任意类型的值(engine/context.py):

class PipelineContext:
    def resolve(self, value):
        """递归解析值中的所有引用"""
        if isinstance(value, str):
            return self._resolve_string(value)
        elif isinstance(value, dict):
            return {k: self.resolve(v) for k, v in value.items()}
        elif isinstance(value, list):
            return [self.resolve(item) for item in value]
        return value  # 数字、布尔等基本类型直接返回

    def _resolve_string(self, value: str):
        """解析字符串中的引用"""
        # 先尝试步骤引用
        resolved = self._resolve_step_ref(value)
        if resolved is not value:
            return resolved

        # 再处理变量替换
        return self._substitute_variables(value)

5.5.2 _resolve_step_ref() 步骤引用解析

import re

class PipelineContext:
    # 匹配 $step_id 或 $step_id.attr
    _STEP_REF_PATTERN = re.compile(r'^\$([a-z0-9_-]+)(?:\.(\w+))?$')

    def _resolve_step_ref(self, value: str):
        """解析步骤引用 $step_id 或 $step_id.attr"""
        match = self._STEP_REF_PATTERN.match(value.strip())
        if not match:
            return value  # 不是步骤引用,原样返回

        step_id = match.group(1)   # 步骤 ID
        attr = match.group(2)      # 属性名(可能为 None)

        # 获取步骤结果
        if step_id not in self._step_outputs:
            raise VariableResolutionError(
                f"Referenced step '{step_id}' not found. "
                f"Available steps: {list(self._step_outputs.keys())}"
            )

        result = self._step_outputs[step_id]

        if attr is None:
            # $step_id 简写 → 返回 output
            return result.output
        elif hasattr(result, attr):
            # $step_id.attr → 返回指定属性
            return getattr(result, attr)
        else:
            raise VariableResolutionError(
                f"StepResult has no attribute '{attr}'. "
                f"Available: output, stats, metadata, issues"
            )

解析流程图

输入: "$buffer.output"
  │
  ├── 正则匹配: step_id="buffer", attr="output"
  │
  ├── 查找: _step_outputs["buffer"] → StepResult
  │
  └── 返回: StepResult.output → GeoDataFrame

输入: "$load"
  │
  ├── 正则匹配: step_id="load", attr=None
  │
  ├── 查找: _step_outputs["load"] → StepResult
  │
  └── 返回: StepResult.output → GeoDataFrame (简写默认 output)

5.5.3 _substitute_variables() 变量替换

class PipelineContext:
    # 匹配 ${var_name}
    _VAR_PATTERN = re.compile(r'\$\{(\w+)\}')

    def _substitute_variables(self, value: str):
        """替换字符串中的 ${var} 变量引用"""
        def replacer(match):
            var_name = match.group(1)
            if var_name not in self.variables:
                raise VariableResolutionError(
                    f"Variable '$}' not defined. "
                    f"Available variables: {list(self.variables.keys())}"
                )
            return str(self.variables[var_name])

        result = self._VAR_PATTERN.sub(replacer, value)

        # 如果整个字符串就是一个变量引用,保持原始类型
        full_match = re.fullmatch(r'\$\{(\w+)\}', value)
        if full_match:
            var_name = full_match.group(1)
            return self.variables[var_name]  # 返回原始类型

        return result  # 包含其他文本时返回字符串

类型保持逻辑

输入: "${buffer_dist}"    (变量值 = 500)
  │
  ├── 检测: 整个字符串就是一个 ${var}
  │
  └── 返回: int(500)  ← 保持原始类型

输入: "缓冲${buffer_dist}米"  (变量值 = 500)
  │
  ├── 检测: 字符串中嵌入了 ${var}
  │
  └── 返回: str("缓冲500米")  ← 转换为字符串

5.6 嵌套参数的递归解析

5.6.1 字典参数的解析

当参数值是字典时,resolve() 会递归解析每个键值对:

variables:
  crs: "EPSG:4326"
  tolerance: 0.001

steps:
  - id: process
    use: vector.simplify
    params:
      input: $load
      options:                    # 字典参数
        crs: ${crs}              # 递归解析
        simplify: true
        tolerance: ${tolerance}  # 递归解析

解析过程

# resolve() 处理字典参数
params = {
    "input": "$load",
    "options": {
        "crs": "${crs}",
        "simplify": True,
        "tolerance": "${tolerance}"
    }
}

# 递归解析后:
resolved_params = {
    "input": <GeoDataFrame>,          # $load → StepResult.output
    "options": {
        "crs": "EPSG:4326",           # ${crs} → "EPSG:4326"
        "simplify": True,             # 布尔值直接保留
        "tolerance": 0.001            # ${tolerance} → 0.001
    }
}

5.6.2 列表参数的解析

当参数值是列表时,resolve() 会递归解析每个元素:

variables:
  col1: "name"
  col2: "area"

steps:
  - id: select
    use: vector.select_columns
    params:
      input: $load
      columns:                  # 列表参数
        - ${col1}              # 递归解析
        - ${col2}              # 递归解析
        - "population"         # 字面值不变

解析过程

# 原始参数
params = {
    "input": "$load",
    "columns": ["${col1}", "${col2}", "population"]
}

# 解析后
resolved = {
    "input": <GeoDataFrame>,
    "columns": ["name", "area", "population"]
}

5.6.3 深层嵌套解析

resolve() 支持任意深度的嵌套:

variables:
  threshold: 100

params:
  config:
    filters:
      - field: "area"
        value: ${threshold}         # 3层嵌套
      - field: "population"
        value: 50000
    output:
      format: "geojson"

递归调用链:

resolve(dict)  → config
  resolve(dict)  → filters
    resolve(list)
      resolve(dict)
        resolve(str)  → "${threshold}" → 100
      resolve(dict)
        resolve(int)  → 50000 (直接返回)
  resolve(dict)  → output
    resolve(str)  → "geojson" (直接返回)

5.7 CLI --var 运行时覆盖

5.7.1 基本用法

GeoPipeAgent CLI 的 run 命令支持 --var 参数,在运行时覆盖流水线中定义的变量:

# 覆盖单个变量
geopipe-agent run pipeline.yaml --var buffer_dist=1000

# 覆盖多个变量
geopipe-agent run pipeline.yaml \
  --var buffer_dist=1000 \
  --var target_crs=EPSG:32650 \
  --var run_qc=false

5.7.2 覆盖机制

CLI --var 参数的值会合并到 PipelineContext 的变量中,覆盖 YAML 中定义的同名变量:

# executor.py 中的变量合并逻辑
def execute_pipeline(self, pipeline: PipelineDefinition,
                     var_overrides: dict = None):
    # 合并变量:CLI --var 覆盖 YAML variables
    variables = {
        **pipeline.variables,       # YAML 中定义的变量
        **(var_overrides or {}),     # CLI --var 覆盖
    }
    context = PipelineContext(variables=variables)

合并优先级

┌──────────────────────────────────────┐
│         变量优先级(从低到高)          │
│                                      │
│  1. YAML variables 节点定义(最低)    │
│       │                              │
│       ▼ 覆盖                         │
│  2. CLI --var 参数(最高)             │
└──────────────────────────────────────┘

5.7.3 CLI 命令定义

# cli.py 中 run 命令的定义(基于 Click)
@cli.command()
@click.argument("pipeline_file", type=click.Path(exists=True))
@click.option("--var", multiple=True, help="Override variable: key=value")
@click.option("--backend", default=None, help="Preferred backend")
def run(pipeline_file, var, backend):
    """Run a pipeline from YAML file"""
    # 解析 --var 参数
    var_overrides = {}
    for v in var:
        key, _, value = v.partition("=")
        var_overrides[key.strip()] = _parse_value(value.strip())

    # 解析流水线
    pipeline = Parser.parse_yaml(pipeline_file)

    # 执行
    executor = Executor(preferred_backend=backend)
    report = executor.execute_pipeline(pipeline, var_overrides)

    # 输出报告
    print(json.dumps(report, indent=2, default=str))

5.7.4 值类型解析

CLI --var 传入的值是字符串,GeoPipeAgent 会尝试自动解析为合适的类型:

def _parse_value(value: str):
    """将 CLI 字符串值解析为合适的 Python 类型"""
    # 布尔值
    if value.lower() in ("true", "yes"):
        return True
    if value.lower() in ("false", "no"):
        return False

    # 整数
    try:
        return int(value)
    except ValueError:
        pass

    # 浮点数
    try:
        return float(value)
    except ValueError:
        pass

    # 字符串
    return value

5.7.5 使用场景

# 场景1:调整分析参数
geopipe-agent run analysis.yaml --var buffer_dist=200

# 场景2:切换输入数据
geopipe-agent run process.yaml --var input_path=data/new_roads.shp

# 场景3:控制可选步骤
geopipe-agent run pipeline.yaml --var run_qc=true

# 场景4:切换坐标系
geopipe-agent run reproject.yaml --var target_crs=EPSG:32650

# 场景5:批量脚本中使用
for dist in 100 200 500 1000; do
  geopipe-agent run buffer.yaml --var buffer_dist=$dist
done

5.8 变量验证与错误处理

5.8.1 VariableResolutionError

当变量解析失败时,PipelineContext 会抛出 VariableResolutionError(定义在 errors.py 中):

class VariableResolutionError(GeoPipeAgentError):
    """变量或步骤引用解析失败"""
    pass

5.8.2 未定义变量错误

variables:
  buffer_dist: 500

steps:
  - id: buffer
    use: vector.buffer
    params:
      distance: ${undefined_var}     # ❌ 变量未定义

错误信息:

VariableResolutionError: Variable '${undefined_var}' not defined.
Available variables: ['buffer_dist']

5.8.3 未定义步骤引用错误

steps:
  - id: buffer
    use: vector.buffer
    params:
      input: $nonexistent_step       # ❌ 步骤不存在
      distance: 100

错误信息:

VariableResolutionError: Referenced step 'nonexistent_step' not found.
Available steps: []

5.8.4 无效属性引用错误

steps:
  - id: load
    use: io.read_file
    params:
      path: "data.shp"
  - id: process
    use: vector.buffer
    params:
      input: $load.nonexistent       # ❌ 无效属性
      distance: 100

错误信息:

VariableResolutionError: StepResult has no attribute 'nonexistent'.
Available: output, stats, metadata, issues

5.8.5 前向引用错误

步骤只能引用在其之前执行的步骤。前向引用(引用后面的步骤)在验证阶段就会被拒绝:

steps:
  - id: buffer
    use: vector.buffer
    params:
      input: $load                   # ❌ load 在 buffer 之后定义
      distance: 100
  - id: load
    use: io.read_file
    params:
      path: "data.shp"

validator.py 会检测并报告此错误:

def _check_step_references(self, steps):
    defined = set()
    for step in steps:
        # 检查参数中引用的步骤是否已定义
        refs = self._extract_step_refs(step.params)
        for ref in refs:
            if ref not in defined:
                raise ValidationError(
                    f"Step '{step.id}' references '{ref}' "
                    f"which is not defined before it"
                )
        defined.add(step.id)

5.8.6 错误信息的 AI 友好设计

所有错误信息都包含修复建议,便于 AI 理解和修正:

错误类型                      错误信息包含
─────────────────────────────────────────────
变量未定义        → "Available variables: [...]"
步骤引用无效      → "Available steps: [...]"
属性不存在        → "Available: output, stats, metadata, issues"
前向引用          → "which is not defined before it"

5.9 最佳实践

5.9.1 变量命名规范

类别 命名模式 示例
文件路径 xxx_path / xxx_dir input_path, output_dir
距离/数值 xxx_distance / xxx_value buffer_distance, threshold_value
坐标系 target_crs / source_crs target_crs
布尔开关 run_xxx / enable_xxx run_qc, enable_simplify
格式 xxx_format output_format

5.9.2 变量与步骤引用的选择

场景 推荐方式 原因
跨步骤共享的固定值 变量 ${var} 集中管理,方便修改
步骤间的数据传递 步骤引用 $step_id 自动传递数据对象
运行时可能变化的值 变量 + CLI --var 灵活配置
环境相关的值(路径等) 变量 便于不同环境切换

5.9.3 流水线复用模式

参数化流水线:将所有可变部分提取为变量

pipeline:
  name: "通用缓冲区分析"
  variables:
    input_file: "data/default.shp"    # 默认值
    buffer_dist: 500                   # 默认值
    output_file: "output/result.shp"   # 默认值
    target_crs: "EPSG:4326"            # 默认值

  steps:
    - id: load
      use: io.read_file
      params:
        path: ${input_file}
    - id: reproject
      use: vector.reproject
      params:
        input: $load
        crs: ${target_crs}
    - id: buffer
      use: vector.buffer
      params:
        input: $reproject
        distance: ${buffer_dist}
    - id: save
      use: io.write_file
      params:
        input: $buffer
        path: ${output_file}

使用时通过 --var 定制:

# 分析道路
geopipe-agent run generic_buffer.yaml \
  --var input_file=data/roads.shp \
  --var buffer_dist=100 \
  --var output_file=output/road_buffer.shp

# 分析河流
geopipe-agent run generic_buffer.yaml \
  --var input_file=data/rivers.shp \
  --var buffer_dist=200 \
  --var output_file=output/river_buffer.shp

5.9.4 步骤 ID 与引用的对齐

保持步骤 ID 的语义清晰,使引用代码易于理解:

# ✅ 良好的实践:ID 清晰描述步骤功能
steps:
  - id: load_parcels
    use: io.read_file
    params:
      path: "parcels.shp"
  - id: buffer_parcels
    use: vector.buffer
    params:
      input: $load_parcels        # 清晰:缓冲宗地数据
      distance: 50
  - id: clip_buildings
    use: vector.clip
    params:
      input: $load_buildings
      mask: $buffer_parcels       # 清晰:用宗地缓冲区裁剪

# ❌ 不好的实践:ID 含义不清
steps:
  - id: s1
    use: io.read_file
    params:
      path: "parcels.shp"
  - id: s2
    use: vector.buffer
    params:
      input: $s1                  # 不清楚 s1 是什么
      distance: 50

5.9.5 调试引用问题

当引用解析出现问题时,可以使用 validate 命令快速定位:

# 验证流水线定义
geopipe-agent validate pipeline.yaml

验证器会检查:

  • 所有步骤引用是否指向已定义的前序步骤
  • 输出声明中的引用是否有效
  • 步骤 ID 是否符合命名规则

5.10 本章小结

本章深入解析了 GeoPipeAgent 的变量系统和步骤引用机制:

  1. 变量定义:在 variables 节点用键值对定义,支持字符串、数字、布尔等类型
  2. 变量引用:使用 ${var_name} 语法,支持独立使用和字符串嵌入
  3. 类型推断:独立使用保持原始类型,嵌入字符串时转为字符串
  4. 步骤引用$step_id(简写,返回 output)和 $step_id.attr(完整形式)
  5. 引用解析PipelineContext.resolve() 递归处理字符串、字典、列表中的引用
  6. 嵌套解析:支持任意深度的字典和列表嵌套
  7. CLI 覆盖--var key=value 运行时覆盖变量,优先级高于 YAML 定义
  8. 错误处理VariableResolutionError 提供 AI 友好的错误信息和修复建议
  9. 最佳实践:语义化命名、参数化流水线、合理选择变量与引用

下一章:Step注册表与插件系统 →