第5章:变量系统与步骤引用
本章深入解析 GeoPipeAgent 的变量系统和步骤引用机制,包括变量定义、引用解析、嵌套解析、CLI 覆盖以及相关源码分析。
5.1 变量系统概述
5.1.1 为什么需要变量
在 GIS 分析流水线中,许多参数需要在不同步骤之间共享,或者在不同运行时使用不同的值。如果每个步骤都硬编码参数值,会带来以下问题:
- 修改困难:更改缓冲距离需要找到并修改所有使用该值的步骤
- 无法复用:同一流水线无法用于不同的分析场景
- 不利于 AI 生成:AI 需要了解哪些值是可配置的
没有变量系统的情况:
steps:
- id: buffer_roads
use: vector.buffer
params:
input: $load_roads
distance: 500 # 硬编码
- id: buffer_rivers
use: vector.buffer
params:
input: $load_rivers
distance: 500 # 重复硬编码
- id: save
use: io.write_file
params:
input: $buffer_roads
path: "output/result_500m.shp" # 硬编码
使用变量系统后:
variables:
buffer_dist: 500
steps:
- id: buffer_roads
use: vector.buffer
params:
input: $load_roads
distance: ${buffer_dist} # 引用变量
- id: buffer_rivers
use: vector.buffer
params:
input: $load_rivers
distance: ${buffer_dist} # 引用同一变量
- id: save
use: io.write_file
params:
input: $buffer_roads
path: "output/result_${buffer_dist}m.shp" # 变量嵌入字符串
5.1.2 变量系统的设计目标
| 目标 | 实现方式 |
|---|---|
| 参数集中管理 | variables 节点统一定义 |
| 运行时可配置 | CLI --var 参数覆盖 |
| 字符串嵌入 | ${var} 语法支持嵌入字符串中 |
| 类型灵活 | 支持字符串、数字、布尔等类型 |
| 安全可控 | 变量解析在安全沙箱中进行 |
5.2 变量定义与使用
5.2.1 在 variables 节点定义变量
变量在流水线的 variables 节点中以键值对形式定义:
pipeline:
name: "变量示例"
variables:
# 字符串变量
input_path: "data/roads.shp"
output_dir: "output"
target_crs: "EPSG:4326"
# 数字变量
buffer_distance: 500
simplify_tolerance: 0.001
# 布尔变量
run_qc: true
save_intermediate: false
5.2.2 ${var} 引用语法
在步骤参数中使用 ${变量名} 语法引用变量:
pipeline:
variables:
buffer_dist: 500
target_crs: "EPSG:4326"
output_format: "geojson"
steps:
- id: load
use: io.read_file
params:
path: "${input_path}" # 整个值是变量
- id: reproject
use: vector.reproject
params:
input: $load
crs: ${target_crs} # 不需要引号包裹
- id: buffer
use: vector.buffer
params:
input: $reproject
distance: ${buffer_dist} # 数字类型变量
- id: save
use: io.write_file
params:
input: $buffer
path: "output/result.${output_format}" # 嵌入字符串中
5.2.3 变量引用的位置
变量可以在以下位置使用:
| 位置 | 示例 |
|---|---|
| 步骤参数值 | distance: ${buffer_dist} |
| 参数字符串中 | path: "output/data_${date}.shp" |
when 条件 |
when: "${run_qc} == true" |
| 输出声明 | result: "$step_${suffix}.output" |
5.2.4 变量名命名规范
推荐的变量名格式:[a-z][a-z0-9_]*
| 示例 | 推荐 | 说明 |
|---|---|---|
buffer_distance |
✅ | 下划线分隔,清晰明了 |
outputDir |
⚠️ | 可用,但建议使用下划线 |
CRS |
⚠️ | 可用,但建议使用小写 |
input path |
❌ | 不允许空格 |
123var |
❌ | 不建议以数字开头 |
5.3 变量的类型推断与转换
5.3.1 YAML 原生类型
YAML 解析器(pyyaml)会自动将变量值转换为 Python 类型:
variables:
# 字符串
name: "道路数据" # → str: "道路数据"
path: data/roads.shp # → str: "data/roads.shp"
crs: "EPSG:4326" # → str: "EPSG:4326"
# 数字
distance: 500 # → int: 500
tolerance: 0.001 # → float: 0.001
scale: 1e6 # → float: 1000000.0
# 布尔
enabled: true # → bool: True
debug: false # → bool: False
# 空值
optional: null # → None
also_null: ~ # → None
5.3.2 字符串到数字的自动转换
当变量通过 ${var} 语法嵌入字符串中时,解析后的结果是字符串。但当变量值用作独立参数时,PipelineContext.resolve() 会尝试保持原始类型:
variables:
buffer_dist: 500 # int: 500
steps:
- id: buffer
use: vector.buffer
params:
distance: ${buffer_dist} # 独立使用 → 保持 int: 500
label: "缓冲${buffer_dist}米" # 嵌入字符串 → str: "缓冲500米"
类型保持规则:
┌─────────────────────────────────────────────────┐
│ 变量类型解析规则 │
│ │
│ 变量值类型 引用方式 结果类型 │
│ ────────────────────────────────────────────── │
│ int: 500 ${var} 独立使用 int: 500 │
│ int: 500 "text${var}" str: "text500" │
│ float: 0.1 ${var} 独立使用 float: 0.1 │
│ str: "abc" ${var} 独立使用 str: "abc" │
│ bool: true ${var} 独立使用 bool: True │
│ bool: true "is ${var}" str: "is True" │
└─────────────────────────────────────────────────┘
5.3.3 YAML 类型陷阱
注意 YAML 的自动类型推断可能带来意外:
variables:
# ⚠️ 注意以下陷阱
# 这会被解析为布尔值 True,而非字符串 "yes"
flag: yes # → bool: True
# 版本号可能被解析为浮点数
version: 1.0 # → float: 1.0 (不是 "1.0")
# 使用引号确保是字符串
version: "1.0" # → str: "1.0"
flag: "yes" # → str: "yes"
# EPSG 代码不加引号可能出问题
crs: EPSG:4326 # → str: "EPSG:4326" (pyyaml 处理为字符串)
最佳实践:当值可能被误解析时,使用引号包裹。
5.4 步骤引用详解
5.4.1 引用的两种形式
GeoPipeAgent 支持两种步骤引用语法:
| 语法 | 含义 | 返回值 |
|---|---|---|
$step_id |
简写形式 | 步骤的 output 属性 |
$step_id.attr |
完整形式 | 步骤的指定属性 |
5.4.2 $step_id 简写
简写形式是最常用的引用方式,它等价于 $step_id.output:
steps:
- id: load
use: io.read_file
params:
path: "data.shp"
- id: buffer
use: vector.buffer
params:
input: $load # 等价于 $load.output
distance: 100
解析逻辑:
# PipelineContext 的引用解析
def _resolve_step_ref(self, value: str):
if value.startswith("$") and "." not in value[1:]:
# $step_id 简写 → 返回 step_result.output
step_id = value[1:]
result = self.get_output(step_id)
return result.output
5.4.3 $step_id.output
显式引用步骤的输出数据:
params:
input: $load.output # 显式引用 output 属性
返回步骤的主要输出数据,通常是一个 GeoDataFrame 或其他数据对象。
5.4.4 $step_id.stats
引用步骤的统计信息:
outputs:
statistics: "$buffer.stats" # 引用缓冲区步骤的统计信息
stats 是一个字典,包含步骤执行的统计数据,如要素计数、面积总计等。
5.4.5 $step_id.metadata
引用步骤的元数据:
outputs:
meta: "$load.metadata" # 引用加载步骤的元数据
metadata 包含步骤执行的附加信息,如文件路径、CRS、执行时间等。
5.4.6 $step_id.issues
引用步骤的 QC 问题列表:
outputs:
problems: "$check_geom.issues" # 引用质检步骤的问题列表
issues 是一个 QcIssue 列表,每个 QcIssue 包含 rule_id、severity、feature_index、message、geometry、details。
5.4.7 StepResult 数据结构
所有步骤引用都基于 StepResult 数据类(models/result.py):
@dataclass
class StepResult:
output: any = None # 主要输出数据
stats: dict = field(default_factory=dict) # 统计信息
metadata: dict = field(default_factory=dict) # 元数据
issues: list[QcIssue] = field(default_factory=list) # QC 问题
# QcIssue 数据类 (models/qc.py)
@dataclass
class QcIssue:
rule_id: str # 规则 ID,如 "invalid_geometry"
severity: str # 严重等级:error/warning/info
feature_index: int | None # 问题要素的索引
message: str # 问题描述
geometry: any = None # 问题几何
details: dict = field(default_factory=dict) # 详细信息
5.4.8 引用解析总结
引用语法 返回值
──────────────────────────────────────────────────
$step_id → StepResult.output
$step_id.output → StepResult.output
$step_id.stats → StepResult.stats
$step_id.metadata → StepResult.metadata
$step_id.issues → StepResult.issues
${var_name} → variables[var_name]
──────────────────────────────────────────────────
5.5 引用解析源码分析
5.5.1 PipelineContext.resolve()
resolve() 是引用解析的入口方法,处理任意类型的值(engine/context.py):
class PipelineContext:
def resolve(self, value):
"""递归解析值中的所有引用"""
if isinstance(value, str):
return self._resolve_string(value)
elif isinstance(value, dict):
return {k: self.resolve(v) for k, v in value.items()}
elif isinstance(value, list):
return [self.resolve(item) for item in value]
return value # 数字、布尔等基本类型直接返回
def _resolve_string(self, value: str):
"""解析字符串中的引用"""
# 先尝试步骤引用
resolved = self._resolve_step_ref(value)
if resolved is not value:
return resolved
# 再处理变量替换
return self._substitute_variables(value)
5.5.2 _resolve_step_ref() 步骤引用解析
import re
class PipelineContext:
# 匹配 $step_id 或 $step_id.attr
_STEP_REF_PATTERN = re.compile(r'^\$([a-z0-9_-]+)(?:\.(\w+))?$')
def _resolve_step_ref(self, value: str):
"""解析步骤引用 $step_id 或 $step_id.attr"""
match = self._STEP_REF_PATTERN.match(value.strip())
if not match:
return value # 不是步骤引用,原样返回
step_id = match.group(1) # 步骤 ID
attr = match.group(2) # 属性名(可能为 None)
# 获取步骤结果
if step_id not in self._step_outputs:
raise VariableResolutionError(
f"Referenced step '{step_id}' not found. "
f"Available steps: {list(self._step_outputs.keys())}"
)
result = self._step_outputs[step_id]
if attr is None:
# $step_id 简写 → 返回 output
return result.output
elif hasattr(result, attr):
# $step_id.attr → 返回指定属性
return getattr(result, attr)
else:
raise VariableResolutionError(
f"StepResult has no attribute '{attr}'. "
f"Available: output, stats, metadata, issues"
)
解析流程图:
输入: "$buffer.output"
│
├── 正则匹配: step_id="buffer", attr="output"
│
├── 查找: _step_outputs["buffer"] → StepResult
│
└── 返回: StepResult.output → GeoDataFrame
输入: "$load"
│
├── 正则匹配: step_id="load", attr=None
│
├── 查找: _step_outputs["load"] → StepResult
│
└── 返回: StepResult.output → GeoDataFrame (简写默认 output)
5.5.3 _substitute_variables() 变量替换
class PipelineContext:
# 匹配 ${var_name}
_VAR_PATTERN = re.compile(r'\$\{(\w+)\}')
def _substitute_variables(self, value: str):
"""替换字符串中的 ${var} 变量引用"""
def replacer(match):
var_name = match.group(1)
if var_name not in self.variables:
raise VariableResolutionError(
f"Variable '$}' not defined. "
f"Available variables: {list(self.variables.keys())}"
)
return str(self.variables[var_name])
result = self._VAR_PATTERN.sub(replacer, value)
# 如果整个字符串就是一个变量引用,保持原始类型
full_match = re.fullmatch(r'\$\{(\w+)\}', value)
if full_match:
var_name = full_match.group(1)
return self.variables[var_name] # 返回原始类型
return result # 包含其他文本时返回字符串
类型保持逻辑:
输入: "${buffer_dist}" (变量值 = 500)
│
├── 检测: 整个字符串就是一个 ${var}
│
└── 返回: int(500) ← 保持原始类型
输入: "缓冲${buffer_dist}米" (变量值 = 500)
│
├── 检测: 字符串中嵌入了 ${var}
│
└── 返回: str("缓冲500米") ← 转换为字符串
5.6 嵌套参数的递归解析
5.6.1 字典参数的解析
当参数值是字典时,resolve() 会递归解析每个键值对:
variables:
crs: "EPSG:4326"
tolerance: 0.001
steps:
- id: process
use: vector.simplify
params:
input: $load
options: # 字典参数
crs: ${crs} # 递归解析
simplify: true
tolerance: ${tolerance} # 递归解析
解析过程:
# resolve() 处理字典参数
params = {
"input": "$load",
"options": {
"crs": "${crs}",
"simplify": True,
"tolerance": "${tolerance}"
}
}
# 递归解析后:
resolved_params = {
"input": <GeoDataFrame>, # $load → StepResult.output
"options": {
"crs": "EPSG:4326", # ${crs} → "EPSG:4326"
"simplify": True, # 布尔值直接保留
"tolerance": 0.001 # ${tolerance} → 0.001
}
}
5.6.2 列表参数的解析
当参数值是列表时,resolve() 会递归解析每个元素:
variables:
col1: "name"
col2: "area"
steps:
- id: select
use: vector.select_columns
params:
input: $load
columns: # 列表参数
- ${col1} # 递归解析
- ${col2} # 递归解析
- "population" # 字面值不变
解析过程:
# 原始参数
params = {
"input": "$load",
"columns": ["${col1}", "${col2}", "population"]
}
# 解析后
resolved = {
"input": <GeoDataFrame>,
"columns": ["name", "area", "population"]
}
5.6.3 深层嵌套解析
resolve() 支持任意深度的嵌套:
variables:
threshold: 100
params:
config:
filters:
- field: "area"
value: ${threshold} # 3层嵌套
- field: "population"
value: 50000
output:
format: "geojson"
递归调用链:
resolve(dict) → config
resolve(dict) → filters
resolve(list)
resolve(dict)
resolve(str) → "${threshold}" → 100
resolve(dict)
resolve(int) → 50000 (直接返回)
resolve(dict) → output
resolve(str) → "geojson" (直接返回)
5.7 CLI --var 运行时覆盖
5.7.1 基本用法
GeoPipeAgent CLI 的 run 命令支持 --var 参数,在运行时覆盖流水线中定义的变量:
# 覆盖单个变量
geopipe-agent run pipeline.yaml --var buffer_dist=1000
# 覆盖多个变量
geopipe-agent run pipeline.yaml \
--var buffer_dist=1000 \
--var target_crs=EPSG:32650 \
--var run_qc=false
5.7.2 覆盖机制
CLI --var 参数的值会合并到 PipelineContext 的变量中,覆盖 YAML 中定义的同名变量:
# executor.py 中的变量合并逻辑
def execute_pipeline(self, pipeline: PipelineDefinition,
var_overrides: dict = None):
# 合并变量:CLI --var 覆盖 YAML variables
variables = {
**pipeline.variables, # YAML 中定义的变量
**(var_overrides or {}), # CLI --var 覆盖
}
context = PipelineContext(variables=variables)
合并优先级:
┌──────────────────────────────────────┐
│ 变量优先级(从低到高) │
│ │
│ 1. YAML variables 节点定义(最低) │
│ │ │
│ ▼ 覆盖 │
│ 2. CLI --var 参数(最高) │
└──────────────────────────────────────┘
5.7.3 CLI 命令定义
# cli.py 中 run 命令的定义(基于 Click)
@cli.command()
@click.argument("pipeline_file", type=click.Path(exists=True))
@click.option("--var", multiple=True, help="Override variable: key=value")
@click.option("--backend", default=None, help="Preferred backend")
def run(pipeline_file, var, backend):
"""Run a pipeline from YAML file"""
# 解析 --var 参数
var_overrides = {}
for v in var:
key, _, value = v.partition("=")
var_overrides[key.strip()] = _parse_value(value.strip())
# 解析流水线
pipeline = Parser.parse_yaml(pipeline_file)
# 执行
executor = Executor(preferred_backend=backend)
report = executor.execute_pipeline(pipeline, var_overrides)
# 输出报告
print(json.dumps(report, indent=2, default=str))
5.7.4 值类型解析
CLI --var 传入的值是字符串,GeoPipeAgent 会尝试自动解析为合适的类型:
def _parse_value(value: str):
"""将 CLI 字符串值解析为合适的 Python 类型"""
# 布尔值
if value.lower() in ("true", "yes"):
return True
if value.lower() in ("false", "no"):
return False
# 整数
try:
return int(value)
except ValueError:
pass
# 浮点数
try:
return float(value)
except ValueError:
pass
# 字符串
return value
5.7.5 使用场景
# 场景1:调整分析参数
geopipe-agent run analysis.yaml --var buffer_dist=200
# 场景2:切换输入数据
geopipe-agent run process.yaml --var input_path=data/new_roads.shp
# 场景3:控制可选步骤
geopipe-agent run pipeline.yaml --var run_qc=true
# 场景4:切换坐标系
geopipe-agent run reproject.yaml --var target_crs=EPSG:32650
# 场景5:批量脚本中使用
for dist in 100 200 500 1000; do
geopipe-agent run buffer.yaml --var buffer_dist=$dist
done
5.8 变量验证与错误处理
5.8.1 VariableResolutionError
当变量解析失败时,PipelineContext 会抛出 VariableResolutionError(定义在 errors.py 中):
class VariableResolutionError(GeoPipeAgentError):
"""变量或步骤引用解析失败"""
pass
5.8.2 未定义变量错误
variables:
buffer_dist: 500
steps:
- id: buffer
use: vector.buffer
params:
distance: ${undefined_var} # ❌ 变量未定义
错误信息:
VariableResolutionError: Variable '${undefined_var}' not defined.
Available variables: ['buffer_dist']
5.8.3 未定义步骤引用错误
steps:
- id: buffer
use: vector.buffer
params:
input: $nonexistent_step # ❌ 步骤不存在
distance: 100
错误信息:
VariableResolutionError: Referenced step 'nonexistent_step' not found.
Available steps: []
5.8.4 无效属性引用错误
steps:
- id: load
use: io.read_file
params:
path: "data.shp"
- id: process
use: vector.buffer
params:
input: $load.nonexistent # ❌ 无效属性
distance: 100
错误信息:
VariableResolutionError: StepResult has no attribute 'nonexistent'.
Available: output, stats, metadata, issues
5.8.5 前向引用错误
步骤只能引用在其之前执行的步骤。前向引用(引用后面的步骤)在验证阶段就会被拒绝:
steps:
- id: buffer
use: vector.buffer
params:
input: $load # ❌ load 在 buffer 之后定义
distance: 100
- id: load
use: io.read_file
params:
path: "data.shp"
validator.py 会检测并报告此错误:
def _check_step_references(self, steps):
defined = set()
for step in steps:
# 检查参数中引用的步骤是否已定义
refs = self._extract_step_refs(step.params)
for ref in refs:
if ref not in defined:
raise ValidationError(
f"Step '{step.id}' references '{ref}' "
f"which is not defined before it"
)
defined.add(step.id)
5.8.6 错误信息的 AI 友好设计
所有错误信息都包含修复建议,便于 AI 理解和修正:
错误类型 错误信息包含
─────────────────────────────────────────────
变量未定义 → "Available variables: [...]"
步骤引用无效 → "Available steps: [...]"
属性不存在 → "Available: output, stats, metadata, issues"
前向引用 → "which is not defined before it"
5.9 最佳实践
5.9.1 变量命名规范
| 类别 | 命名模式 | 示例 |
|---|---|---|
| 文件路径 | xxx_path / xxx_dir |
input_path, output_dir |
| 距离/数值 | xxx_distance / xxx_value |
buffer_distance, threshold_value |
| 坐标系 | target_crs / source_crs |
target_crs |
| 布尔开关 | run_xxx / enable_xxx |
run_qc, enable_simplify |
| 格式 | xxx_format |
output_format |
5.9.2 变量与步骤引用的选择
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| 跨步骤共享的固定值 | 变量 ${var} |
集中管理,方便修改 |
| 步骤间的数据传递 | 步骤引用 $step_id |
自动传递数据对象 |
| 运行时可能变化的值 | 变量 + CLI --var |
灵活配置 |
| 环境相关的值(路径等) | 变量 | 便于不同环境切换 |
5.9.3 流水线复用模式
参数化流水线:将所有可变部分提取为变量
pipeline:
name: "通用缓冲区分析"
variables:
input_file: "data/default.shp" # 默认值
buffer_dist: 500 # 默认值
output_file: "output/result.shp" # 默认值
target_crs: "EPSG:4326" # 默认值
steps:
- id: load
use: io.read_file
params:
path: ${input_file}
- id: reproject
use: vector.reproject
params:
input: $load
crs: ${target_crs}
- id: buffer
use: vector.buffer
params:
input: $reproject
distance: ${buffer_dist}
- id: save
use: io.write_file
params:
input: $buffer
path: ${output_file}
使用时通过 --var 定制:
# 分析道路
geopipe-agent run generic_buffer.yaml \
--var input_file=data/roads.shp \
--var buffer_dist=100 \
--var output_file=output/road_buffer.shp
# 分析河流
geopipe-agent run generic_buffer.yaml \
--var input_file=data/rivers.shp \
--var buffer_dist=200 \
--var output_file=output/river_buffer.shp
5.9.4 步骤 ID 与引用的对齐
保持步骤 ID 的语义清晰,使引用代码易于理解:
# ✅ 良好的实践:ID 清晰描述步骤功能
steps:
- id: load_parcels
use: io.read_file
params:
path: "parcels.shp"
- id: buffer_parcels
use: vector.buffer
params:
input: $load_parcels # 清晰:缓冲宗地数据
distance: 50
- id: clip_buildings
use: vector.clip
params:
input: $load_buildings
mask: $buffer_parcels # 清晰:用宗地缓冲区裁剪
# ❌ 不好的实践:ID 含义不清
steps:
- id: s1
use: io.read_file
params:
path: "parcels.shp"
- id: s2
use: vector.buffer
params:
input: $s1 # 不清楚 s1 是什么
distance: 50
5.9.5 调试引用问题
当引用解析出现问题时,可以使用 validate 命令快速定位:
# 验证流水线定义
geopipe-agent validate pipeline.yaml
验证器会检查:
- 所有步骤引用是否指向已定义的前序步骤
- 输出声明中的引用是否有效
- 步骤 ID 是否符合命名规则
5.10 本章小结
本章深入解析了 GeoPipeAgent 的变量系统和步骤引用机制:
- 变量定义:在
variables节点用键值对定义,支持字符串、数字、布尔等类型 - 变量引用:使用
${var_name}语法,支持独立使用和字符串嵌入 - 类型推断:独立使用保持原始类型,嵌入字符串时转为字符串
- 步骤引用:
$step_id(简写,返回 output)和$step_id.attr(完整形式) - 引用解析:
PipelineContext.resolve()递归处理字符串、字典、列表中的引用 - 嵌套解析:支持任意深度的字典和列表嵌套
- CLI 覆盖:
--var key=value运行时覆盖变量,优先级高于 YAML 定义 - 错误处理:
VariableResolutionError提供 AI 友好的错误信息和修复建议 - 最佳实践:语义化命名、参数化流水线、合理选择变量与引用