znlgis 博客

GIS开发与技术分享

第27章:实战案例与最佳实践

经过前面章节的系统学习,我们已经掌握了 GeoPandas 的核心功能与高级技巧。本章将通过五个完整的实战案例,展示如何将所学知识综合运用到真实项目中,并总结项目组织、数据管理和性能优化等方面的最佳实践,帮助读者建立从理论到实践的完整桥梁。


27.1 实战案例概述

27.1.1 从理论到实践的思路

实际地理空间项目通常包含以下几个阶段:

阶段 核心任务 常用工具
数据获取 收集原始数据 geopandas.read_file()、API 接口
数据清洗 格式统一、缺失值处理 Pandas、GeoPandas
空间分析 叠置、缓冲、聚类等 GeoPandas、PySAL
可视化 制图、交互展示 Matplotlib、Folium
结果输出 导出报告与数据 GeoPandas、Pandas

27.1.2 案例选取原则

本章选取的五个案例覆盖了城市规划、环境监测、交通分析和房产评估等典型应用场景,每个案例都包含完整的数据读取、处理、分析和可视化流程。


27.2 案例一:城市 POI 分析

27.2.1 需求描述

分析城市中各类兴趣点(POI)的空间分布特征,统计各行政区的 POI 密度,并生成热力图。

27.2.2 数据准备与读取

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

# 读取行政区划数据
districts = gpd.read_file("data/districts.shp")
print(f"行政区数量: {len(districts)}")
print(f"坐标参考系: {districts.crs}")

# 读取 POI 数据(CSV 格式)
poi_df = pd.read_csv("data/poi_data.csv")
geometry = [Point(xy) for xy in zip(poi_df["longitude"], poi_df["latitude"])]
poi_gdf = gpd.GeoDataFrame(poi_df, geometry=geometry, crs="EPSG:4326")

print(f"POI 总数: {len(poi_gdf)}")
print(f"POI 类别: {poi_gdf['category'].unique()}")

27.2.3 空间连接与统计

# 将 POI 与行政区进行空间连接
poi_with_district = gpd.sjoin(poi_gdf, districts, how="inner", predicate="within")

# 按行政区和类别统计 POI 数量
poi_stats = poi_with_district.groupby(
    ["district_name", "category"]
).size().reset_index(name="count")

# 计算各区 POI 密度(个/平方公里)
districts_proj = districts.to_crs("EPSG:32650")
districts_proj["area_km2"] = districts_proj.geometry.area / 1e6

district_total = poi_with_district.groupby("district_name").size().reset_index(name="poi_count")
district_total = district_total.merge(
    districts_proj[["district_name", "area_km2"]], on="district_name"
)
district_total["density"] = district_total["poi_count"] / district_total["area_km2"]

print(district_total.sort_values("density", ascending=False).head())

27.2.4 热力图可视化

import matplotlib.pyplot as plt
import numpy as np

fig, axes = plt.subplots(1, 2, figsize=(16, 8))

# 左图:POI 分布散点图
districts.plot(ax=axes[0], color="lightgray", edgecolor="black")
poi_gdf.plot(ax=axes[0], column="category", markersize=2, legend=True, alpha=0.5)
axes[0].set_title("POI 空间分布")

# 右图:各区 POI 密度专题图
districts_density = districts.merge(district_total, on="district_name")
districts_density.plot(
    ax=axes[1], column="density", cmap="YlOrRd",
    legend=True, legend_kwds={"label": "POI 密度(个/km²)"}
)
axes[1].set_title("行政区 POI 密度")

plt.tight_layout()
plt.savefig("output/poi_analysis.png", dpi=150)
plt.show()

27.3 案例二:土地利用变化分析

27.3.1 需求描述

对比两个时期的土地利用数据,计算各类土地利用的变化面积,识别主要的土地利用转换类型。

27.3.2 数据加载与预处理

# 读取两期土地利用数据
land_2015 = gpd.read_file("data/landuse_2015.shp")
land_2020 = gpd.read_file("data/landuse_2020.shp")

# 统一坐标系(使用投影坐标系以便计算面积)
target_crs = "EPSG:32650"
land_2015 = land_2015.to_crs(target_crs)
land_2020 = land_2020.to_crs(target_crs)

