第25章:性能优化与大数据处理
当地理数据集规模从数千条增长到数百万条时,未经优化的代码可能导致处理时间从秒级膨胀到小时级。本章系统介绍 GeoPandas 性能优化的关键策略,涵盖向量化操作、数据类型优化、空间索引、分块处理、分布式计算以及内存管理等方面,帮助读者高效处理大规模地理数据。
25.1 性能优化概述
25.1.1 常见性能瓶颈
GeoPandas 应用中的性能瓶颈通常来自以下几个方面:
| 瓶颈类型 | 典型场景 | 影响程度 |
|---|---|---|
| 逐行循环 | 使用 iterrows() 遍历几何对象 |
高 |
| 内存溢出 | 一次性加载超大文件 | 高 |
| 冗余计算 | 未使用空间索引的空间查询 | 高 |
| 数据类型浪费 | 字符串列未转为分类类型 | 中 |
| 文件格式低效 | 使用 Shapefile 存储大数据集 | 中 |
25.1.2 性能分析工具
在优化之前,首先需要定位瓶颈:
import time
import cProfile
import geopandas as gpd
# 1. 简单计时
start = time.perf_counter()
gdf = gpd.read_file("large_dataset.gpkg")
elapsed = time.perf_counter() - start
print(f"读取耗时: {elapsed:.2f} 秒")
# 2. 使用 cProfile 进行详细分析
cProfile.run('gpd.read_file("large_dataset.gpkg")', sort='cumulative')
# 3. 使用 %%timeit 魔法命令(Jupyter 环境)
# %%timeit -n 3 -r 5
# gdf.buffer(100)
# 4. 使用 line_profiler 逐行分析(需安装 line_profiler)
# 在脚本中:
# @profile
# def process_data(gdf):
# buffered = gdf.buffer(100)
# dissolved = buffered.dissolve()
# return dissolved
# 5. 内存分析
print(f"内存占用: {gdf.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
25.2 Shapely 2.0 向量化操作
25.2.1 循环 vs 向量化
Shapely 2.0 引入了基于 GEOS 的向量化操作,极大提升了几何运算性能:
import geopandas as gpd
import shapely
import numpy as np
gdf = gpd.read_file("buildings.gpkg")
# ❌ 低效:逐行循环
areas_slow = []
for idx, row in gdf.iterrows():
areas_slow.append(row.geometry.area)
# ✅ 高效:向量化操作
areas_fast = gdf.geometry.area
# ✅ 更高效:直接使用 Shapely 2.0 ufunc
areas_ufunc = shapely.area(gdf.geometry.values)
25.2.2 常用向量化函数
# 向量化缓冲区
buffered = shapely.buffer(gdf.geometry.values, distance=100)
# 向量化质心计算
centroids = shapely.centroid(gdf.geometry.values)
# 向量化距离计算(两个等长几何数组之间)
distances = shapely.distance(gdf1.geometry.values, gdf2.geometry.values)
# 向量化判断
is_valid = shapely.is_valid(gdf.geometry.values)
is_empty = shapely.is_empty(gdf.geometry.values)
# 向量化几何简化
simplified = shapely.simplify(gdf.geometry.values, tolerance=10)
25.2.3 性能对比
import timeit
gdf = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(
np.random.uniform(0, 100, 100_000),
np.random.uniform(0, 100, 100_000)
)
)
# 循环方式
t_loop = timeit.timeit(
lambda: [g.buffer(1.0) for g in gdf.geometry],
number=3
)
# 向量化方式
t_vec = timeit.timeit(
lambda: gdf.geometry.buffer(1.0),
number=3
)
print(f"循环: {t_loop:.2f}s, 向量化: {t_vec:.2f}s, 加速比: {t_loop/t_vec:.1f}x")
25.3 数据类型优化
25.3.1 查看内存使用
gdf = gpd.read_file("city_data.gpkg")
# 查看各列内存使用
mem = gdf.memory_usage(deep=True)
print(mem)
print(f"\n总内存: {mem.sum() / 1024**2:.2f} MB")
# 查看各列数据类型
print(gdf.dtypes)
25.3.2 分类类型优化
对于重复值较多的字符串列,转换为 category 类型可以显著节省内存:
# 优化前
print(f"省份列内存: {gdf['province'].memory_usage(deep=True) / 1024**2:.2f} MB")
# 转换为分类类型
gdf['province'] = gdf['province'].astype('category')
gdf['land_use'] = gdf['land_use'].astype('category')
# 优化后
print(f"省份列内存: {gdf['province'].memory_usage(deep=True) / 1024**2:.2f} MB")
25.3.3 数值类型降级
# 整数降级
gdf['population'] = gdf['population'].astype('int32') # 从 int64 降级
gdf['floor_count'] = gdf['floor_count'].astype('int16')
# 浮点数降级
gdf['area_km2'] = gdf['area_km2'].astype('float32') # 从 float64 降级
# 自动降级
gdf['population'] = pd.to_numeric(gdf['population'], downcast='integer')
gdf['area_km2'] = pd.to_numeric(gdf['area_km2'], downcast='float')
25.3.4 优化效果对比
| 数据类型 | 字节数 | 适用场景 |
|---|---|---|
| int64 | 8 | 极大整数 |
| int32 | 4 | 一般整数(±21亿) |
| int16 | 2 | 小整数(±32767) |
| float64 | 8 | 高精度浮点 |
| float32 | 4 | 一般精度浮点 |
| object (字符串) | 不定 | 唯一文本值 |
| category | 极小 | 重复度高的文本 |
25.4 空间索引优化
25.4.1 空间索引原理
GeoPandas 内置 R-tree 空间索引(通过 sindex 属性访问),可将空间查询从 O(n) 降低到 O(log n):
gdf = gpd.read_file("parcels.gpkg")
# 访问空间索引(首次访问时自动构建)
sindex = gdf.sindex
print(f"索引类型: {type(sindex)}")
print(f"索引大小: {len(sindex)}")
25.4.2 使用 query 方法
from shapely.geometry import box
# 定义查询区域
query_bbox = box(116.3, 39.9, 116.5, 40.0)
# ❌ 不使用索引:遍历所有要素
results_slow = gdf[gdf.intersects(query_bbox)]
# ✅ 使用空间索引:两步查询法
# 第一步:粗筛(使用包围盒)
candidate_idx = list(gdf.sindex.intersection(query_bbox.bounds))
candidates = gdf.iloc[candidate_idx]
# 第二步:精筛(精确几何判断)
results_fast = candidates[candidates.intersects(query_bbox)]
25.4.3 使用 query 谓词
# GeoPandas >= 0.12 推荐的方式
idx = gdf.sindex.query(query_bbox, predicate='intersects')
results = gdf.iloc[idx]
# 支持的谓词
predicates = ['intersects', 'contains', 'within', 'touches',
'crosses', 'overlaps', 'covers', 'covered_by']
# 批量查询:多个几何对象同时查询
query_geoms = gpd.GeoSeries([box(116+i*0.1, 39.9, 116.1+i*0.1, 40.0)
for i in range(10)])
# 批量空间连接(内部自动使用空间索引)
result = gpd.sjoin(gdf, gpd.GeoDataFrame(geometry=query_geoms), predicate='intersects')
25.4.4 空间索引性能对比
import timeit
gdf_large = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(
np.random.uniform(0, 100, 500_000),
np.random.uniform(0, 100, 500_000)
)
)
query_area = box(40, 40, 60, 60)
# 无索引
t_no_idx = timeit.timeit(
lambda: gdf_large[gdf_large.intersects(query_area)],
number=3
)
# 有索引
_ = gdf_large.sindex # 预构建索引
t_with_idx = timeit.timeit(
lambda: gdf_large.iloc[gdf_large.sindex.query(query_area, predicate='intersects')],
number=3
)
print(f"无索引: {t_no_idx:.2f}s, 有索引: {t_with_idx:.2f}s")
25.5 分块处理
25.5.1 使用 pyogrio 分块读取
对于无法一次性载入内存的大文件,可以分块读取和处理:
import pyogrio
# 获取文件基本信息(不读取数据)
info = pyogrio.read_info("huge_dataset.gpkg")
total_features = info['features']
print(f"总要素数: {total_features}")
# 分块读取和处理
chunk_size = 100_000
results = []
for skip in range(0, total_features, chunk_size):
chunk = gpd.read_file(
"huge_dataset.gpkg",
skip_features=skip,
max_features=chunk_size,
engine='pyogrio'
)
# 对每块数据进行处理
processed = chunk[chunk.geometry.area > 1000]
results.append(processed)
print(f"已处理: {min(skip + chunk_size, total_features)}/{total_features}")
# 合并结果
final = gpd.pd.concat(results, ignore_index=True)
25.5.2 按空间范围分块
from shapely.geometry import box
# 将研究区域划分为网格
xmin, ymin, xmax, ymax = gdf.total_bounds
nx, ny = 10, 10
dx = (xmax - xmin) / nx
dy = (ymax - ymin) / ny
results = []
for i in range(nx):
for j in range(ny):
cell = box(xmin + i*dx, ymin + j*dy,
xmin + (i+1)*dx, ymin + (j+1)*dy)
# 使用 bbox 参数读取特定区域
chunk = gpd.read_file(
"huge_dataset.gpkg",
bbox=cell,
engine='pyogrio'
)
if not chunk.empty:
processed = chunk.dissolve()
results.append(processed)
final = gpd.pd.concat(results, ignore_index=True)
25.5.3 只读取必要的列
# 只读取需要的列,减少内存占用
gdf = gpd.read_file(
"huge_dataset.gpkg",
columns=['name', 'population', 'geometry'],
engine='pyogrio'
)
# 只读取属性(不读取几何)
df = pyogrio.read_dataframe(
"huge_dataset.gpkg",
read_geometry=False
)
25.6 Dask-GeoPandas
25.6.1 简介
Dask-GeoPandas 将 GeoPandas 扩展到分布式计算环境,支持对超大数据集进行并行处理:
import dask_geopandas
# 从文件创建 Dask-GeoDataFrame
ddf = dask_geopandas.read_file("large_dataset.gpkg", npartitions=8)
# 查看分区信息
print(f"分区数: {ddf.npartitions}")
print(f"列名: {ddf.columns.tolist()}")
25.6.2 基本操作
# 延迟计算 —— 操作不会立即执行
buffered = ddf.buffer(100)
centroids = ddf.centroid
areas = ddf.area
# 触发计算
result = buffered.compute() # 返回普通 GeoDataFrame
print(type(result))
# 筛选
filtered = ddf[ddf.area > 5000]
filtered_result = filtered.compute()
25.6.3 空间分区
空间分区能让相邻数据落入同一分区,提升空间操作效率:
# 使用 Hilbert 曲线进行空间分区
ddf_spatial = ddf.spatial_shuffle(by='hilbert', npartitions=16)
# 查看分区的空间范围
print(ddf_spatial.spatial_partitions)
25.6.4 从 GeoPandas 转换
gdf = gpd.read_file("medium_dataset.gpkg")
# 转换为 Dask-GeoDataFrame
ddf = dask_geopandas.from_geopandas(gdf, npartitions=4)
# 并行处理后转回
result = ddf.buffer(50).compute()
25.6.5 写入 Parquet
# Dask-GeoPandas 原生支持写入 Parquet(分区文件)
ddf.to_parquet("output_partitioned/")
# 读取分区 Parquet
ddf_loaded = dask_geopandas.read_parquet("output_partitioned/")
25.7 数据格式选择
25.7.1 常用格式对比
不同的文件格式在读写速度、文件大小和功能支持方面差异显著:
| 格式 | 读取速度 | 写入速度 | 文件大小 | 随机访问 | 列式读取 |
|---|---|---|---|---|---|
| GeoParquet | ★★★★★ | ★★★★★ | ★★★★★ | ✅ | ✅ |
| FlatGeobuf | ★★★★☆ | ★★★★☆ | ★★★★☆ | ✅ | ❌ |
| GeoPackage | ★★★☆☆ | ★★★☆☆ | ★★★★☆ | ✅ | ❌ |
| GeoJSON | ★★☆☆☆ | ★★☆☆☆ | ★★☆☆☆ | ❌ | ❌ |
| Shapefile | ★★★☆☆ | ★★★☆☆ | ★★★☆☆ | ❌ | ❌ |
25.7.2 格式性能基准测试
import time
gdf = gpd.read_file("test_data.gpkg")
formats = {
'GeoParquet': ('test.parquet', 'parquet'),
'FlatGeobuf': ('test.fgb', 'FlatGeobuf'),
'GeoPackage': ('test.gpkg', 'GPKG'),
'GeoJSON': ('test.geojson', 'GeoJSON'),
'Shapefile': ('test.shp', 'ESRI Shapefile'),
}
results = []
for name, (path, driver) in formats.items():
# 测试写入
start = time.perf_counter()
if name == 'GeoParquet':
gdf.to_parquet(path)
else:
gdf.to_file(path, driver=driver)
write_time = time.perf_counter() - start
# 测试读取
start = time.perf_counter()
if name == 'GeoParquet':
_ = gpd.read_parquet(path)
else:
_ = gpd.read_file(path)
read_time = time.perf_counter() - start
results.append({
'格式': name,
'写入(s)': round(write_time, 2),
'读取(s)': round(read_time, 2),
})
import pandas as pd
print(pd.DataFrame(results).to_string(index=False))
25.7.3 GeoParquet 最佳实践
# 写入 GeoParquet(推荐用于大数据集)
gdf.to_parquet("data.parquet", compression='snappy')
# 读取 GeoParquet(支持列式读取)
gdf = gpd.read_parquet("data.parquet", columns=['name', 'geometry'])
# 使用 pyarrow 过滤读取
import pyarrow.parquet as pq
table = pq.read_table(
"data.parquet",
filters=[('population', '>', 100000)]
)
25.8 几何简化
25.8.1 使用 simplify 减少顶点
对于可视化或分析精度要求不高的场景,简化几何可以大幅提升性能:
gdf = gpd.read_file("detailed_boundaries.gpkg")
# 查看原始顶点数量
vertex_count_before = gdf.geometry.apply(
lambda g: shapely.get_num_coordinates(g)
).sum()
print(f"简化前顶点数: {vertex_count_before:,}")
# 简化几何(tolerance 单位与坐标系一致)
gdf_simplified = gdf.copy()
gdf_simplified['geometry'] = gdf.geometry.simplify(tolerance=0.001)
vertex_count_after = gdf_simplified.geometry.apply(
lambda g: shapely.get_num_coordinates(g)
).sum()
print(f"简化后顶点数: {vertex_count_after:,}")
print(f"顶点减少比例: {(1 - vertex_count_after/vertex_count_before)*100:.1f}%")
25.8.2 保持拓扑关系的简化
# preserve_topology=True(默认值)保持多边形不自交
gdf_topo = gdf.copy()
gdf_topo['geometry'] = gdf.geometry.simplify(
tolerance=0.001,
preserve_topology=True
)
# 不同简化程度的对比
tolerances = [0.0001, 0.001, 0.01, 0.1]
for tol in tolerances:
simplified = gdf.geometry.simplify(tolerance=tol)
n_coords = sum(shapely.get_num_coordinates(simplified))
print(f"tolerance={tol}: 顶点数={n_coords:,}")
25.8.3 使用 shapely.segmentize 控制线段密度
# 增加顶点密度(在需要更精细插值时使用)
densified = shapely.segmentize(gdf.geometry.values, max_segment_length=100)
# 减少顶点密度(简化后再等间隔采样)
simplified = shapely.simplify(gdf.geometry.values, tolerance=50)
25.9 内存管理
25.9.1 监控内存使用
import psutil
import os
def get_memory_mb():
"""获取当前进程内存使用(MB)"""
process = psutil.Process(os.getpid())
return process.memory_info().rss / 1024**2
print(f"当前内存: {get_memory_mb():.1f} MB")
# GeoDataFrame 内存明细
mem = gdf.memory_usage(deep=True)
print("\n各列内存使用:")
for col in mem.index:
if mem[col] > 1024**2:
print(f" {col}: {mem[col]/1024**2:.1f} MB")
25.9.2 主动释放内存
import gc
# 删除不再需要的变量
del gdf_temp
del intermediate_results
# 强制垃圾回收
gc.collect()
print(f"回收后内存: {get_memory_mb():.1f} MB")
25.9.3 使用上下文管理器模式
def process_in_chunks(filepath, chunk_size=50000):
"""分块处理并及时释放内存"""
info = pyogrio.read_info(filepath)
total = info['features']
all_results = []
for skip in range(0, total, chunk_size):
chunk = gpd.read_file(
filepath,
skip_features=skip,
max_features=chunk_size,
engine='pyogrio'
)
# 处理当前块
result = chunk[chunk.geometry.is_valid].dissolve()
all_results.append(result)
# 主动释放当前块
del chunk
gc.collect()
return gpd.pd.concat(all_results, ignore_index=True)
25.9.4 减少几何精度以节省内存
from shapely import set_precision
# 将坐标精度降低到0.001度(约100米级别)
gdf['geometry'] = set_precision(gdf.geometry.values, grid_size=0.001)
# 对比内存使用
print(f"精度调整后内存: {gdf.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
25.10 性能测试与基准测试
25.10.1 使用 timeit 模块
import timeit
# 测试单个操作
time_buffer = timeit.timeit(
lambda: gdf.buffer(100),
number=10
)
print(f"buffer 平均耗时: {time_buffer/10:.3f}s")
# 对比不同方法
time_sjoin = timeit.timeit(
lambda: gpd.sjoin(gdf1, gdf2, predicate='intersects'),
number=5
)
time_overlay = timeit.timeit(
lambda: gpd.overlay(gdf1, gdf2, how='intersection'),
number=5
)
print(f"sjoin: {time_sjoin/5:.3f}s, overlay: {time_overlay/5:.3f}s")
25.10.2 构建基准测试框架
import time
import pandas as pd
def benchmark(func, name, n_runs=5):
"""通用基准测试函数"""
times = []
for _ in range(n_runs):
start = time.perf_counter()
func()
elapsed = time.perf_counter() - start
times.append(elapsed)
return {
'操作': name,
'平均耗时(s)': round(sum(times) / n_runs, 4),
'最短耗时(s)': round(min(times), 4),
'最长耗时(s)': round(max(times), 4),
}
# 执行基准测试
results = [
benchmark(lambda: gdf.buffer(100), 'buffer(100)'),
benchmark(lambda: gdf.centroid, 'centroid'),
benchmark(lambda: gdf.unary_union, 'unary_union'),
benchmark(lambda: gdf.simplify(0.01), 'simplify(0.01)'),
benchmark(lambda: gdf.to_crs(epsg=3857), 'to_crs(3857)'),
]
report = pd.DataFrame(results)
print(report.to_string(index=False))
25.10.3 按数据规模测试
import numpy as np
sizes = [1000, 10_000, 50_000, 100_000, 500_000]
scale_results = []
for n in sizes:
test_gdf = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(
np.random.uniform(0, 100, n),
np.random.uniform(0, 100, n)
)
)
start = time.perf_counter()
_ = test_gdf.buffer(1.0)
elapsed = time.perf_counter() - start
scale_results.append({'数据量': n, '耗时(s)': round(elapsed, 3)})
print(pd.DataFrame(scale_results).to_string(index=False))
25.11 本章小结
本章全面介绍了 GeoPandas 性能优化的核心技术,以下是各优化策略的适用场景与预期效果:
| 优化策略 | 适用场景 | 预期加速比 | 实施难度 |
|---|---|---|---|
| Shapely 2.0 向量化 | 几何运算密集 | 10-100x | 低 |
| 数据类型优化 | 内存占用大 | 1.5-3x(内存) | 低 |
| 空间索引 | 空间查询频繁 | 10-1000x | 低 |
| 分块处理 | 数据超出内存 | 不适用 | 中 |
| Dask-GeoPandas | 超大数据集并行 | 2-8x | 中 |
| GeoParquet 格式 | 频繁读写大文件 | 3-10x | 低 |
| 几何简化 | 顶点数量过多 | 2-20x | 低 |
| 内存管理 | 长时间运行任务 | 不适用 | 低 |
核心原则:
- 先分析后优化 —— 使用
cProfile、timeit定位真正的瓶颈 - 优先向量化 —— 避免逐行循环,使用 Shapely 2.0 ufunc 操作
- 善用空间索引 —— 在空间查询前始终利用
sindex进行预筛选 - 选择合适格式 —— 大数据集优先使用 GeoParquet 格式
- 分而治之 —— 对超大数据集使用分块处理或 Dask-GeoPandas 并行计算
下一章将介绍 GeoPandas 在实际项目中的综合应用案例。