znlgis 博客

GIS开发与技术分享

第25章:性能优化与大数据处理

当地理数据集规模从数千条增长到数百万条时,未经优化的代码可能导致处理时间从秒级膨胀到小时级。本章系统介绍 GeoPandas 性能优化的关键策略,涵盖向量化操作、数据类型优化、空间索引、分块处理、分布式计算以及内存管理等方面,帮助读者高效处理大规模地理数据。


25.1 性能优化概述

25.1.1 常见性能瓶颈

GeoPandas 应用中的性能瓶颈通常来自以下几个方面:

瓶颈类型 典型场景 影响程度
逐行循环 使用 iterrows() 遍历几何对象
内存溢出 一次性加载超大文件
冗余计算 未使用空间索引的空间查询
数据类型浪费 字符串列未转为分类类型
文件格式低效 使用 Shapefile 存储大数据集

25.1.2 性能分析工具

在优化之前,首先需要定位瓶颈:

import time
import cProfile
import geopandas as gpd

# 1. 简单计时
start = time.perf_counter()
gdf = gpd.read_file("large_dataset.gpkg")
elapsed = time.perf_counter() - start
print(f"读取耗时: {elapsed:.2f} 秒")

# 2. 使用 cProfile 进行详细分析
cProfile.run('gpd.read_file("large_dataset.gpkg")', sort='cumulative')

# 3. 使用 %%timeit 魔法命令(Jupyter 环境)
# %%timeit -n 3 -r 5
# gdf.buffer(100)
# 4. 使用 line_profiler 逐行分析(需安装 line_profiler)
# 在脚本中:
# @profile
# def process_data(gdf):
#     buffered = gdf.buffer(100)
#     dissolved = buffered.dissolve()
#     return dissolved

