znlgis 博客

GIS开发与技术分享

第六章:条件执行、重试与错误策略

6.1 概述

GeoPipeAgent 提供了三个关键的流水线控制机制,让流水线能够在复杂场景下健壮运行:

机制 字段 作用
条件执行 when 根据变量或运行时状态决定是否执行某步骤
错误策略 on_error 步骤失败时的处置方式:终止/跳过/重试
自动重试 on_error: retry 失败时自动重试,适用于不稳定操作

这三个机制共同构成了 GeoPipeAgent 的弹性执行能力,使流水线在面对数据缺失、网络抖动、可选处理等情形时仍能有序运行。

6.2 when 条件执行

6.2.1 基本用法

when 字段接受一个表达式字符串。引擎在执行该步骤前对表达式求值,结果为 false 或 falsy 时,跳过该步骤(记录状态为 skipped),继续执行后续步骤。

variables:
  enable_simplify: true

steps:
  - id: load
    use: io.read_vector
    params:
      path: "data/roads.shp"

  - id: simplify
    use: vector.simplify
    params:
      input: $load
      tolerance: 10
    when: "${enable_simplify} == true"    # 条件为 true,执行

  - id: buffer
    use: vector.buffer
    params:
      input: $simplify    # 即使 simplify 跳过,这里也能正常引用
      distance: 500

6.2.2 跳过步骤的透明传递

当某步骤因 when 为 false 而跳过时,后续步骤引用该步骤($skipped-step)时,引擎会透明地向上查找,返回被跳过步骤的输入:

步骤链: load → simplify(跳过)→ buffer
$simplify 的实际解析结果 → $load 的输出

这意味着跳过一个步骤不会打断数据流,后续步骤照常运行,只是少了被跳过步骤的处理效果。

6.2.3 表达式语法

when 表达式支持的语法(通过 safe_eval 安全求值):

比较运算

when: "${count} > 0"
when: "${mode} == 'production'"
when: "${tolerance} != 0.001"
when: "$load.feature_count >= 100"

逻辑运算

# and(所有条件都满足)
when: "${enable_qc} == true and ${run_topology} == true"

# or(至少一个条件满足)
when: "${format} == 'geojson' or ${format} == 'gpkg'"

# not(取反)
when: "not ${skip_buffer}"

成员运算

# in(值在列表中)
when: "${geometry_type} in ['Polygon', 'MultiPolygon']"

数学运算

when: "${buffer_dist} * 2 > 1000"
when: "$load.feature_count % 2 == 0"

基于步骤结果的条件

# 基于上游步骤的要素数量
when: "$load.feature_count > 0"

# 基于上游步骤的 CRS
when: "$load.crs != 'EPSG:4326'"

# 基于质检步骤的结果
when: "$qc-check.passed == false"    # 质检失败时才执行修复步骤

6.2.4 多步骤条件链

可以构建基于运行时状态的条件处理分支:

steps:
  - id: load
    use: io.read_vector
    params:
      path: "data/buildings.shp"

  # 质检步骤:总是执行
  - id: qc-geom
    use: qc.geometry_validity
    params:
      input: $load

  # 修复步骤:只有质检失败时才执行
  - id: fix-geom
    use: vector.buffer     # 用 0 距离缓冲修复几何(常用技巧)
    params:
      input: $load
      distance: 0
    when: "$qc-geom.issue_count > 0"

  # 保存步骤:总是执行,但数据来源根据修复步骤是否执行而变化
  - id: save
    use: io.write_vector
    params:
      input: $fix-geom     # 如果 fix-geom 被跳过,会透明取 $load
      path: "output/buildings_clean.shp"

6.2.5 whenvariables 结合实现功能开关

variables:
  enable_reproject: true
  enable_simplify: false
  enable_qc: true
  output_format: "GeoJSON"

