第20章:实战案例与最佳实践
本章通过多个实际 GIS 应用案例展示 Shapely 的综合运用,并总结开发中的最佳实践、性能调优策略和常见问题解决方案,帮助读者将所学知识应用到实际项目中。
20.1 案例一:城市服务设施缓冲区分析
20.1.1 需求描述
分析城市中医院、学校等公共设施的服务覆盖范围,识别服务盲区。
20.1.2 实现代码
from shapely import Point, Polygon, MultiPolygon
from shapely.ops import unary_union
import shapely
# 城市范围
city = Polygon([
(0, 0), (20, 0), (20, 15), (0, 15)
])
# 医院位置(500m 服务半径,约 0.5 单位)
hospitals = [
Point(3, 3), Point(8, 7), Point(15, 4),
Point(5, 12), Point(17, 11),
]
# 学校位置(300m 服务半径)
schools = [
Point(2, 5), Point(6, 2), Point(10, 10),
Point(14, 8), Point(18, 3), Point(12, 13),
]
# 生成服务覆盖区域
hospital_coverage = unary_union([h.buffer(2.5) for h in hospitals])
school_coverage = unary_union([s.buffer(1.5) for s in schools])
# 裁剪到城市范围
hospital_in_city = hospital_coverage.intersection(city)
school_in_city = school_coverage.intersection(city)
# 计算覆盖率
city_area = city.area
print(f"城市面积: {city_area:.2f}")
print(f"医院覆盖面积: {hospital_in_city.area:.2f} ({hospital_in_city.area/city_area*100:.1f}%)")
print(f"学校覆盖面积: {school_in_city.area:.2f} ({school_in_city.area/city_area*100:.1f}%)")
# 双重覆盖区域(既有医院又有学校)
dual_coverage = hospital_in_city.intersection(school_in_city)
print(f"双重覆盖面积: {dual_coverage.area:.2f} ({dual_coverage.area/city_area*100:.1f}%)")
# 服务盲区(无任何设施覆盖)
any_coverage = hospital_in_city.union(school_in_city)
blind_spots = city.difference(any_coverage)
print(f"服务盲区面积: {blind_spots.area:.2f} ({blind_spots.area/city_area*100:.1f}%)")
# 分析盲区位置
if blind_spots.geom_type == 'MultiPolygon':
for i, spot in enumerate(blind_spots.geoms):
centroid = spot.centroid
print(f" 盲区 {i+1}: 中心({centroid.x:.1f}, {centroid.y:.1f}), 面积={spot.area:.2f}")
elif not blind_spots.is_empty:
centroid = blind_spots.centroid
print(f" 盲区中心: ({centroid.x:.1f}, {centroid.y:.1f})")
20.2 案例二:行政区划空间查询系统
20.2.1 需求描述
给定一批经纬度坐标,快速判断每个点所属的行政区划。
20.2.2 实现代码
from shapely import Point, Polygon, STRtree, prepare
import numpy as np
import time
# 模拟行政区划数据
districts = {
'东城区': Polygon([(0, 0), (5, 0), (5, 5), (0, 5)]),
'西城区': Polygon([(5, 0), (10, 0), (10, 5), (5, 5)]),
'朝阳区': Polygon([(10, 0), (20, 0), (20, 10), (10, 10)]),
'海淀区': Polygon([(0, 5), (10, 5), (10, 15), (0, 15)]),
'丰台区': Polygon([(0, -5), (10, -5), (10, 0), (0, 0)]),
}
# 准备几何体(加速重复查询)
for name, poly in districts.items():
prepare(poly)
# 构建空间索引
district_list = list(districts.values())
district_names = list(districts.keys())
tree = STRtree(district_list)
# 生成随机查询点
np.random.seed(42)
n_points = 10000
query_points = [Point(np.random.uniform(-2, 22), np.random.uniform(-6, 16)) for _ in range(n_points)]
# 方法1:暴力查询
start = time.time()
results_brute = []
for pt in query_points[:1000]:
found = None
for name, poly in districts.items():
if poly.contains(pt):
found = name
break
results_brute.append(found)
brute_time = time.time() - start
print(f"暴力查询 (1000点): {brute_time:.4f}s")
# 方法2:使用 STRtree 索引
start = time.time()
results_tree = []
for pt in query_points:
indices = tree.query(pt, predicate='contains')
if len(indices) > 0:
results_tree.append(district_names[indices[0]])
else:
results_tree.append(None)
tree_time = time.time() - start
print(f"STRtree 查询 ({n_points}点): {tree_time:.4f}s")
# 统计各区数量
from collections import Counter
counter = Counter(results_tree)
for name, count in counter.most_common():
pct = count / n_points * 100
print(f" {name or '未匹配'}: {count} ({pct:.1f}%)")
20.3 案例三:道路网络分析
20.3.1 需求描述
处理道路网络数据,提取交叉口、合并连续路段、计算路径长度。
20.3.2 实现代码
from shapely import LineString, MultiLineString, Point
from shapely.ops import linemerge, polygonize, unary_union, split
from shapely import intersection
# 模拟道路网络
roads = [
LineString([(0, 5), (10, 5)]), # 横路1
LineString([(0, 10), (10, 10)]), # 横路2
LineString([(3, 0), (3, 15)]), # 纵路1
LineString([(7, 0), (7, 15)]), # 纵路2
]
# 合并所有道路
all_roads = unary_union(roads)
print(f"道路网络类型: {all_roads.geom_type}")
# 提取交叉口
# 道路网络的交叉口位于线段交点处
intersections = []
for i in range(len(roads)):
for j in range(i+1, len(roads)):
inter = roads[i].intersection(roads[j])
if not inter.is_empty and inter.geom_type == 'Point':
intersections.append(inter)
print(f"\n交叉口数量: {len(intersections)}")
for idx, pt in enumerate(intersections):
print(f" 交叉口 {idx+1}: ({pt.x:.0f}, {pt.y:.0f})")
# 从道路网络生成街区
blocks = list(polygonize(roads))
print(f"\n街区数量: {len(blocks)}")
for i, block in enumerate(blocks):
print(f" 街区 {i+1}: 面积={block.area:.0f}, 周长={block.length:.1f}")
# 计算道路总长度
total_length = sum(r.length for r in roads)
print(f"\n道路总长度: {total_length:.1f}")
# 合并连续路段
merged = linemerge(MultiLineString(roads))
if merged.geom_type == 'MultiLineString':
print(f"合并后路段数: {len(merged.geoms)}")
else:
print(f"合并为单条线路")
20.4 案例四:地块面积批量计算
20.4.1 实现代码
from shapely.geometry import shape
from shapely import area, is_valid, make_valid
import json
import numpy as np
# 模拟从 GeoJSON 读取地块数据
geojson_data = {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": {"id": "A001", "owner": "张三"},
"geometry": {
"type": "Polygon",
"coordinates": [[(0, 0), (100, 0), (100, 80), (0, 80), (0, 0)]]
}
},
{
"type": "Feature",
"properties": {"id": "A002", "owner": "李四"},
"geometry": {
"type": "Polygon",
"coordinates": [[(100, 0), (250, 0), (250, 80), (100, 80), (100, 0)]]
}
},
{
"type": "Feature",
"properties": {"id": "A003", "owner": "王五"},
"geometry": {
"type": "Polygon",
"coordinates": [[(0, 80), (120, 80), (120, 180), (0, 180), (0, 80)]]
}
},
]
}
# 解析并计算
print("地块面积统计:")
print("-" * 50)
total_area = 0
for feature in geojson_data["features"]:
geom = shape(feature["geometry"])
props = feature["properties"]
# 验证几何有效性
if not is_valid(geom):
print(f" ⚠ {props['id']} 无效,正在修复...")
geom = make_valid(geom)
parcel_area = geom.area
total_area += parcel_area
centroid = geom.centroid
print(f" {props['id']} ({props['owner']}): "
f"面积={parcel_area:.0f}m², "
f"周长={geom.length:.1f}m, "
f"中心=({centroid.x:.0f}, {centroid.y:.0f})")
print("-" * 50)
print(f"总面积: {total_area:.0f}m²")
print(f"平均面积: {total_area/len(geojson_data['features']):.0f}m²")
20.5 案例五:空间数据质量检查
20.5.1 实现代码
from shapely import Polygon, MultiPolygon, is_valid, make_valid
from shapely import is_valid_reason
# 模拟数据集(包含问题几何)
parcels = [
("P001", Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])), # 正常
("P002", Polygon([(0, 0), (10, 10), (10, 0), (0, 10)])), # 蝴蝶结(自相交)
("P003", Polygon([(0, 0), (5, 0), (5, 5), (0, 5)])), # 正常
("P004", Polygon([(0, 0), (0, 0), (1, 0), (1, 1), (0, 1)])), # 重复顶点
]
# 质量检查
print("=" * 60)
print("空间数据质量检查报告")
print("=" * 60)
valid_count = 0
fixed_count = 0
results = []
for pid, geom in parcels:
valid = is_valid(geom)
if valid:
valid_count += 1
results.append((pid, geom, "✅ 有效", None))
print(f" {pid}: ✅ 有效 (面积={geom.area:.2f})")
else:
reason = is_valid_reason(geom)
# 尝试修复
fixed = make_valid(geom)
if is_valid(fixed):
fixed_count += 1
results.append((pid, fixed, "🔧 已修复", reason))
print(f" {pid}: 🔧 已修复")
print(f" 原因: {reason}")
print(f" 修复前面积: {geom.area:.2f}")
print(f" 修复后面积: {fixed.area:.2f}")
else:
results.append((pid, geom, "❌ 无法修复", reason))
print(f" {pid}: ❌ 无法修复 - {reason}")
print(f"\n总计: {len(parcels)} 个, "
f"有效: {valid_count}, "
f"已修复: {fixed_count}, "
f"失败: {len(parcels) - valid_count - fixed_count}")
# 拓扑一致性检查(重叠检测)
print("\n重叠检测:")
valid_geoms = [(pid, geom) for pid, geom, status, _ in results if status != "❌ 无法修复"]
overlap_found = False
for i in range(len(valid_geoms)):
for j in range(i + 1, len(valid_geoms)):
pid1, g1 = valid_geoms[i]
pid2, g2 = valid_geoms[j]
if g1.intersects(g2) and not g1.touches(g2):
overlap = g1.intersection(g2)
if overlap.area > 0.001:
print(f" ⚠ {pid1} 与 {pid2} 重叠, 面积={overlap.area:.4f}")
overlap_found = True
if not overlap_found:
print(" ✅ 未发现重叠")
20.6 案例六:基于 Voronoi 的空间插值
20.6.1 实现代码
from shapely import MultiPoint, Point, Polygon, voronoi_polygons
# 采样点及其属性值
samples = [
(Point(2, 2), 25.3), # 温度测量
(Point(8, 3), 27.1),
(Point(5, 7), 23.8),
(Point(1, 9), 22.1),
(Point(9, 8), 26.5),
(Point(5, 1), 28.0),
(Point(3, 5), 24.2),
]
points = MultiPoint([s[0] for s in samples])
values = [s[1] for s in samples]
# 研究区域
study_area = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])
# 生成泰森多边形
voronoi = voronoi_polygons(points, extend_to=study_area)
# 面积加权插值
print("泰森多边形法空间插值:")
print("-" * 50)
total_weighted = 0
total_area = 0
for i, cell in enumerate(voronoi.geoms):
clipped = cell.intersection(study_area)
if clipped.is_empty:
continue
cell_area = clipped.area
total_weighted += cell_area * values[i]
total_area += cell_area
print(f" 站点 {i+1}: 值={values[i]:.1f}, "
f"控制面积={cell_area:.2f} ({cell_area/study_area.area*100:.1f}%)")
mean_value = total_weighted / total_area
print(f"\n面积加权平均值: {mean_value:.2f}")
print(f"简单平均值: {sum(values)/len(values):.2f}")
# 对指定点进行最近邻插值
query = Point(6, 4)
min_dist = float('inf')
nearest_value = None
for pt, val in samples:
d = query.distance(pt)
if d < min_dist:
min_dist = d
nearest_value = val
print(f"\n查询点 ({query.x}, {query.y}) 的插值结果: {nearest_value:.1f}")
print(f"最近站点距离: {min_dist:.2f}")
20.7 最佳实践总结
20.7.1 代码组织模式
# 推荐的项目结构
# project/
# ├── geometry_utils.py # 几何工具函数
# ├── spatial_analysis.py # 空间分析逻辑
# ├── data_io.py # 数据读写
# └── main.py # 主程序
# geometry_utils.py 示例
from shapely import Polygon, is_valid, make_valid
from shapely.ops import unary_union
def ensure_valid(geom):
"""确保几何体有效"""
if is_valid(geom):
return geom
return make_valid(geom)
def safe_union(geometries):
"""安全的批量并集"""
valid_geoms = [ensure_valid(g) for g in geometries if not g.is_empty]
if not valid_geoms:
return Polygon() # 返回空多边形
return unary_union(valid_geoms)
def calculate_coverage(facilities, radius, boundary):
"""计算设施覆盖率"""
buffers = [f.buffer(radius) for f in facilities]
coverage = safe_union(buffers).intersection(boundary)
return coverage.area / boundary.area
20.7.2 错误处理模式
from shapely import Point, Polygon, is_valid, make_valid
from shapely.errors import GEOSException
import logging
logger = logging.getLogger(__name__)
def safe_intersection(geom1, geom2):
"""安全的交集运算,带错误处理"""
try:
# 确保输入有效
if not is_valid(geom1):
geom1 = make_valid(geom1)
if not is_valid(geom2):
geom2 = make_valid(geom2)
result = geom1.intersection(geom2)
if result.is_empty:
logger.debug("交集结果为空")
return result
except GEOSException as e:
logger.warning(f"GEOS 错误: {e}")
# 尝试 buffer(0) 修复
try:
geom1 = geom1.buffer(0)
geom2 = geom2.buffer(0)
return geom1.intersection(geom2)
except Exception:
logger.error("无法修复几何体")
return Polygon() # 返回空几何
except Exception as e:
logger.error(f"未预期的错误: {e}")
raise
20.7.3 性能优化检查清单
| 检查项 | 优化方法 |
|---|---|
| 循环中重复操作 | 使用向量化函数 shapely.xxx() |
| 多次查询同一几何 | 使用 prepare() |
| 大量空间查询 | 使用 STRtree 空间索引 |
| 几何体创建 | 使用 shapely.points() 等批量创建 |
| 精度过高 | 使用 grid_size 参数降低精度 |
| 串行处理 | 利用 GIL 释放进行多线程 |
| 大文件一次加载 | 使用流式处理/分块读取 |
| 复杂并集 | 使用 unary_union 而非逐个 union |
| 不必要的验证 | 仅在数据入口处验证 |
20.7.4 内存管理建议
import shapely
from shapely import Point
import gc
# 1. 及时释放不需要的几何体
large_geom = Point(0, 0).buffer(1000)
# 使用完毕后
del large_geom
# 2. 使用生成器处理大数据集
def process_features(features):
"""使用生成器逐个处理"""
for feature in features:
geom = shapely.from_wkt(feature['wkt'])
yield process_single(geom)
def process_single(geom):
return geom.buffer(10).area
# 3. 分批处理
def batch_process(geometries, batch_size=1000):
results = []
for i in range(0, len(geometries), batch_size):
batch = geometries[i:i+batch_size]
batch_results = [g.buffer(1).area for g in batch]
results.extend(batch_results)
gc.collect() # 强制垃圾回收
return results
20.8 学习资源推荐
20.8.1 官方资源
| 资源 | 地址 |
|---|---|
| Shapely 官方文档 | https://shapely.readthedocs.io/ |
| Shapely GitHub | https://github.com/shapely/shapely |
| GEOS 项目 | https://libgeos.org/ |
| Shapely 变更日志 | https://github.com/shapely/shapely/blob/main/CHANGES.txt |
20.8.2 相关库文档
| 库 | 用途 | 文档 |
|---|---|---|
| GeoPandas | 地理空间数据分析 | https://geopandas.org/ |
| Fiona | 矢量数据读写 | https://fiona.readthedocs.io/ |
| pyproj | 坐标转换 | https://pyproj4.github.io/pyproj/ |
| rasterio | 栅格数据处理 | https://rasterio.readthedocs.io/ |
| matplotlib | 数据可视化 | https://matplotlib.org/ |
20.8.3 推荐学习路径
初级阶段
├── 1. 学习基本几何类型(Point, LineString, Polygon)
├── 2. 掌握属性访问和基本操作
└── 3. 了解 WKT/WKB/GeoJSON 序列化
中级阶段
├── 4. 掌握空间关系判断
├── 5. 熟练使用集合运算和缓冲区
├── 6. 学习 STRtree 空间索引
└── 7. 与 GeoPandas/matplotlib 集成
高级阶段
├── 8. 性能优化(Prepared, 向量化)
├── 9. 复杂空间分析(Voronoi, 网络分析)
├── 10. 大规模数据处理
└── 11. 自定义工作流开发
20.9 本章小结
本章通过六个实际案例展示了 Shapely 在不同 GIS 场景中的综合应用:
- 缓冲区分析:设施服务范围评估和盲区识别
- 空间查询:利用 STRtree 实现高效的点在面内查询
- 网络分析:道路网络拓扑处理和街区生成
- 批量计算:地块面积统计和 GeoJSON 数据处理
- 质量检查:几何有效性验证和自动修复
- 空间插值:基于 Voronoi 图的面积加权插值
最佳实践要点:
- 始终验证输入几何体的有效性
- 使用空间索引加速批量查询
- 优先使用向量化操作
- 合理的错误处理和日志记录
- 注意内存管理和性能优化