# 检查并修复几何有效性
land_2015["geometry"] = land_2015.geometry.make_valid()
land_2020["geometry"] = land_2020.geometry.make_valid()

print("2015 年土地利用类型:", land_2015["landuse"].unique())
print("2020 年土地利用类型:", land_2020["landuse"].unique())

27.3.3 叠置分析与面积统计

# 执行叠置分析
change = gpd.overlay(land_2015, land_2020, how="intersection")

# 计算叠置后各图斑面积(公顷)
change["area_ha"] = change.geometry.area / 10000

# 生成转换矩阵
change_matrix = change.pivot_table(
    values="area_ha",
    index="landuse_1",    # 2015 年类型
    columns="landuse_2",  # 2020 年类型
    aggfunc="sum",
    fill_value=0
)
print("土地利用转换矩阵(公顷):")
print(change_matrix.round(2))

# 统计各类型面积变化
area_2015 = land_2015.copy()
area_2015["area_ha"] = area_2015.geometry.area / 10000
summary_2015 = area_2015.groupby("landuse")["area_ha"].sum()

area_2020 = land_2020.copy()
area_2020["area_ha"] = area_2020.geometry.area / 10000
summary_2020 = area_2020.groupby("landuse")["area_ha"].sum()

change_summary = pd.DataFrame({
    "2015面积(ha)": summary_2015,
    "2020面积(ha)": summary_2020
})
change_summary["变化量(ha)"] = change_summary["2020面积(ha)"] - change_summary["2015面积(ha)"]
change_summary["变化率(%)"] = (change_summary["变化量(ha)"] / change_summary["2015面积(ha)"] * 100).round(2)
print(change_summary)

27.3.4 变化结果可视化

fig, axes = plt.subplots(1, 3, figsize=(20, 7))

color_map = {"耕地": "#FFFF00", "林地": "#228B22", "建设用地": "#FF0000",
             "水域": "#0000FF", "草地": "#90EE90", "未利用地": "#D2B48C"}

for ax, (data, title) in zip(axes[:2], [(land_2015, "2015年"), (land_2020, "2020年")]):
    data.plot(ax=ax, column="landuse", color=[color_map.get(x, "#ccc") for x in data["landuse"]])
    ax.set_title(f"{title}土地利用")

# 变化量柱状图
change_summary["变化量(ha)"].plot(kind="bar", ax=axes[2], color="steelblue")
axes[2].set_title("各类土地利用变化量")
axes[2].set_ylabel("面积变化(公顷)")
axes[2].tick_params(axis="x", rotation=45)

plt.tight_layout()
plt.savefig("output/landuse_change.png", dpi=150)
plt.show()

27.4 案例三:交通网络可达性分析

27.4.1 需求描述

评估公交站点的服务覆盖范围,分析居民区是否在合理的步行距离内被公共交通覆盖。

27.4.2 数据加载

# 读取公交站点和居民区数据
stations = gpd.read_file("data/bus_stations.shp")
residential = gpd.read_file("data/residential_areas.shp")

# 转换到投影坐标系
stations = stations.to_crs("EPSG:32650")
residential = residential.to_crs("EPSG:32650")

print(f"公交站点数量: {len(stations)}")
print(f"居民区数量: {len(residential)}")

27.4.3 缓冲区分析

# 创建不同步行距离的缓冲区
buffer_distances = {"300m": 300, "500m": 500, "800m": 800}
coverage_results = {}

for label, dist in buffer_distances.items():
    # 创建缓冲区并合并
    buffers = stations.copy()
    buffers["geometry"] = stations.geometry.buffer(dist)
    merged_buffer = buffers.dissolve()

    # 计算居民区覆盖情况
    covered = gpd.sjoin(residential, buffers, how="inner", predicate="intersects")
    covered_ids = covered.index.unique()
    coverage_rate = len(covered_ids) / len(residential) * 100

    coverage_results[label] = {
        "覆盖居民区数": len(covered_ids),
        "覆盖率(%)": round(coverage_rate, 2),
        "缓冲区面积(km²)": round(merged_buffer.geometry.area.sum() / 1e6, 2)
    }

coverage_df = pd.DataFrame(coverage_results).T
print("公交站点覆盖分析:")
print(coverage_df)