steps:
  - id: load
    use: io.read_vector
    params:
      path: "data/input.shp"

  - id: reproject
    use: vector.reproject
    params:
      input: $load
      target_crs: "EPSG:3857"
    when: "${enable_reproject} == true"

  - id: simplify
    use: vector.simplify
    params:
      input: $reproject
      tolerance: 5
    when: "${enable_simplify} == true"

  - id: qc-check
    use: qc.geometry_validity
    params:
      input: $simplify
    when: "${enable_qc} == true"

  - id: save-geojson
    use: io.write_vector
    params:
      input: $simplify
      path: "output/result.geojson"
    when: "${output_format} == 'GeoJSON'"

  - id: save-gpkg
    use: io.write_vector
    params:
      input: $simplify
      path: "output/result.gpkg"
      driver: GPKG
    when: "${output_format} == 'GPKG'"

通过 CLI 灵活控制:

# 关闭重投影,开启简化,输出 GPKG
geopipe-agent run pipeline.yaml \
  --var enable_reproject=false \
  --var enable_simplify=true \
  --var output_format=GPKG

6.3 on_error 错误策略

6.3.1 三种错误策略

策略 触发行为 报告状态 后续步骤
fail(默认) 立即终止整个流水线 failed 不执行
skip 跳过当前步骤,继续后续步骤 skipped 继续执行
retry 最多重试 3 次,仍失败则终止 failed(重试失败)或 success 依重试结果而定

6.3.2 on_error: fail(默认)

适用于步骤是后续步骤必不可少的前提条件:

steps:
  - id: load-critical-data
    use: io.read_vector
    params:
      path: "data/critical_input.shp"
    on_error: fail    # 加载失败则整个流水线失败(默认行为,可省略)

6.3.3 on_error: skip

适用于可选的、非关键的步骤,失败时不影响主流程:

steps:
  - id: load
    use: io.read_vector
    params:
      path: "data/roads.shp"

  - id: optional-qc
    use: qc.topology
    params:
      input: $load
    on_error: skip    # 拓扑检查失败时跳过,不影响后续处理

  - id: buffer
    use: vector.buffer
    params:
      input: $load    # 注意:引用 $load 而不是 $optional-qc,避免跳过导致问题
      distance: 500

使用 skip 的注意事项

  • 跳过步骤后,后续引用该步骤的步骤会得到其上游输入(透明传递)
  • 如果需要基于步骤是否成功做决策,结合 when 和步骤属性引用

6.3.4 on_error: retry

适用于不稳定的操作(网络请求、外部 API 调用、文件系统竞态等):

steps:
  - id: geocode-address
    use: network.geocode
    params:
      address: "北京市朝阳区望京街道"
    on_error: retry    # 网络超时时自动重试,最多 3 次

  - id: download-tile
    use: io.read_vector    # 假设从网络 URL 读取
    params:
      path: "https://example.com/tiles/data.geojson"
    on_error: retry

  - id: call-api
    use: io.write_vector    # 推送到远程 API
    params:
      input: $buffer
      path: "api://service.example.com/upload"
    on_error: retry

重试机制细节

  • 默认最多重试 3 次(第 1 次失败后,再尝试 3 次,共 4 次执行)
  • 每次重试前有短暂延迟(指数退避)
  • 所有重试均失败时,行为等同于 on_error: fail

6.4 组合使用示例

6.4.1 生产级容错流水线

name: 生产级容错 GIS 流水线
description: 包含完整错误处理和条件执行的生产就绪流水线

variables:
  input_path: "data/input.shp"
  output_path: "output/result.geojson"
  enable_qc: true
  skip_topology_check: false

