znlgis 博客

GIS开发与技术分享

第20章:实战案例与最佳实践

本章通过多个实际 GIS 应用案例展示 Shapely 的综合运用,并总结开发中的最佳实践、性能调优策略和常见问题解决方案,帮助读者将所学知识应用到实际项目中。


20.1 案例一:城市服务设施缓冲区分析

20.1.1 需求描述

分析城市中医院、学校等公共设施的服务覆盖范围,识别服务盲区。

20.1.2 实现代码

from shapely import Point, Polygon, MultiPolygon
from shapely.ops import unary_union
import shapely

# 城市范围
city = Polygon([
    (0, 0), (20, 0), (20, 15), (0, 15)
])

# 医院位置(500m 服务半径,约 0.5 单位)
hospitals = [
    Point(3, 3), Point(8, 7), Point(15, 4),
    Point(5, 12), Point(17, 11),
]

# 学校位置(300m 服务半径)
schools = [
    Point(2, 5), Point(6, 2), Point(10, 10),
    Point(14, 8), Point(18, 3), Point(12, 13),
]

# 生成服务覆盖区域
hospital_coverage = unary_union([h.buffer(2.5) for h in hospitals])
school_coverage = unary_union([s.buffer(1.5) for s in schools])

# 裁剪到城市范围
hospital_in_city = hospital_coverage.intersection(city)
school_in_city = school_coverage.intersection(city)

# 计算覆盖率
city_area = city.area
print(f"城市面积: {city_area:.2f}")
print(f"医院覆盖面积: {hospital_in_city.area:.2f} ({hospital_in_city.area/city_area*100:.1f}%)")
print(f"学校覆盖面积: {school_in_city.area:.2f} ({school_in_city.area/city_area*100:.1f}%)")

# 双重覆盖区域(既有医院又有学校)
dual_coverage = hospital_in_city.intersection(school_in_city)
print(f"双重覆盖面积: {dual_coverage.area:.2f} ({dual_coverage.area/city_area*100:.1f}%)")

# 服务盲区(无任何设施覆盖)
any_coverage = hospital_in_city.union(school_in_city)
blind_spots = city.difference(any_coverage)
print(f"服务盲区面积: {blind_spots.area:.2f} ({blind_spots.area/city_area*100:.1f}%)")

# 分析盲区位置
if blind_spots.geom_type == 'MultiPolygon':
    for i, spot in enumerate(blind_spots.geoms):
        centroid = spot.centroid
        print(f"  盲区 {i+1}: 中心({centroid.x:.1f}, {centroid.y:.1f}), 面积={spot.area:.2f}")
elif not blind_spots.is_empty:
    centroid = blind_spots.centroid
    print(f"  盲区中心: ({centroid.x:.1f}, {centroid.y:.1f})")

20.2 案例二:行政区划空间查询系统

20.2.1 需求描述

给定一批经纬度坐标,快速判断每个点所属的行政区划。

20.2.2 实现代码

from shapely import Point, Polygon, STRtree, prepare
import numpy as np
import time

# 模拟行政区划数据
districts = {
    '东城区': Polygon([(0, 0), (5, 0), (5, 5), (0, 5)]),
    '西城区': Polygon([(5, 0), (10, 0), (10, 5), (5, 5)]),
    '朝阳区': Polygon([(10, 0), (20, 0), (20, 10), (10, 10)]),
    '海淀区': Polygon([(0, 5), (10, 5), (10, 15), (0, 15)]),
    '丰台区': Polygon([(0, -5), (10, -5), (10, 0), (0, 0)]),
}

# 准备几何体(加速重复查询)
for name, poly in districts.items():
    prepare(poly)

# 构建空间索引
district_list = list(districts.values())
district_names = list(districts.keys())
tree = STRtree(district_list)

# 生成随机查询点
np.random.seed(42)
n_points = 10000
query_points = [Point(np.random.uniform(-2, 22), np.random.uniform(-6, 16)) for _ in range(n_points)]

# 方法1:暴力查询
start = time.time()
results_brute = []
for pt in query_points[:1000]:
    found = None
    for name, poly in districts.items():
        if poly.contains(pt):
            found = name
            break
    results_brute.append(found)
brute_time = time.time() - start
print(f"暴力查询 (1000点): {brute_time:.4f}s")

# 方法2:使用 STRtree 索引
start = time.time()
results_tree = []
for pt in query_points:
    indices = tree.query(pt, predicate='contains')
    if len(indices) > 0:
        results_tree.append(district_names[indices[0]])
    else:
        results_tree.append(None)