27.4.4 覆盖盲区识别

# 识别 500m 范围内未被覆盖的居民区
buffer_500 = stations.geometry.buffer(500).unary_union
residential["is_covered"] = residential.geometry.intersects(buffer_500)
uncovered = residential[~residential["is_covered"]]

print(f"未覆盖居民区数量: {len(uncovered)}")
print(f"未覆盖居民区占比: {len(uncovered)/len(residential)*100:.1f}%")

# 可视化
fig, ax = plt.subplots(figsize=(12, 10))
residential[residential["is_covered"]].plot(ax=ax, color="lightgreen", label="已覆盖")
uncovered.plot(ax=ax, color="red", label="未覆盖")
stations.plot(ax=ax, color="blue", markersize=10, marker="^", label="公交站点")
ax.legend()
ax.set_title("公交站点 500m 覆盖分析")
plt.savefig("output/transit_coverage.png", dpi=150)
plt.show()

27.5 案例四:环境监测站点优化

27.5.1 需求描述

评估现有环境监测站点的空间分布合理性,识别监测盲区,并提出新增站点建议。

27.5.2 现有站点分布分析

from sklearn.cluster import DBSCAN
import numpy as np

# 读取数据
monitor_stations = gpd.read_file("data/monitor_stations.shp").to_crs("EPSG:32650")
study_area = gpd.read_file("data/study_area.shp").to_crs("EPSG:32650")

# 计算最近邻距离
from scipy.spatial import cKDTree

coords = np.array(list(zip(monitor_stations.geometry.x, monitor_stations.geometry.y)))
tree = cKDTree(coords)
distances, _ = tree.query(coords, k=2)
nn_distances = distances[:, 1]  # 排除自身

print(f"最近邻距离统计:")
print(f"  最小值: {nn_distances.min():.0f} m")
print(f"  最大值: {nn_distances.max():.0f} m")
print(f"  均值:   {nn_distances.mean():.0f} m")
print(f"  标准差: {nn_distances.std():.0f} m")

27.5.3 空间聚类分析

# 使用 DBSCAN 识别站点聚类
clustering = DBSCAN(eps=5000, min_samples=3).fit(coords)
monitor_stations["cluster"] = clustering.labels_

n_clusters = len(set(clustering.labels_)) - (1 if -1 in clustering.labels_ else 0)
n_noise = list(clustering.labels_).count(-1)

print(f"聚类数量: {n_clusters}")
print(f"离群站点数量: {n_noise}")

# 各聚类统计
cluster_stats = monitor_stations.groupby("cluster").agg(
    站点数=("cluster", "size"),
    平均监测值=("value", "mean")
).round(2)
print(cluster_stats)

27.5.4 监测盲区识别与站点建议

# 创建监测覆盖区域(假设有效监测半径 10km)
coverage = monitor_stations.geometry.buffer(10000).unary_union

# 计算盲区
study_boundary = study_area.geometry.unary_union
blind_area = study_boundary.difference(coverage)

total_area = study_boundary.area / 1e6
blind_area_km2 = blind_area.area / 1e6
print(f"研究区总面积: {total_area:.1f} km²")
print(f"监测盲区面积: {blind_area_km2:.1f} km²")
print(f"盲区占比: {blind_area_km2/total_area*100:.1f}%")

# 在盲区中建议新增站点位置(取盲区质心)
if blind_area.geom_type == "MultiPolygon":
    suggested = gpd.GeoDataFrame(
        geometry=[poly.centroid for poly in blind_area.geoms if poly.area > 1e6],
        crs="EPSG:32650"
    )
    print(f"建议新增站点数: {len(suggested)}")

27.6 案例五:房产价格空间分析

27.6.1 需求描述

分析房产价格的空间分布特征,检验空间自相关性,识别价格高值和低值聚集区域。

27.6.2 数据准备

import libpysal
from esda.moran import Moran

# 读取房产数据
housing = gpd.read_file("data/housing_prices.shp").to_crs("EPSG:32650")

print(f"样本数量: {len(housing)}")
print(housing[["price", "area_sqm", "rooms"]].describe().round(2))

# 计算单价
housing["unit_price"] = housing["price"] / housing["area_sqm"]

