第六章:条件执行、重试与错误策略
6.1 概述
GeoPipeAgent 提供了三个关键的流水线控制机制,让流水线能够在复杂场景下健壮运行:
| 机制 | 字段 | 作用 |
|---|---|---|
| 条件执行 | when |
根据变量或运行时状态决定是否执行某步骤 |
| 错误策略 | on_error |
步骤失败时的处置方式:终止/跳过/重试 |
| 自动重试 | on_error: retry |
失败时自动重试,适用于不稳定操作 |
这三个机制共同构成了 GeoPipeAgent 的弹性执行能力,使流水线在面对数据缺失、网络抖动、可选处理等情形时仍能有序运行。
6.2 when 条件执行
6.2.1 基本用法
when 字段接受一个表达式字符串。引擎在执行该步骤前对表达式求值,结果为 false 或 falsy 时,跳过该步骤(记录状态为 skipped),继续执行后续步骤。
variables:
enable_simplify: true
steps:
- id: load
use: io.read_vector
params:
path: "data/roads.shp"
- id: simplify
use: vector.simplify
params:
input: $load
tolerance: 10
when: "${enable_simplify} == true" # 条件为 true,执行
- id: buffer
use: vector.buffer
params:
input: $simplify # 即使 simplify 跳过,这里也能正常引用
distance: 500
6.2.2 跳过步骤的透明传递
当某步骤因 when 为 false 而跳过时,后续步骤引用该步骤($skipped-step)时,引擎会透明地向上查找,返回被跳过步骤的输入:
步骤链: load → simplify(跳过)→ buffer
$simplify 的实际解析结果 → $load 的输出
这意味着跳过一个步骤不会打断数据流,后续步骤照常运行,只是少了被跳过步骤的处理效果。
6.2.3 表达式语法
when 表达式支持的语法(通过 safe_eval 安全求值):
比较运算:
when: "${count} > 0"
when: "${mode} == 'production'"
when: "${tolerance} != 0.001"
when: "$load.feature_count >= 100"
逻辑运算:
# and(所有条件都满足)
when: "${enable_qc} == true and ${run_topology} == true"
# or(至少一个条件满足)
when: "${format} == 'geojson' or ${format} == 'gpkg'"
# not(取反)
when: "not ${skip_buffer}"
成员运算:
# in(值在列表中)
when: "${geometry_type} in ['Polygon', 'MultiPolygon']"
数学运算:
when: "${buffer_dist} * 2 > 1000"
when: "$load.feature_count % 2 == 0"
基于步骤结果的条件:
# 基于上游步骤的要素数量
when: "$load.feature_count > 0"
# 基于上游步骤的 CRS
when: "$load.crs != 'EPSG:4326'"
# 基于质检步骤的结果
when: "$qc-check.passed == false" # 质检失败时才执行修复步骤
6.2.4 多步骤条件链
可以构建基于运行时状态的条件处理分支:
steps:
- id: load
use: io.read_vector
params:
path: "data/buildings.shp"
# 质检步骤:总是执行
- id: qc-geom
use: qc.geometry_validity
params:
input: $load
# 修复步骤:只有质检失败时才执行
- id: fix-geom
use: vector.buffer # 用 0 距离缓冲修复几何(常用技巧)
params:
input: $load
distance: 0
when: "$qc-geom.issue_count > 0"
# 保存步骤:总是执行,但数据来源根据修复步骤是否执行而变化
- id: save
use: io.write_vector
params:
input: $fix-geom # 如果 fix-geom 被跳过,会透明取 $load
path: "output/buildings_clean.shp"
6.2.5 when 与 variables 结合实现功能开关
variables:
enable_reproject: true
enable_simplify: false
enable_qc: true
output_format: "GeoJSON"
steps:
- id: load
use: io.read_vector
params:
path: "data/input.shp"
- id: reproject
use: vector.reproject
params:
input: $load
target_crs: "EPSG:3857"
when: "${enable_reproject} == true"
- id: simplify
use: vector.simplify
params:
input: $reproject
tolerance: 5
when: "${enable_simplify} == true"
- id: qc-check
use: qc.geometry_validity
params:
input: $simplify
when: "${enable_qc} == true"
- id: save-geojson
use: io.write_vector
params:
input: $simplify
path: "output/result.geojson"
when: "${output_format} == 'GeoJSON'"
- id: save-gpkg
use: io.write_vector
params:
input: $simplify
path: "output/result.gpkg"
driver: GPKG
when: "${output_format} == 'GPKG'"
通过 CLI 灵活控制:
# 关闭重投影,开启简化,输出 GPKG
geopipe-agent run pipeline.yaml \
--var enable_reproject=false \
--var enable_simplify=true \
--var output_format=GPKG
6.3 on_error 错误策略
6.3.1 三种错误策略
| 策略 | 触发行为 | 报告状态 | 后续步骤 |
|---|---|---|---|
fail(默认) |
立即终止整个流水线 | failed |
不执行 |
skip |
跳过当前步骤,继续后续步骤 | skipped |
继续执行 |
retry |
最多重试 3 次,仍失败则终止 | failed(重试失败)或 success |
依重试结果而定 |
6.3.2 on_error: fail(默认)
适用于步骤是后续步骤必不可少的前提条件:
steps:
- id: load-critical-data
use: io.read_vector
params:
path: "data/critical_input.shp"
on_error: fail # 加载失败则整个流水线失败(默认行为,可省略)
6.3.3 on_error: skip
适用于可选的、非关键的步骤,失败时不影响主流程:
steps:
- id: load
use: io.read_vector
params:
path: "data/roads.shp"
- id: optional-qc
use: qc.topology
params:
input: $load
on_error: skip # 拓扑检查失败时跳过,不影响后续处理
- id: buffer
use: vector.buffer
params:
input: $load # 注意:引用 $load 而不是 $optional-qc,避免跳过导致问题
distance: 500
使用 skip 的注意事项:
- 跳过步骤后,后续引用该步骤的步骤会得到其上游输入(透明传递)
- 如果需要基于步骤是否成功做决策,结合
when和步骤属性引用
6.3.4 on_error: retry
适用于不稳定的操作(网络请求、外部 API 调用、文件系统竞态等):
steps:
- id: geocode-address
use: network.geocode
params:
address: "北京市朝阳区望京街道"
on_error: retry # 网络超时时自动重试,最多 3 次
- id: download-tile
use: io.read_vector # 假设从网络 URL 读取
params:
path: "https://example.com/tiles/data.geojson"
on_error: retry
- id: call-api
use: io.write_vector # 推送到远程 API
params:
input: $buffer
path: "api://service.example.com/upload"
on_error: retry
重试机制细节:
- 默认最多重试 3 次(第 1 次失败后,再尝试 3 次,共 4 次执行)
- 每次重试前有短暂延迟(指数退避)
- 所有重试均失败时,行为等同于
on_error: fail
6.4 组合使用示例
6.4.1 生产级容错流水线
name: 生产级容错 GIS 流水线
description: 包含完整错误处理和条件执行的生产就绪流水线
variables:
input_path: "data/input.shp"
output_path: "output/result.geojson"
enable_qc: true
skip_topology_check: false
steps:
# 关键步骤:失败则终止
- id: load
use: io.read_vector
params:
path: ${input_path}
on_error: fail
# 验证 CRS(关键,失败终止)
- id: check-crs
use: qc.crs_check
params:
input: $load
expected_crs: "EPSG:4326"
on_error: fail
when: "${enable_qc} == true"
# 投影转换(关键步骤)
- id: reproject
use: vector.reproject
params:
input: $load
target_crs: "EPSG:3857"
on_error: fail
# 几何有效性修复(非关键,跳过不影响主流程)
- id: fix-geometry
use: vector.buffer
params:
input: $reproject
distance: 0
on_error: skip
# 拓扑检查(可选,支持通过变量关闭)
- id: topology-check
use: qc.topology
params:
input: $fix-geometry
on_error: skip
when: "${skip_topology_check} == false and ${enable_qc} == true"
# 缓冲区分析(核心分析步骤)
- id: buffer
use: vector.buffer
params:
input: $fix-geometry
distance: 500
on_error: fail
# 保存结果(带重试,应对文件系统偶发错误)
- id: save
use: io.write_vector
params:
input: $buffer
path: ${output_path}
driver: GeoJSON
on_error: retry
outputs:
result: $save
6.4.2 多数据源并行加载容错
steps:
# 尝试加载主数据源
- id: load-primary
use: io.read_vector
params:
path: "data/primary_source.gpkg"
on_error: skip # 主数据源不可用时跳过
# 尝试加载备用数据源(当主数据源跳过时)
- id: load-backup
use: io.read_vector
params:
path: "data/backup_source.shp"
on_error: fail # 备用源失败则终止
when: "$load-primary.feature_count == 0" # 主数据源无数据时使用备用
6.5 错误报告
当步骤失败时(无论 on_error 策略如何),JSON 报告中都会记录完整的错误信息:
{
"steps": [
{
"id": "geocode",
"use": "network.geocode",
"status": "failed",
"duration_seconds": 5.2,
"error": {
"type": "NetworkError",
"message": "连接超时:Nominatim API 在 5 秒内未响应",
"retry_count": 3,
"last_attempt_at": "2024-01-15T10:30:05.000Z"
}
}
],
"pipeline": {
"status": "failed",
"failed_step": "geocode"
}
}
跳过步骤的报告:
{
"id": "optional-qc",
"use": "qc.topology",
"status": "skipped",
"skip_reason": "on_error: skip(步骤执行失败但被配置为跳过)",
"error": {
"type": "TopologyError",
"message": "拓扑分析内存溢出"
}
}
6.6 最佳实践
何时使用 fail
- 步骤输出是后续所有步骤的必要输入
- 步骤失败意味着整个分析任务无法完成
- 加载核心输入数据、关键格式转换等场景
何时使用 skip
- 步骤是可选的增强处理(质检、简化等)
- 步骤失败不影响主要业务目标
- 用于生成报告的统计步骤(失败时跳过,但主流程继续)
何时使用 retry
- 网络 I/O 操作(地理编码、HTTP API 调用)
- 依赖外部服务的操作
- 可能因资源竞态(文件锁定等)短暂失败的操作
何时使用 when
- 基于参数开关控制可选步骤
- 基于数据特征(要素数量、CRS 类型)决定是否需要某步骤
- 实现条件性数据修复或增强
- 在同一流水线中支持多种输出格式
6.7 小结
本章介绍了 GeoPipeAgent 的三大流水线控制机制:
when条件执行:基于变量或运行时步骤结果的表达式,控制步骤是否执行,跳过步骤透明传递上游输入on_error: fail:默认策略,步骤失败时立即终止流水线on_error: skip:步骤失败时跳过,继续执行后续步骤on_error: retry:失败时自动重试最多 3 次,适用于不稳定操作
三者组合使用,可以构建出既健壮又灵活的生产级 GIS 分析流水线。
下一章将进入步骤篇,首先介绍 IO 步骤——数据读写的核心能力。