tree_time = time.time() - start
print(f"STRtree 查询 ({n_points}点): {tree_time:.4f}s")

# 统计各区数量
from collections import Counter
counter = Counter(results_tree)
for name, count in counter.most_common():
    pct = count / n_points * 100
    print(f"  {name or '未匹配'}: {count} ({pct:.1f}%)")

20.3 案例三:道路网络分析

20.3.1 需求描述

处理道路网络数据,提取交叉口、合并连续路段、计算路径长度。

20.3.2 实现代码

from shapely import LineString, MultiLineString, Point
from shapely.ops import linemerge, polygonize, unary_union, split
from shapely import intersection

# 模拟道路网络
roads = [
    LineString([(0, 5), (10, 5)]),   # 横路1
    LineString([(0, 10), (10, 10)]), # 横路2
    LineString([(3, 0), (3, 15)]),   # 纵路1
    LineString([(7, 0), (7, 15)]),   # 纵路2
]

# 合并所有道路
all_roads = unary_union(roads)
print(f"道路网络类型: {all_roads.geom_type}")

# 提取交叉口
# 道路网络的交叉口位于线段交点处
intersections = []
for i in range(len(roads)):
    for j in range(i+1, len(roads)):
        inter = roads[i].intersection(roads[j])
        if not inter.is_empty and inter.geom_type == 'Point':
            intersections.append(inter)

print(f"\n交叉口数量: {len(intersections)}")
for idx, pt in enumerate(intersections):
    print(f"  交叉口 {idx+1}: ({pt.x:.0f}, {pt.y:.0f})")

# 从道路网络生成街区
blocks = list(polygonize(roads))
print(f"\n街区数量: {len(blocks)}")
for i, block in enumerate(blocks):
    print(f"  街区 {i+1}: 面积={block.area:.0f}, 周长={block.length:.1f}")

# 计算道路总长度
total_length = sum(r.length for r in roads)
print(f"\n道路总长度: {total_length:.1f}")

# 合并连续路段
merged = linemerge(MultiLineString(roads))
if merged.geom_type == 'MultiLineString':
    print(f"合并后路段数: {len(merged.geoms)}")
else:
    print(f"合并为单条线路")

20.4 案例四:地块面积批量计算

20.4.1 实现代码

from shapely.geometry import shape
from shapely import area, is_valid, make_valid
import json
import numpy as np

# 模拟从 GeoJSON 读取地块数据
geojson_data = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": {"id": "A001", "owner": "张三"},
            "geometry": {
                "type": "Polygon",
                "coordinates": [[(0, 0), (100, 0), (100, 80), (0, 80), (0, 0)]]
            }
        },
        {
            "type": "Feature",
            "properties": {"id": "A002", "owner": "李四"},
            "geometry": {
                "type": "Polygon",
                "coordinates": [[(100, 0), (250, 0), (250, 80), (100, 80), (100, 0)]]
            }
        },
        {
            "type": "Feature",
            "properties": {"id": "A003", "owner": "王五"},
            "geometry": {
                "type": "Polygon",
                "coordinates": [[(0, 80), (120, 80), (120, 180), (0, 180), (0, 80)]]
            }
        },
    ]
}

# 解析并计算
print("地块面积统计:")
print("-" * 50)
total_area = 0
for feature in geojson_data["features"]:
    geom = shape(feature["geometry"])
    props = feature["properties"]

    # 验证几何有效性
    if not is_valid(geom):
        print(f"  ⚠ {props['id']} 无效,正在修复...")
        geom = make_valid(geom)

    parcel_area = geom.area
    total_area += parcel_area
    centroid = geom.centroid

    print(f"  {props['id']} ({props['owner']}): "
          f"面积={parcel_area:.0f}m², "
          f"周长={geom.length:.1f}m, "
          f"中心=({centroid.x:.0f}, {centroid.y:.0f})")

print("-" * 50)
print(f"总面积: {total_area:.0f}m²")
print(f"平均面积: {total_area/len(geojson_data['features']):.0f}m²")

20.5 案例五:空间数据质量检查

20.5.1 实现代码

from shapely import Polygon, MultiPolygon, is_valid, make_valid
from shapely import is_valid_reason

# 模拟数据集(包含问题几何)
parcels = [
    ("P001", Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])),  # 正常
    ("P002", Polygon([(0, 0), (10, 10), (10, 0), (0, 10)])),  # 蝴蝶结(自相交)
    ("P003", Polygon([(0, 0), (5, 0), (5, 5), (0, 5)])),      # 正常
    ("P004", Polygon([(0, 0), (0, 0), (1, 0), (1, 1), (0, 1)])),  # 重复顶点
]