27.6.3 空间自相关分析

# 构建空间权重矩阵(K最近邻,k=8)
w = libpysal.weights.KNN.from_dataframe(housing, k=8)
w.transform = "r"  # 行标准化

# 全局 Moran's I
moran = Moran(housing["unit_price"], w)
print(f"全局 Moran's I: {moran.I:.4f}")
print(f"期望值 E[I]:    {moran.EI:.4f}")
print(f"p 值:           {moran.p_sim:.4f}")

if moran.p_sim < 0.05:
    print("结论: 房产单价存在显著的空间正自相关,即价格相似的房产在空间上趋于聚集。")
else:
    print("结论: 房产单价不存在显著的空间自相关。")

27.6.4 局部空间自相关(LISA)

from esda.moran import Moran_Local

# 计算局部 Moran's I
lisa = Moran_Local(housing["unit_price"], w)

# 标记显著性(p < 0.05)
housing["lisa_q"] = lisa.q           # 象限: 1=HH, 2=LH, 3=LL, 4=HL
housing["lisa_sig"] = lisa.p_sim < 0.05

# 显著聚类统计
sig_clusters = housing[housing["lisa_sig"]]
quadrant_labels = {1: "高-高聚集", 2: "低-高离群", 3: "低-低聚集", 4: "高-低离群"}
cluster_counts = sig_clusters["lisa_q"].map(quadrant_labels).value_counts()
print("显著空间聚类统计:")
print(cluster_counts)

# LISA 聚类图
fig, ax = plt.subplots(figsize=(12, 10))
colors = {1: "red", 2: "lightblue", 3: "blue", 4: "pink", 0: "lightgray"}
housing["color"] = housing.apply(
    lambda r: colors[r["lisa_q"]] if r["lisa_sig"] else colors[0], axis=1
)
housing.plot(ax=ax, color=housing["color"], markersize=15)
ax.set_title("房产单价 LISA 聚类图")

# 手动创建图例
from matplotlib.patches import Patch
legend_items = [Patch(color=c, label=l) for l, c in
                zip(["高-高", "低-高", "低-低", "高-低", "不显著"],
                    ["red", "lightblue", "blue", "pink", "lightgray"])]
ax.legend(handles=legend_items, loc="lower right")

plt.savefig("output/housing_lisa.png", dpi=150)
plt.show()

27.7 项目组织最佳实践

27.7.1 推荐目录结构

一个规范的 GeoPandas 项目应采用清晰的目录结构:

project_root/
├── data/                # 原始数据
│   ├── raw/             # 未处理数据
│   └── processed/       # 清洗后数据
├── notebooks/           # Jupyter Notebook
├── src/                 # 核心源代码
│   ├── __init__.py
│   ├── data_loader.py   # 数据读取模块
│   ├── analysis.py      # 分析功能模块
│   └── visualization.py # 可视化模块
├── output/              # 输出结果
│   ├── figures/         # 图表
│   └── reports/         # 报告
├── tests/               # 单元测试
├── config.py            # 配置文件
├── requirements.txt     # 依赖列表
└── README.md            # 项目说明

27.7.2 代码规范建议

规范要点 推荐做法 示例
CRS 管理 在分析入口处统一转换 gdf = gdf.to_crs("EPSG:32650")
几何验证 读取后立即检查并修复 gdf.geometry = gdf.geometry.make_valid()
列命名 使用小写下划线风格 land_use_type 而非 LandUseType
函数封装 单一职责,参数明确 每个函数只做一件事
异常处理 捕获文件读取和几何操作异常 try/except 包裹关键步骤
# 示例:封装可复用的分析函数
def calculate_coverage(stations_gdf, target_gdf, buffer_dist):
    """计算设施点对目标区域的覆盖率。

    Parameters
    ----------
    stations_gdf : GeoDataFrame
        设施点数据(投影坐标系)。
    target_gdf : GeoDataFrame
        待评估的目标区域数据。
    buffer_dist : float
        缓冲区距离(单位与坐标系一致)。

    Returns
    -------
    float
        覆盖率(0-100)。
    """
    buffer_union = stations_gdf.geometry.buffer(buffer_dist).unary_union
    covered = target_gdf[target_gdf.geometry.intersects(buffer_union)]
    return len(covered) / len(target_gdf) * 100