# 5. 内存分析
print(f"内存占用: {gdf.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

25.2 Shapely 2.0 向量化操作

25.2.1 循环 vs 向量化

Shapely 2.0 引入了基于 GEOS 的向量化操作,极大提升了几何运算性能:

import geopandas as gpd
import shapely
import numpy as np

gdf = gpd.read_file("buildings.gpkg")

# ❌ 低效:逐行循环
areas_slow = []
for idx, row in gdf.iterrows():
    areas_slow.append(row.geometry.area)

# ✅ 高效:向量化操作
areas_fast = gdf.geometry.area

# ✅ 更高效:直接使用 Shapely 2.0 ufunc
areas_ufunc = shapely.area(gdf.geometry.values)

25.2.2 常用向量化函数

# 向量化缓冲区
buffered = shapely.buffer(gdf.geometry.values, distance=100)

# 向量化质心计算
centroids = shapely.centroid(gdf.geometry.values)

# 向量化距离计算(两个等长几何数组之间)
distances = shapely.distance(gdf1.geometry.values, gdf2.geometry.values)

# 向量化判断
is_valid = shapely.is_valid(gdf.geometry.values)
is_empty = shapely.is_empty(gdf.geometry.values)

# 向量化几何简化
simplified = shapely.simplify(gdf.geometry.values, tolerance=10)

25.2.3 性能对比

import timeit

gdf = gpd.GeoDataFrame(
    geometry=gpd.points_from_xy(
        np.random.uniform(0, 100, 100_000),
        np.random.uniform(0, 100, 100_000)
    )
)

# 循环方式
t_loop = timeit.timeit(
    lambda: [g.buffer(1.0) for g in gdf.geometry],
    number=3
)

# 向量化方式
t_vec = timeit.timeit(
    lambda: gdf.geometry.buffer(1.0),
    number=3
)

print(f"循环: {t_loop:.2f}s, 向量化: {t_vec:.2f}s, 加速比: {t_loop/t_vec:.1f}x")

25.3 数据类型优化

25.3.1 查看内存使用

gdf = gpd.read_file("city_data.gpkg")

# 查看各列内存使用
mem = gdf.memory_usage(deep=True)
print(mem)
print(f"\n总内存: {mem.sum() / 1024**2:.2f} MB")

# 查看各列数据类型
print(gdf.dtypes)

25.3.2 分类类型优化

对于重复值较多的字符串列,转换为 category 类型可以显著节省内存:

# 优化前
print(f"省份列内存: {gdf['province'].memory_usage(deep=True) / 1024**2:.2f} MB")

# 转换为分类类型
gdf['province'] = gdf['province'].astype('category')
gdf['land_use'] = gdf['land_use'].astype('category')

# 优化后
print(f"省份列内存: {gdf['province'].memory_usage(deep=True) / 1024**2:.2f} MB")

25.3.3 数值类型降级

# 整数降级
gdf['population'] = gdf['population'].astype('int32')  # 从 int64 降级
gdf['floor_count'] = gdf['floor_count'].astype('int16')

# 浮点数降级
gdf['area_km2'] = gdf['area_km2'].astype('float32')  # 从 float64 降级

# 自动降级
gdf['population'] = pd.to_numeric(gdf['population'], downcast='integer')
gdf['area_km2'] = pd.to_numeric(gdf['area_km2'], downcast='float')

25.3.4 优化效果对比

数据类型 字节数 适用场景
int64 8 极大整数
int32 4 一般整数(±21亿)
int16 2 小整数(±32767)
float64 8 高精度浮点
float32 4 一般精度浮点
object (字符串) 不定 唯一文本值
category 极小 重复度高的文本

25.4 空间索引优化

25.4.1 空间索引原理

GeoPandas 内置 R-tree 空间索引(通过 sindex 属性访问),可将空间查询从 O(n) 降低到 O(log n):

gdf = gpd.read_file("parcels.gpkg")

# 访问空间索引(首次访问时自动构建)
sindex = gdf.sindex
print(f"索引类型: {type(sindex)}")
print(f"索引大小: {len(sindex)}")

25.4.2 使用 query 方法

from shapely.geometry import box

# 定义查询区域
query_bbox = box(116.3, 39.9, 116.5, 40.0)

# ❌ 不使用索引:遍历所有要素
results_slow = gdf[gdf.intersects(query_bbox)]

# ✅ 使用空间索引:两步查询法
# 第一步:粗筛(使用包围盒)
candidate_idx = list(gdf.sindex.intersection(query_bbox.bounds))
candidates = gdf.iloc[candidate_idx]

# 第二步:精筛(精确几何判断)
results_fast = candidates[candidates.intersects(query_bbox)]

25.4.3 使用 query 谓词

# GeoPandas >= 0.12 推荐的方式
idx = gdf.sindex.query(query_bbox, predicate='intersects')
results = gdf.iloc[idx]

# 支持的谓词
predicates = ['intersects', 'contains', 'within', 'touches',
              'crosses', 'overlaps', 'covers', 'covered_by']

# 批量查询:多个几何对象同时查询
query_geoms = gpd.GeoSeries([box(116+i*0.1, 39.9, 116.1+i*0.1, 40.0)
                              for i in range(10)])

# 批量空间连接(内部自动使用空间索引)
result = gpd.sjoin(gdf, gpd.GeoDataFrame(geometry=query_geoms), predicate='intersects')

25.4.4 空间索引性能对比

import timeit

gdf_large = gpd.GeoDataFrame(
    geometry=gpd.points_from_xy(
        np.random.uniform(0, 100, 500_000),
        np.random.uniform(0, 100, 500_000)
    )
)
query_area = box(40, 40, 60, 60)

# 无索引
t_no_idx = timeit.timeit(
    lambda: gdf_large[gdf_large.intersects(query_area)],
    number=3
)

# 有索引
_ = gdf_large.sindex  # 预构建索引
t_with_idx = timeit.timeit(
    lambda: gdf_large.iloc[gdf_large.sindex.query(query_area, predicate='intersects')],
    number=3
)

print(f"无索引: {t_no_idx:.2f}s, 有索引: {t_with_idx:.2f}s")

25.5 分块处理

25.5.1 使用 pyogrio 分块读取

对于无法一次性载入内存的大文件,可以分块读取和处理:

import pyogrio

# 获取文件基本信息(不读取数据)
info = pyogrio.read_info("huge_dataset.gpkg")
total_features = info['features']
print(f"总要素数: {total_features}")

# 分块读取和处理
chunk_size = 100_000
results = []

for skip in range(0, total_features, chunk_size):
    chunk = gpd.read_file(
        "huge_dataset.gpkg",
        skip_features=skip,
        max_features=chunk_size,
        engine='pyogrio'
    )
    # 对每块数据进行处理
    processed = chunk[chunk.geometry.area > 1000]
    results.append(processed)
    print(f"已处理: {min(skip + chunk_size, total_features)}/{total_features}")

# 合并结果
final = gpd.pd.concat(results, ignore_index=True)

25.5.2 按空间范围分块

from shapely.geometry import box

# 将研究区域划分为网格
xmin, ymin, xmax, ymax = gdf.total_bounds
nx, ny = 10, 10
dx = (xmax - xmin) / nx
dy = (ymax - ymin) / ny

results = []
for i in range(nx):
    for j in range(ny):
        cell = box(xmin + i*dx, ymin + j*dy,
                   xmin + (i+1)*dx, ymin + (j+1)*dy)
        # 使用 bbox 参数读取特定区域
        chunk = gpd.read_file(
            "huge_dataset.gpkg",
            bbox=cell,
            engine='pyogrio'
        )
        if not chunk.empty:
            processed = chunk.dissolve()
            results.append(processed)

final = gpd.pd.concat(results, ignore_index=True)

25.5.3 只读取必要的列

# 只读取需要的列,减少内存占用
gdf = gpd.read_file(
    "huge_dataset.gpkg",
    columns=['name', 'population', 'geometry'],
    engine='pyogrio'
)

# 只读取属性(不读取几何)
df = pyogrio.read_dataframe(
    "huge_dataset.gpkg",
    read_geometry=False
)

25.6 Dask-GeoPandas

25.6.1 简介

Dask-GeoPandas 将 GeoPandas 扩展到分布式计算环境,支持对超大数据集进行并行处理:

import dask_geopandas

# 从文件创建 Dask-GeoDataFrame
ddf = dask_geopandas.read_file("large_dataset.gpkg", npartitions=8)

# 查看分区信息
print(f"分区数: {ddf.npartitions}")
print(f"列名: {ddf.columns.tolist()}")

25.6.2 基本操作

# 延迟计算 —— 操作不会立即执行
buffered = ddf.buffer(100)
centroids = ddf.centroid
areas = ddf.area

# 触发计算
result = buffered.compute()  # 返回普通 GeoDataFrame
print(type(result))

# 筛选
filtered = ddf[ddf.area > 5000]
filtered_result = filtered.compute()

25.6.3 空间分区

空间分区能让相邻数据落入同一分区,提升空间操作效率:

# 使用 Hilbert 曲线进行空间分区
ddf_spatial = ddf.spatial_shuffle(by='hilbert', npartitions=16)

# 查看分区的空间范围
print(ddf_spatial.spatial_partitions)

25.6.4 从 GeoPandas 转换

gdf = gpd.read_file("medium_dataset.gpkg")

# 转换为 Dask-GeoDataFrame
ddf = dask_geopandas.from_geopandas(gdf, npartitions=4)

# 并行处理后转回
result = ddf.buffer(50).compute()

25.6.5 写入 Parquet

# Dask-GeoPandas 原生支持写入 Parquet(分区文件)
ddf.to_parquet("output_partitioned/")

# 读取分区 Parquet
ddf_loaded = dask_geopandas.read_parquet("output_partitioned/")

25.7 数据格式选择

25.7.1 常用格式对比

不同的文件格式在读写速度、文件大小和功能支持方面差异显著:

格式 读取速度 写入速度 文件大小 随机访问 列式读取
GeoParquet ★★★★★ ★★★★★ ★★★★★
FlatGeobuf ★★★★☆ ★★★★☆ ★★★★☆
GeoPackage ★★★☆☆ ★★★☆☆ ★★★★☆
GeoJSON ★★☆☆☆ ★★☆☆☆ ★★☆☆☆
Shapefile ★★★☆☆ ★★★☆☆ ★★★☆☆

25.7.2 格式性能基准测试

import time

gdf = gpd.read_file("test_data.gpkg")
formats = {
    'GeoParquet': ('test.parquet', 'parquet'),
    'FlatGeobuf': ('test.fgb', 'FlatGeobuf'),
    'GeoPackage': ('test.gpkg', 'GPKG'),
    'GeoJSON': ('test.geojson', 'GeoJSON'),
    'Shapefile': ('test.shp', 'ESRI Shapefile'),
}

results = []
for name, (path, driver) in formats.items():
    # 测试写入
    start = time.perf_counter()
    if name == 'GeoParquet':
        gdf.to_parquet(path)
    else:
        gdf.to_file(path, driver=driver)
    write_time = time.perf_counter() - start

    # 测试读取
    start = time.perf_counter()
    if name == 'GeoParquet':
        _ = gpd.read_parquet(path)
    else:
        _ = gpd.read_file(path)
    read_time = time.perf_counter() - start

    results.append({
        '格式': name,
        '写入(s)': round(write_time, 2),
        '读取(s)': round(read_time, 2),
    })

import pandas as pd
print(pd.DataFrame(results).to_string(index=False))

25.7.3 GeoParquet 最佳实践

# 写入 GeoParquet(推荐用于大数据集)
gdf.to_parquet("data.parquet", compression='snappy')

# 读取 GeoParquet(支持列式读取)
gdf = gpd.read_parquet("data.parquet", columns=['name', 'geometry'])

# 使用 pyarrow 过滤读取
import pyarrow.parquet as pq

table = pq.read_table(
    "data.parquet",
    filters=[('population', '>', 100000)]
)

25.8 几何简化

25.8.1 使用 simplify 减少顶点

对于可视化或分析精度要求不高的场景,简化几何可以大幅提升性能:

gdf = gpd.read_file("detailed_boundaries.gpkg")

# 查看原始顶点数量
vertex_count_before = gdf.geometry.apply(
    lambda g: shapely.get_num_coordinates(g)
).sum()
print(f"简化前顶点数: {vertex_count_before:,}")

# 简化几何(tolerance 单位与坐标系一致)
gdf_simplified = gdf.copy()
gdf_simplified['geometry'] = gdf.geometry.simplify(tolerance=0.001)

vertex_count_after = gdf_simplified.geometry.apply(
    lambda g: shapely.get_num_coordinates(g)
).sum()
print(f"简化后顶点数: {vertex_count_after:,}")
print(f"顶点减少比例: {(1 - vertex_count_after/vertex_count_before)*100:.1f}%")

25.8.2 保持拓扑关系的简化

# preserve_topology=True(默认值)保持多边形不自交
gdf_topo = gdf.copy()
gdf_topo['geometry'] = gdf.geometry.simplify(
    tolerance=0.001,
    preserve_topology=True
)

# 不同简化程度的对比
tolerances = [0.0001, 0.001, 0.01, 0.1]
for tol in tolerances:
    simplified = gdf.geometry.simplify(tolerance=tol)
    n_coords = sum(shapely.get_num_coordinates(simplified))
    print(f"tolerance={tol}: 顶点数={n_coords:,}")

25.8.3 使用 shapely.segmentize 控制线段密度

# 增加顶点密度(在需要更精细插值时使用)
densified = shapely.segmentize(gdf.geometry.values, max_segment_length=100)

# 减少顶点密度(简化后再等间隔采样)
simplified = shapely.simplify(gdf.geometry.values, tolerance=50)

25.9 内存管理

25.9.1 监控内存使用

import psutil
import os

def get_memory_mb():
    """获取当前进程内存使用(MB)"""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024**2

print(f"当前内存: {get_memory_mb():.1f} MB")

# GeoDataFrame 内存明细
mem = gdf.memory_usage(deep=True)
print("\n各列内存使用:")
for col in mem.index:
    if mem[col] > 1024**2:
        print(f"  {col}: {mem[col]/1024**2:.1f} MB")

25.9.2 主动释放内存

import gc

# 删除不再需要的变量
del gdf_temp
del intermediate_results

# 强制垃圾回收
gc.collect()

print(f"回收后内存: {get_memory_mb():.1f} MB")

25.9.3 使用上下文管理器模式

def process_in_chunks(filepath, chunk_size=50000):
    """分块处理并及时释放内存"""
    info = pyogrio.read_info(filepath)
    total = info['features']
    all_results = []

    for skip in range(0, total, chunk_size):
        chunk = gpd.read_file(
            filepath,
            skip_features=skip,
            max_features=chunk_size,
            engine='pyogrio'
        )
        # 处理当前块
        result = chunk[chunk.geometry.is_valid].dissolve()
        all_results.append(result)

        # 主动释放当前块
        del chunk
        gc.collect()

    return gpd.pd.concat(all_results, ignore_index=True)

25.9.4 减少几何精度以节省内存

from shapely import set_precision

# 将坐标精度降低到0.001度(约100米级别)
gdf['geometry'] = set_precision(gdf.geometry.values, grid_size=0.001)

# 对比内存使用
print(f"精度调整后内存: {gdf.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

25.10 性能测试与基准测试

25.10.1 使用 timeit 模块

import timeit

# 测试单个操作
time_buffer = timeit.timeit(
    lambda: gdf.buffer(100),
    number=10
)
print(f"buffer 平均耗时: {time_buffer/10:.3f}s")

# 对比不同方法
time_sjoin = timeit.timeit(
    lambda: gpd.sjoin(gdf1, gdf2, predicate='intersects'),
    number=5
)
time_overlay = timeit.timeit(
    lambda: gpd.overlay(gdf1, gdf2, how='intersection'),
    number=5
)
print(f"sjoin: {time_sjoin/5:.3f}s, overlay: {time_overlay/5:.3f}s")

25.10.2 构建基准测试框架

import time
import pandas as pd

def benchmark(func, name, n_runs=5):
    """通用基准测试函数"""
    times = []
    for _ in range(n_runs):
        start = time.perf_counter()
        func()
        elapsed = time.perf_counter() - start
        times.append(elapsed)
    return {
        '操作': name,
        '平均耗时(s)': round(sum(times) / n_runs, 4),
        '最短耗时(s)': round(min(times), 4),
        '最长耗时(s)': round(max(times), 4),
    }

# 执行基准测试
results = [
    benchmark(lambda: gdf.buffer(100), 'buffer(100)'),
    benchmark(lambda: gdf.centroid, 'centroid'),
    benchmark(lambda: gdf.unary_union, 'unary_union'),
    benchmark(lambda: gdf.simplify(0.01), 'simplify(0.01)'),
    benchmark(lambda: gdf.to_crs(epsg=3857), 'to_crs(3857)'),
]

report = pd.DataFrame(results)
print(report.to_string(index=False))

25.10.3 按数据规模测试

import numpy as np

sizes = [1000, 10_000, 50_000, 100_000, 500_000]
scale_results = []

for n in sizes:
    test_gdf = gpd.GeoDataFrame(
        geometry=gpd.points_from_xy(
            np.random.uniform(0, 100, n),
            np.random.uniform(0, 100, n)
        )
    )
    start = time.perf_counter()
    _ = test_gdf.buffer(1.0)
    elapsed = time.perf_counter() - start

    scale_results.append({'数据量': n, '耗时(s)': round(elapsed, 3)})

print(pd.DataFrame(scale_results).to_string(index=False))

25.11 本章小结

本章全面介绍了 GeoPandas 性能优化的核心技术,以下是各优化策略的适用场景与预期效果:

优化策略 适用场景 预期加速比 实施难度
Shapely 2.0 向量化 几何运算密集 10-100x
数据类型优化 内存占用大 1.5-3x(内存)
空间索引 空间查询频繁 10-1000x
分块处理 数据超出内存 不适用
Dask-GeoPandas 超大数据集并行 2-8x
GeoParquet 格式 频繁读写大文件 3-10x
几何简化 顶点数量过多 2-20x
内存管理 长时间运行任务 不适用

核心原则:

  1. 先分析后优化 —— 使用 cProfiletimeit 定位真正的瓶颈
  2. 优先向量化 —— 避免逐行循环,使用 Shapely 2.0 ufunc 操作
  3. 善用空间索引 —— 在空间查询前始终利用 sindex 进行预筛选
  4. 选择合适格式 —— 大数据集优先使用 GeoParquet 格式
  5. 分而治之 —— 对超大数据集使用分块处理或 Dask-GeoPandas 并行计算

下一章将介绍 GeoPandas 在实际项目中的综合应用案例。