# 质量检查
print("=" * 60)
print("空间数据质量检查报告")
print("=" * 60)

valid_count = 0
fixed_count = 0
results = []

for pid, geom in parcels:
    valid = is_valid(geom)
    if valid:
        valid_count += 1
        results.append((pid, geom, "✅ 有效", None))
        print(f"  {pid}: ✅ 有效 (面积={geom.area:.2f})")
    else:
        reason = is_valid_reason(geom)
        # 尝试修复
        fixed = make_valid(geom)
        if is_valid(fixed):
            fixed_count += 1
            results.append((pid, fixed, "🔧 已修复", reason))
            print(f"  {pid}: 🔧 已修复")
            print(f"        原因: {reason}")
            print(f"        修复前面积: {geom.area:.2f}")
            print(f"        修复后面积: {fixed.area:.2f}")
        else:
            results.append((pid, geom, "❌ 无法修复", reason))
            print(f"  {pid}: ❌ 无法修复 - {reason}")

print(f"\n总计: {len(parcels)} 个, "
      f"有效: {valid_count}, "
      f"已修复: {fixed_count}, "
      f"失败: {len(parcels) - valid_count - fixed_count}")

# 拓扑一致性检查(重叠检测)
print("\n重叠检测:")
valid_geoms = [(pid, geom) for pid, geom, status, _ in results if status != "❌ 无法修复"]
overlap_found = False
for i in range(len(valid_geoms)):
    for j in range(i + 1, len(valid_geoms)):
        pid1, g1 = valid_geoms[i]
        pid2, g2 = valid_geoms[j]
        if g1.intersects(g2) and not g1.touches(g2):
            overlap = g1.intersection(g2)
            if overlap.area > 0.001:
                print(f"  ⚠ {pid1}{pid2} 重叠, 面积={overlap.area:.4f}")
                overlap_found = True

if not overlap_found:
    print("  ✅ 未发现重叠")

20.6 案例六:基于 Voronoi 的空间插值

20.6.1 实现代码

from shapely import MultiPoint, Point, Polygon, voronoi_polygons

# 采样点及其属性值
samples = [
    (Point(2, 2), 25.3),    # 温度测量
    (Point(8, 3), 27.1),
    (Point(5, 7), 23.8),
    (Point(1, 9), 22.1),
    (Point(9, 8), 26.5),
    (Point(5, 1), 28.0),
    (Point(3, 5), 24.2),
]

points = MultiPoint([s[0] for s in samples])
values = [s[1] for s in samples]

# 研究区域
study_area = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])

# 生成泰森多边形
voronoi = voronoi_polygons(points, extend_to=study_area)

# 面积加权插值
print("泰森多边形法空间插值:")
print("-" * 50)
total_weighted = 0
total_area = 0

for i, cell in enumerate(voronoi.geoms):
    clipped = cell.intersection(study_area)
    if clipped.is_empty:
        continue
    cell_area = clipped.area
    total_weighted += cell_area * values[i]
    total_area += cell_area
    print(f"  站点 {i+1}: 值={values[i]:.1f}, "
          f"控制面积={cell_area:.2f} ({cell_area/study_area.area*100:.1f}%)")

mean_value = total_weighted / total_area
print(f"\n面积加权平均值: {mean_value:.2f}")
print(f"简单平均值: {sum(values)/len(values):.2f}")

# 对指定点进行最近邻插值
query = Point(6, 4)
min_dist = float('inf')
nearest_value = None
for pt, val in samples:
    d = query.distance(pt)
    if d < min_dist:
        min_dist = d
        nearest_value = val

print(f"\n查询点 ({query.x}, {query.y}) 的插值结果: {nearest_value:.1f}")
print(f"最近站点距离: {min_dist:.2f}")

20.7 最佳实践总结

20.7.1 代码组织模式

# 推荐的项目结构
# project/
# ├── geometry_utils.py    # 几何工具函数
# ├── spatial_analysis.py  # 空间分析逻辑
# ├── data_io.py           # 数据读写
# └── main.py              # 主程序

# geometry_utils.py 示例
from shapely import Polygon, is_valid, make_valid
from shapely.ops import unary_union

def ensure_valid(geom):
    """确保几何体有效"""
    if is_valid(geom):
        return geom
    return make_valid(geom)

def safe_union(geometries):
    """安全的批量并集"""
    valid_geoms = [ensure_valid(g) for g in geometries if not g.is_empty]
    if not valid_geoms:
        return Polygon()  # 返回空多边形
    return unary_union(valid_geoms)