27.8 数据管理最佳实践

27.8.1 数据格式选择

格式 优点 缺点 适用场景
GeoPackage 开放标准、支持多图层、无文件数量限制 单文件可能较大 推荐为默认格式
Shapefile 兼容性最好 字段名 ≤10 字符、文件碎片多 与传统 GIS 软件交换
GeoJSON 纯文本、Web 友好 文件体积大、仅支持 WGS84 Web 应用、小数据集
Parquet (GeoParquet) 列式存储、读写极快、体积小 生态尚在发展 大数据集、云存储
FlatGeobuf 流式读取、支持空间过滤 较新格式 大数据集远程访问
# 推荐使用 GeoPackage 作为项目默认格式
gdf.to_file("data/processed/result.gpkg", driver="GPKG", layer="analysis_result")

# 大数据集使用 Parquet
gdf.to_parquet("data/processed/result.parquet")

27.8.2 坐标参考系管理

def ensure_projected_crs(gdf, target_crs="EPSG:32650"):
    """确保 GeoDataFrame 使用投影坐标系。"""
    if gdf.crs is None:
        raise ValueError("GeoDataFrame 缺少 CRS 信息,请先设置 CRS。")
    if gdf.crs.is_geographic:
        print(f"从 {gdf.crs} 转换到 {target_crs}")
        return gdf.to_crs(target_crs)
    return gdf

# 使用示例
districts = gpd.read_file("data/districts.gpkg")
districts = ensure_projected_crs(districts)

27.8.3 数据质量检查清单

def data_quality_report(gdf, name="GeoDataFrame"):
    """生成数据质量报告。"""
    report = {
        "数据集名称": name,
        "记录数": len(gdf),
        "列数": len(gdf.columns),
        "CRS": str(gdf.crs),
        "几何类型": gdf.geom_type.unique().tolist(),
        "空几何数": gdf.geometry.is_empty.sum(),
        "无效几何数": (~gdf.geometry.is_valid).sum(),
        "缺失值总数": gdf.isnull().sum().sum(),
        "边界范围": gdf.total_bounds.tolist(),
    }
    for key, val in report.items():
        print(f"  {key}: {val}")
    return report

data_quality_report(districts, "行政区划数据")

27.9 性能最佳实践

27.9.1 优化策略速查表

策略 方法 预期提升
使用空间索引 gdf.sindex,确保在空间查询前构建 10-100×
选择高效格式 Parquet / FlatGeobuf 替代 Shapefile 2-10×
减少数据量 提前筛选行列、简化几何 视数据而定
分块处理 对大数据集分区逐块处理 降低内存峰值
向量化操作 避免逐行循环,使用 apply 或原生方法 5-50×
并行计算 multiprocessing 或 Dask-GeoPandas 2-8×

27.9.2 关键性能技巧

# 1. 仅读取需要的列
gdf = gpd.read_file("data/large_dataset.gpkg", columns=["id", "name", "geometry"])

# 2. 使用空间过滤(bbox)减少读取量
from shapely.geometry import box
bbox = box(116.0, 39.5, 117.0, 40.5)
gdf = gpd.read_file("data/large_dataset.gpkg", mask=bbox)

# 3. 几何简化降低计算复杂度
gdf["geometry"] = gdf.geometry.simplify(tolerance=10, preserve_topology=True)

# 4. 利用空间索引加速查询
from shapely.strtree import STRtree

tree = STRtree(gdf.geometry.values)
query_geom = box(116.3, 39.8, 116.5, 40.0)
result_indices = tree.query(query_geom)
print(f"空间索引查询到 {len(result_indices)} 个要素")

# 5. 使用 Dask-GeoPandas 处理超大数据集
import dask_geopandas

ddf = dask_geopandas.read_parquet("data/very_large.parquet")
result = ddf[ddf.geometry.within(query_geom)].compute()

27.9.3 内存管理建议

import gc

