第十四章:多后端系统详解
14.1 概述
GeoPipeAgent 的多后端系统(src/geopipe_agent/backends/)是框架的重要特性之一,它通过抽象层屏蔽了底层 GIS 工具的差异,使得同一 YAML 流水线可以在不同的 GIS 工具链上运行。
GeoPipeAgent 内置 7 种后端:
| 后端名称 | 文件 | 实现方式 | 默认 |
|---|---|---|---|
native_python |
native_python_backend.py |
GeoPandas + Shapely + rasterio | ✅ |
gdal_cli |
gdal_cli.py |
调用 ogr2ogr 等 GDAL CLI 工具 | ❌ |
gdal_python |
gdal_python_backend.py |
GDAL/OGR Python 绑定 | ❌ |
qgis_process |
qgis_process.py |
QGIS Processing Framework CLI | ❌ |
pyqgis |
pyqgis_backend.py |
PyQGIS Python API | ❌ |
generic_cli |
generic_cli_backend.py |
任意 CLI 命令 | ❌ |
curl_api |
curl_api_backend.py |
HTTP 请求(curl) | ❌ |
14.2 抽象基类(base.py)
所有后端都继承自 GeopipeBackend 抽象基类:
from abc import ABC, abstractmethod
class GeopipeBackend(ABC):
@abstractmethod
def name(self) -> str:
"""返回后端名称标识符。"""
@abstractmethod
def is_available(self) -> bool:
"""检查后端是否可用(依赖是否满足)。"""
def execute(self, command: str, **kwargs) -> Any:
"""执行后端命令(可选实现)。"""
raise NotImplementedError
def run_algorithm(self, algorithm: str, params: dict) -> Any:
"""运行指定算法(QGIS 后端使用)。"""
raise NotImplementedError
is_available() 的作用:每个后端在初始化时检查自身依赖是否满足。不可用的后端不会被用于执行,通过 geopipe-agent backends 命令可以查看各后端的可用状态。
14.3 native_python 后端(默认)
特点
- 实现方式:Python 生态(GeoPandas + Shapely + rasterio + pyproj)
- 无需额外安装:随 GeoPipeAgent 核心依赖自动安装
- 内存处理:所有操作在内存中完成,无磁盘 IO 中间文件
- 适用场景:绝大多数常规 GIS 操作
配置
# native_python 是默认后端,步骤不需要指定
# 或显式指定
steps:
- id: buffer
use: vector.buffer
params:
input: $load
distance: 500
backend: native_python # 显式指定(可省略)
内部实现
class NativePythonBackend(GeopipeBackend):
def name(self) -> str:
return "native_python"
def is_available(self) -> bool:
try:
import geopandas
import shapely
import rasterio
return True
except ImportError:
return False
大多数步骤的实现直接调用 GeoPandas/Shapely API:
# 步骤实现(native_python 后端)
@step(id="vector.buffer", backends=["native_python"])
def buffer_step(ctx: StepContext) -> StepResult:
gdf = ctx.input()
distance = ctx.param("distance")
buffered = gdf.buffer(distance)
return StepResult(output=gdf.assign(geometry=buffered))
14.4 gdal_cli 后端
特点
- 实现方式:调用系统安装的 GDAL CLI 工具(
ogr2ogr、gdal_translate、gdalwarp等) - 优势:GDAL CLI 成熟稳定,格式支持最广,对特定格式(如 FileGDB)支持更好
- 劣势:需要磁盘读写中间文件,速度略低
系统要求
# 验证 GDAL CLI 是否可用
ogr2ogr --version
gdal_translate --version
使用方式
steps:
- id: convert
use: io.write_vector
params:
input: $load
path: "output/result.gpkg"
backend: gdal_cli # 使用 GDAL CLI 后端执行格式转换
内部实现
gdal_cli 后端通过 subprocess 调用系统命令:
class GdalCliBackend(GeopipeBackend):
def name(self) -> str:
return "gdal_cli"
def is_available(self) -> bool:
import shutil
return shutil.which("ogr2ogr") is not None
def convert_vector(self, src: str, dst: str, driver: str = "GeoJSON") -> None:
cmd = ["ogr2ogr", "-f", driver, dst, src]
subprocess.run(cmd, check=True, capture_output=True)
14.5 gdal_python 后端
特点
- 实现方式:GDAL/OGR Python 绑定(
osgeo.gdal、osgeo.ogr、osgeo.osr) - 优势:比 CLI 更高效(无进程启动开销),保留了 GDAL 的完整功能
- 安装要求:需要 GDAL Python 绑定(
pip install gdal或通过 conda 安装)
系统要求
# 验证 GDAL Python 是否可用
python -c "from osgeo import gdal; print(gdal.__version__)"
使用场景
- 需要精细控制 GDAL 驱动选项
- 处理 GDAL 原生支持的特殊格式(ECW、MrSID 等)
- 需要访问 GDAL 数据集的元数据
steps:
- id: read-ecw
use: io.read_raster
params:
path: "data/imagery.ecw" # ECW 格式需要 GDAL Python 后端
backend: gdal_python
内部实现
class GdalPythonBackend(GeopipeBackend):
def name(self) -> str:
return "gdal_python"
def is_available(self) -> bool:
try:
from osgeo import gdal, ogr
return True
except ImportError:
return False
def read_vector(self, path: str, layer: str = None) -> Any:
from osgeo import ogr
ds = ogr.Open(path)
lyr = ds.GetLayer(layer or 0)
# 将 OGR Layer 转为 GeoDataFrame ...
14.6 qgis_process 后端
特点
- 实现方式:调用 QGIS Processing Framework CLI(
qgis_process命令) - 优势:可调用 QGIS 丰富的算法库(300+ 算法)
- 安装要求:系统已安装 QGIS,且
qgis_process在 PATH 中
系统要求
qgis_process --version
qgis_process list # 列出所有可用算法
使用方式
steps:
- id: qgis-buffer
use: vector.buffer
params:
input: $load
distance: 500
backend: qgis_process # 使用 QGIS Processing 的缓冲区算法
内部实现
qgis_process 后端通过 JSON 格式调用 QGIS 算法:
class QgisProcessBackend(GeopipeBackend):
def name(self) -> str:
return "qgis_process"
def is_available(self) -> bool:
import shutil
return shutil.which("qgis_process") is not None
def run_algorithm(self, algorithm: str, params: dict) -> dict:
cmd = ["qgis_process", "run", algorithm, "--json"]
for key, value in params.items():
cmd.extend([f"--{key}={value}"])
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
return json.loads(result.stdout)
14.7 pyqgis 后端
特点
- 实现方式:通过 PyQGIS Python API 直接调用 QGIS 功能
- 优势:比 qgis_process CLI 更高效(无进程启动开销),可访问完整 QGIS API
- 安装要求:需要将 QGIS Python 路径添加到
sys.path
配置 PyQGIS 路径
# 在使用前设置环境变量(macOS 示例)
import sys
sys.path.insert(0, '/Applications/QGIS.app/Contents/Resources/python')
sys.path.insert(0, '/Applications/QGIS.app/Contents/Resources/python/plugins')
使用场景
- 需要调用 QGIS 专有算法(如 SAGA、GRASS 算法)
- 需要 QGIS 的高精度几何处理
- 在已有 QGIS 安装的服务器环境中运行
steps:
- id: advanced-qgis
use: vector.buffer
params:
input: $load
distance: 500
backend: pyqgis
14.8 generic_cli 后端
特点
- 实现方式:执行任意 CLI 命令,通过参数占位符传递文件路径
- 适用场景:调用 GeoPipeline 中不支持的自定义命令行工具
配置方式
steps:
- id: custom-tool
use: generic_cli.run # 使用 generic_cli 后端的 run 动作
params:
command: "mapshaper {input} -simplify 10% -o {output}"
input: $load
output: "output/simplified.geojson"
backend: generic_cli
generic_cli 后端会:
- 将
inputGeoDataFrame 保存为临时文件 - 将
{input}和{output}占位符替换为实际路径 - 执行命令
- 读取
{output}路径的文件并作为步骤输出
典型应用
# 调用 mapshaper 进行拓扑保留简化
- id: mapshaper-simplify
use: generic_cli.run
params:
command: "mapshaper {input} -simplify keep-shapes 10% -o {output}"
backend: generic_cli
# 调用自定义 Python 脚本
- id: custom-script
use: generic_cli.run
params:
command: "python scripts/my_process.py --input {input} --output {output}"
backend: generic_cli
14.9 curl_api 后端
特点
- 实现方式:通过
curl命令发起 HTTP 请求调用远程 GIS API - 适用场景:调用云端 GIS 服务(如 ArcGIS Online、GeoServer REST API 等)
使用方式
steps:
- id: call-geoserver
use: io.read_vector
params:
url: "https://geoserver.example.com/wfs?service=WFS&version=1.0.0&request=GetFeature&typeName=myns:roads"
output_format: "geojson"
backend: curl_api
- id: upload-result
use: io.write_vector
params:
input: $buffer
url: "https://api.example.com/features"
method: POST
headers:
Authorization: "Bearer ${api_token}"
Content-Type: "application/json"
backend: curl_api
内部实现
class CurlApiBackend(GeopipeBackend):
def name(self) -> str:
return "curl_api"
def is_available(self) -> bool:
import shutil
return shutil.which("curl") is not None
def request(self, url: str, method: str = "GET", headers: dict = None,
data: bytes = None) -> bytes:
cmd = ["curl", "-s", "-X", method, url]
if headers:
for k, v in headers.items():
cmd.extend(["-H", f"{k}: {v}"])
if data:
cmd.extend(["--data-binary", "@-"])
result = subprocess.run(cmd, input=data, capture_output=True, check=True)
return result.stdout
14.10 后端管理器(BackendManager)
职责
BackendManager 统一管理所有后端的实例化和选择:
class BackendManager:
@classmethod
def default(cls) -> "BackendManager":
"""创建包含所有内置后端的管理器实例。"""
def get(self, name: str | None) -> GeopipeBackend:
"""按名称获取后端,None 时返回默认后端(native_python)。"""
def list_available(self) -> list[str]:
"""列出所有可用的后端名称。"""
后端选择优先级
- 步骤级
backend字段(最高优先级): ```yaml steps:- id: convert use: io.write_vector backend: gdal_cli # 此步骤使用 gdal_cli 后端 ```
- 默认后端(最低优先级):
native_python
14.11 后端使用建议
何时使用 native_python(默认)
适合 99% 的场景:
- 矢量/栅格的常规处理(缓冲区、裁剪、重投影等)
- 空间分析和网络分析
- 数据质检
何时使用 gdal_cli
- 处理 GDAL 原生支持但 GeoPandas 不支持的格式(如 FileGDB、SHP with special encoding)
- 需要 GDAL 的高性能格式转换
- 在 GDAL CLI 工具已预装的环境中(如 Linux 服务器)
何时使用 qgis_process 或 pyqgis
- 需要 QGIS 特有的高级算法(SAGA 地形分析、GRASS 栅格处理等)
- 工作环境已有 QGIS 安装
何时使用 generic_cli
- 调用 GeoPipeAgent 不直接支持的专业工具(Mapshaper、PostGIS CLI 等)
- 已有现成的命令行处理脚本需要集成到流水线
何时使用 curl_api
- 调用远程 GIS Web 服务(WFS、OGC API Features 等)
- 上传处理结果到云端 GIS 服务
14.12 小结
本章介绍了 GeoPipeAgent 的多后端系统:
- 7 种后端:native_python(默认)、gdal_cli、gdal_python、qgis_process、pyqgis、generic_cli、curl_api
- 统一接口:所有后端继承
GeopipeBackend基类,is_available()检测可用性 - 灵活选择:可在步骤级别指定后端,覆盖默认值
- 后端管理:
BackendManager统一管理后端实例
下一章将介绍 GeoPipeAgent 的数据模型与错误体系。