def calculate_coverage(facilities, radius, boundary):
    """计算设施覆盖率"""
    buffers = [f.buffer(radius) for f in facilities]
    coverage = safe_union(buffers).intersection(boundary)
    return coverage.area / boundary.area

20.7.2 错误处理模式

from shapely import Point, Polygon, is_valid, make_valid
from shapely.errors import GEOSException
import logging

logger = logging.getLogger(__name__)

def safe_intersection(geom1, geom2):
    """安全的交集运算,带错误处理"""
    try:
        # 确保输入有效
        if not is_valid(geom1):
            geom1 = make_valid(geom1)
        if not is_valid(geom2):
            geom2 = make_valid(geom2)

        result = geom1.intersection(geom2)

        if result.is_empty:
            logger.debug("交集结果为空")

        return result

    except GEOSException as e:
        logger.warning(f"GEOS 错误: {e}")
        # 尝试 buffer(0) 修复
        try:
            geom1 = geom1.buffer(0)
            geom2 = geom2.buffer(0)
            return geom1.intersection(geom2)
        except Exception:
            logger.error("无法修复几何体")
            return Polygon()  # 返回空几何

    except Exception as e:
        logger.error(f"未预期的错误: {e}")
        raise

20.7.3 性能优化检查清单

检查项 优化方法
循环中重复操作 使用向量化函数 shapely.xxx()
多次查询同一几何 使用 prepare()
大量空间查询 使用 STRtree 空间索引
几何体创建 使用 shapely.points() 等批量创建
精度过高 使用 grid_size 参数降低精度
串行处理 利用 GIL 释放进行多线程
大文件一次加载 使用流式处理/分块读取
复杂并集 使用 unary_union 而非逐个 union
不必要的验证 仅在数据入口处验证

20.7.4 内存管理建议

import shapely
from shapely import Point
import gc

# 1. 及时释放不需要的几何体
large_geom = Point(0, 0).buffer(1000)
# 使用完毕后
del large_geom

# 2. 使用生成器处理大数据集
def process_features(features):
    """使用生成器逐个处理"""
    for feature in features:
        geom = shapely.from_wkt(feature['wkt'])
        yield process_single(geom)

def process_single(geom):
    return geom.buffer(10).area

# 3. 分批处理
def batch_process(geometries, batch_size=1000):
    results = []
    for i in range(0, len(geometries), batch_size):
        batch = geometries[i:i+batch_size]
        batch_results = [g.buffer(1).area for g in batch]
        results.extend(batch_results)
        gc.collect()  # 强制垃圾回收
    return results

20.8 学习资源推荐

20.8.1 官方资源

资源 地址
Shapely 官方文档 https://shapely.readthedocs.io/
Shapely GitHub https://github.com/shapely/shapely
GEOS 项目 https://libgeos.org/
Shapely 变更日志 https://github.com/shapely/shapely/blob/main/CHANGES.txt

20.8.2 相关库文档

用途 文档
GeoPandas 地理空间数据分析 https://geopandas.org/
Fiona 矢量数据读写 https://fiona.readthedocs.io/
pyproj 坐标转换 https://pyproj4.github.io/pyproj/
rasterio 栅格数据处理 https://rasterio.readthedocs.io/
matplotlib 数据可视化 https://matplotlib.org/

20.8.3 推荐学习路径

初级阶段
├── 1. 学习基本几何类型(Point, LineString, Polygon)
├── 2. 掌握属性访问和基本操作
└── 3. 了解 WKT/WKB/GeoJSON 序列化

中级阶段
├── 4. 掌握空间关系判断
├── 5. 熟练使用集合运算和缓冲区
├── 6. 学习 STRtree 空间索引
└── 7. 与 GeoPandas/matplotlib 集成

高级阶段
├── 8. 性能优化(Prepared, 向量化)
├── 9. 复杂空间分析(Voronoi, 网络分析)
├── 10. 大规模数据处理
└── 11. 自定义工作流开发

20.9 本章小结

本章通过六个实际案例展示了 Shapely 在不同 GIS 场景中的综合应用:

  1. 缓冲区分析:设施服务范围评估和盲区识别
  2. 空间查询:利用 STRtree 实现高效的点在面内查询
  3. 网络分析:道路网络拓扑处理和街区生成
  4. 批量计算:地块面积统计和 GeoJSON 数据处理
  5. 质量检查:几何有效性验证和自动修复
  6. 空间插值:基于 Voronoi 图的面积加权插值

最佳实践要点:

  • 始终验证输入几何体的有效性
  • 使用空间索引加速批量查询
  • 优先使用向量化操作
  • 合理的错误处理和日志记录
  • 注意内存管理和性能优化