def process_in_chunks(filepath, chunk_size=50000):
    """分块读取和处理大型空间数据。"""
    total = gpd.read_file(filepath, rows=1)  # 仅读取元数据
    results = []

    offset = 0
    while True:
        chunk = gpd.read_file(filepath, rows=slice(offset, offset + chunk_size))
        if len(chunk) == 0:
            break

        processed = chunk[chunk.geometry.is_valid].copy()
        processed["area"] = processed.geometry.area
        results.append(processed[["id", "area"]])

        offset += chunk_size
        del chunk
        gc.collect()

    return pd.concat(results, ignore_index=True)

27.10 常见问题与排错指南

27.10.1 常见问题 FAQ

Q1:sjoin 结果为空,但数据明显有重叠?

# 原因:CRS 不一致
print(gdf1.crs, gdf2.crs)  # 检查两个数据集的 CRS

# 解决:统一 CRS 后再进行空间连接
gdf2 = gdf2.to_crs(gdf1.crs)
result = gpd.sjoin(gdf1, gdf2, predicate="intersects")

Q2:overlay 操作非常慢或报错?

# 原因:存在无效几何
print(f"无效几何数: {(~gdf.geometry.is_valid).sum()}")

# 解决:先修复几何
gdf["geometry"] = gdf.geometry.make_valid()
# 也可设置零宽度缓冲区修复
gdf["geometry"] = gdf.geometry.buffer(0)

Q3:面积计算结果不合理(值极小或极大)?

# 原因:使用了地理坐标系(度)而非投影坐标系(米)
print(gdf.crs.is_geographic)  # True 则为地理坐标系

# 解决:转换到合适的投影坐标系
gdf_proj = gdf.to_crs("EPSG:32650")  # UTM 50N(适用于中国东部)
gdf_proj["area_m2"] = gdf_proj.geometry.area

Q4:读取 Shapefile 出现中文乱码?

# 解决:指定编码
gdf = gpd.read_file("data/china.shp", encoding="gbk")
# 或
gdf = gpd.read_file("data/china.shp", encoding="utf-8")

27.10.2 排错流程图

遇到空间分析问题时,建议按以下步骤排查:

步骤 检查项 对应方法
1 CRS 是否一致 gdf.crs
2 几何是否有效 gdf.geometry.is_valid.all()
3 数据是否为空 len(gdf), gdf.geometry.is_empty.any()
4 几何类型是否匹配 gdf.geom_type.unique()
5 空间范围是否重叠 gdf.total_bounds
6 坐标精度是否合理 目视检查坐标范围值
def spatial_debug(gdf1, gdf2):
    """空间分析前的快速诊断。"""
    print("=== 空间诊断报告 ===")
    print(f"数据集1: {len(gdf1)} 条记录, CRS={gdf1.crs}")
    print(f"数据集2: {len(gdf2)} 条记录, CRS={gdf2.crs}")
    print(f"CRS 一致: {gdf1.crs == gdf2.crs}")
    print(f"数据集1 无效几何: {(~gdf1.geometry.is_valid).sum()}")
    print(f"数据集2 无效几何: {(~gdf2.geometry.is_valid).sum()}")
    print(f"数据集1 边界: {gdf1.total_bounds}")
    print(f"数据集2 边界: {gdf2.total_bounds}")

    b1, b2 = gdf1.total_bounds, gdf2.total_bounds
    overlap = not (b1[2] < b2[0] or b2[2] < b1[0] or b1[3] < b2[1] or b2[3] < b1[1])
    print(f"边界范围是否重叠: {overlap}")

27.11 本章小结

本章通过五个实战案例和四个方面的最佳实践,将前面章节所学知识融会贯通:

内容 要点回顾
城市 POI 分析 空间连接、密度统计、专题制图
土地利用变化分析 叠置分析、转换矩阵、面积统计
交通可达性分析 缓冲区分析、覆盖率计算、盲区识别
环境监测站点优化 空间聚类、最近邻分析、盲区补充
房产价格空间分析 空间权重、Moran’s I、LISA 聚类图
项目组织 目录结构规范、代码封装、文档注释
数据管理 格式选择、CRS 管理、质量检查
性能优化 空间索引、分块处理、内存管理
排错指南 常见问题诊断、系统化排错流程

掌握了这些实战技能和最佳实践,读者可以自信地应对真实世界中的地理空间分析项目。建议在实际工作中不断积累案例经验,形成自己的工具库和方法论。