steps:
  # 关键步骤:失败则终止
  - id: load
    use: io.read_vector
    params:
      path: ${input_path}
    on_error: fail

  # 验证 CRS(关键,失败终止)
  - id: check-crs
    use: qc.crs_check
    params:
      input: $load
      expected_crs: "EPSG:4326"
    on_error: fail
    when: "${enable_qc} == true"

  # 投影转换(关键步骤)
  - id: reproject
    use: vector.reproject
    params:
      input: $load
      target_crs: "EPSG:3857"
    on_error: fail

  # 几何有效性修复(非关键,跳过不影响主流程)
  - id: fix-geometry
    use: vector.buffer
    params:
      input: $reproject
      distance: 0
    on_error: skip

  # 拓扑检查(可选,支持通过变量关闭)
  - id: topology-check
    use: qc.topology
    params:
      input: $fix-geometry
    on_error: skip
    when: "${skip_topology_check} == false and ${enable_qc} == true"

  # 缓冲区分析(核心分析步骤)
  - id: buffer
    use: vector.buffer
    params:
      input: $fix-geometry
      distance: 500
    on_error: fail

  # 保存结果(带重试,应对文件系统偶发错误)
  - id: save
    use: io.write_vector
    params:
      input: $buffer
      path: ${output_path}
      driver: GeoJSON
    on_error: retry

outputs:
  result: $save

6.4.2 多数据源并行加载容错

steps:
  # 尝试加载主数据源
  - id: load-primary
    use: io.read_vector
    params:
      path: "data/primary_source.gpkg"
    on_error: skip    # 主数据源不可用时跳过

  # 尝试加载备用数据源(当主数据源跳过时)
  - id: load-backup
    use: io.read_vector
    params:
      path: "data/backup_source.shp"
    on_error: fail    # 备用源失败则终止
    when: "$load-primary.feature_count == 0"    # 主数据源无数据时使用备用

6.5 错误报告

当步骤失败时(无论 on_error 策略如何),JSON 报告中都会记录完整的错误信息:

{
  "steps": [
    {
      "id": "geocode",
      "use": "network.geocode",
      "status": "failed",
      "duration_seconds": 5.2,
      "error": {
        "type": "NetworkError",
        "message": "连接超时:Nominatim API 在 5 秒内未响应",
        "retry_count": 3,
        "last_attempt_at": "2024-01-15T10:30:05.000Z"
      }
    }
  ],
  "pipeline": {
    "status": "failed",
    "failed_step": "geocode"
  }
}

跳过步骤的报告:

{
  "id": "optional-qc",
  "use": "qc.topology",
  "status": "skipped",
  "skip_reason": "on_error: skip(步骤执行失败但被配置为跳过)",
  "error": {
    "type": "TopologyError",
    "message": "拓扑分析内存溢出"
  }
}

6.6 最佳实践

何时使用 fail

  • 步骤输出是后续所有步骤的必要输入
  • 步骤失败意味着整个分析任务无法完成
  • 加载核心输入数据、关键格式转换等场景

何时使用 skip

  • 步骤是可选的增强处理(质检、简化等)
  • 步骤失败不影响主要业务目标
  • 用于生成报告的统计步骤(失败时跳过,但主流程继续)

何时使用 retry

  • 网络 I/O 操作(地理编码、HTTP API 调用)
  • 依赖外部服务的操作
  • 可能因资源竞态(文件锁定等)短暂失败的操作

何时使用 when

  • 基于参数开关控制可选步骤
  • 基于数据特征(要素数量、CRS 类型)决定是否需要某步骤
  • 实现条件性数据修复或增强
  • 在同一流水线中支持多种输出格式

6.7 小结

本章介绍了 GeoPipeAgent 的三大流水线控制机制:

  • when 条件执行:基于变量或运行时步骤结果的表达式,控制步骤是否执行,跳过步骤透明传递上游输入
  • on_error: fail:默认策略,步骤失败时立即终止流水线
  • on_error: skip:步骤失败时跳过,继续执行后续步骤
  • on_error: retry:失败时自动重试最多 3 次,适用于不稳定操作

三者组合使用,可以构建出既健壮又灵活的生产级 GIS 分析流水线。

下一章将进入步骤篇,首先介绍 IO 步骤——数据读写的核心能力。