znlgis 博客

GIS开发与技术分享

第9章:数据读写——数据库 IO

在企业级 GIS 应用和大规模空间数据管理中,空间数据库是不可或缺的基础设施。GeoPandas 提供了与 PostGIS、Spatialite 等空间数据库的无缝集成,支持直接从数据库读取空间数据,也支持将处理结果写回数据库。本章将详细介绍 GeoPandas 的数据库 IO 功能。


9.1 空间数据库概述

9.1.1 什么是空间数据库

空间数据库是在传统关系型数据库的基础上,添加了空间数据类型和空间查询功能的数据库系统。它能够存储、索引和查询地理空间数据。

9.1.2 常见空间数据库

数据库 说明 空间扩展 开源
PostgreSQL + PostGIS 最强大的开源空间数据库 PostGIS
SQLite + Spatialite 轻量级嵌入式空间数据库 SpatiaLite
MySQL 内置基本空间功能 内置
Oracle Spatial 企业级空间数据库 Oracle Spatial
SQL Server 微软空间数据库 内置
DuckDB + Spatial 新兴的分析型空间数据库 Spatial 扩展

9.1.3 PostGIS 简介

PostGIS 是 PostgreSQL 数据库的空间扩展,是目前最功能完善、使用最广泛的开源空间数据库。

PostGIS 的核心能力:

  • 存储和索引各种几何类型(点、线、面、多几何等)
  • 空间关系查询(包含、相交、邻近等)
  • 空间分析函数(缓冲区、叠加、最近邻等)
  • 坐标系转换
  • 拓扑分析
  • 栅格数据支持
  • 3D 和 4D 几何支持
-- PostGIS 基本操作示例
-- 创建空间表
CREATE TABLE cities (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    population INTEGER,
    geom GEOMETRY(Point, 4326)
);

-- 创建空间索引
CREATE INDEX idx_cities_geom ON cities USING GIST (geom);

-- 插入数据
INSERT INTO cities (name, population, geom)
VALUES ('北京', 21710000, ST_SetSRID(ST_MakePoint(116.407, 39.904), 4326));

-- 空间查询
SELECT name, population
FROM cities
WHERE ST_DWithin(
    geom,
    ST_SetSRID(ST_MakePoint(116.4, 39.9), 4326),
    0.1
);

9.1.4 Spatialite 简介

Spatialite 是 SQLite 的空间扩展,是一个轻量级的嵌入式空间数据库,无需安装服务器即可使用。

# Spatialite 的优势:
# - 零配置,单文件数据库
# - 无需安装数据库服务器
# - 适合桌面应用和小型项目
# - 可用于测试和原型开发

9.2 数据库连接配置

9.2.1 SQLAlchemy Engine

GeoPandas 使用 SQLAlchemy 作为数据库连接的抽象层。首先需要创建一个数据库引擎(Engine)。

from sqlalchemy import create_engine

# PostgreSQL/PostGIS 连接
engine = create_engine(
    "postgresql://username:password@hostname:5432/database_name"
)

# 连接字符串格式:
# postgresql://用户名:密码@主机:端口/数据库名

9.2.2 连接字符串格式

from sqlalchemy import create_engine

# 1. 基本连接
engine = create_engine("postgresql://user:pass@localhost:5432/mydb")

# 2. 使用 psycopg2 驱动(明确指定)
engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/mydb")

# 3. 使用 psycopg(v3)驱动
engine = create_engine("postgresql+psycopg://user:pass@localhost:5432/mydb")

# 4. 带额外参数的连接
engine = create_engine(
    "postgresql://user:pass@localhost:5432/mydb",
    echo=False,                # 不打印 SQL 语句
    pool_size=5,               # 连接池大小
    max_overflow=10,           # 最大溢出连接数
    pool_timeout=30,           # 连接超时
    pool_recycle=1800          # 连接回收时间(秒)
)

9.2.3 使用环境变量管理凭据

import os
from sqlalchemy import create_engine

# 从环境变量读取数据库配置
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASS = os.environ.get("DB_PASS", "password")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "gis_db")

engine = create_engine(
    f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

9.2.4 使用 psycopg 直接连接

import psycopg2

# 直接使用 psycopg2 连接
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="gis_db",
    user="postgres",
    password="password"
)

# 注意:read_postgis() 也支持直接传入 psycopg2 连接
# 但推荐使用 SQLAlchemy Engine

9.2.5 SQLite/Spatialite 连接

from sqlalchemy import create_engine

# SQLite 连接(无 Spatialite 扩展)
engine = create_engine("sqlite:///my_database.db")

# Spatialite 连接需要特殊处理
import sqlite3

def load_spatialite(dbapi_conn, connection_record):
    """加载 Spatialite 扩展"""
    dbapi_conn.enable_load_extension(True)
    dbapi_conn.load_extension("mod_spatialite")

from sqlalchemy import event
engine = create_engine("sqlite:///my_spatial_db.db")
event.listen(engine, "connect", load_spatialite)

9.3 read_postgis() 详解

9.3.1 基本语法

geopandas.read_postgis(
    sql,                    # SQL 查询语句或表名
    con,                    # 数据库连接(SQLAlchemy Engine 或连接字符串)
    geom_col="geom",        # 几何列名称
    crs=None,               # 坐标参考系统
    index_col=None,         # 用作索引的列
    coerce_float=True,      # 是否将数值转为浮点数
    parse_dates=None,       # 日期列解析
    params=None,            # SQL 参数
    chunksize=None          # 分块读取大小
)

9.3.2 基本读取

import geopandas as gpd
from sqlalchemy import create_engine

# 创建连接
engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")

# 方法1:读取整张表
gdf = gpd.read_postgis(
    "SELECT * FROM provinces",
    con=engine,
    geom_col="geom"
)
print(f"读取 {len(gdf)} 条记录")

# 方法2:使用表名(自动生成 SELECT *)
gdf = gpd.read_postgis(
    "provinces",
    con=engine,
    geom_col="geom"
)

9.3.3 指定几何列

# 当表中有多个几何列时,需要明确指定
gdf = gpd.read_postgis(
    "SELECT id, name, geom, centroid_geom FROM regions",
    con=engine,
    geom_col="geom"  # 指定使用哪个几何列
)

# 如果几何列名不是默认的 "geom"
gdf = gpd.read_postgis(
    "SELECT * FROM my_table",
    con=engine,
    geom_col="the_geom"  # 或 "geometry"、"shape" 等
)

9.3.4 指定 CRS

# 方法1:从数据库自动获取 CRS(如果表定义了 SRID)
gdf = gpd.read_postgis(
    "SELECT * FROM provinces",
    con=engine,
    geom_col="geom"
    # CRS 会从 geometry_columns 表中自动获取
)
print(gdf.crs)  # 自动识别

# 方法2:显式指定 CRS
gdf = gpd.read_postgis(
    "SELECT * FROM provinces",
    con=engine,
    geom_col="geom",
    crs="EPSG:4326"  # 显式指定
)

9.3.5 使用 SQL 查询进行过滤

# 属性过滤
gdf = gpd.read_postgis(
    """
    SELECT name, population, area_km2, geom
    FROM cities
    WHERE population > 5000000
    ORDER BY population DESC
    """,
    con=engine,
    geom_col="geom"
)

# 空间过滤(使用 PostGIS 函数)
gdf = gpd.read_postgis(
    """
    SELECT c.name, c.population, c.geom
    FROM cities c
    WHERE ST_Intersects(
        c.geom,
        ST_MakeEnvelope(115, 39, 117, 41, 4326)
    )
    """,
    con=engine,
    geom_col="geom"
)

# 空间连接查询
gdf = gpd.read_postgis(
    """
    SELECT c.name AS city_name,
           p.name AS province_name,
           c.population,
           c.geom
    FROM cities c
    JOIN provinces p ON ST_Within(c.geom, p.geom)
    WHERE p.name = '广东省'
    """,
    con=engine,
    geom_col="geom"
)

9.3.6 参数化查询

# 使用参数化查询防止 SQL 注入
from sqlalchemy import text

# 方法1:使用 params 参数
gdf = gpd.read_postgis(
    text("SELECT * FROM cities WHERE province = :prov AND population > :pop"),
    con=engine,
    geom_col="geom",
    params={"prov": "广东省", "pop": 1000000}
)

# 方法2:使用 SQLAlchemy text() 的 bindparams
from sqlalchemy import text, bindparam
sql = text(
    "SELECT * FROM cities WHERE name = :name"
).bindparams(bindparam("name", type_=str))

gdf = gpd.read_postgis(sql, con=engine, geom_col="geom", params={"name": "广州"})

9.4 to_postgis() 详解

9.4.1 基本语法

GeoDataFrame.to_postgis(
    name,                   # 目标表名
    con,                    # 数据库连接
    schema=None,            # 数据库 schema
    if_exists="fail",       # 表已存在时的行为
    index=True,             # 是否写入索引
    index_label=None,       # 索引列名
    chunksize=None,         # 分块写入大小
    dtype=None              # 列类型映射
)

9.4.2 基本写入

import geopandas as gpd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")

# 写入新表
gdf.to_postgis("cities", con=engine)

# 写入时会自动:
# 1. 创建表结构
# 2. 设置几何列类型和 SRID
# 3. 写入数据

9.4.3 if_exists 参数

# 表已存在时的处理策略

# "fail"(默认):如果表存在则报错
gdf.to_postgis("cities", con=engine, if_exists="fail")

# "replace":删除旧表,创建新表
gdf.to_postgis("cities", con=engine, if_exists="replace")

# "append":追加到已有表
gdf.to_postgis("cities", con=engine, if_exists="append")

9.4.4 指定 schema

# 写入到特定 schema
gdf.to_postgis("cities", con=engine, schema="public")
gdf.to_postgis("cities", con=engine, schema="gis_data")

# 确保 schema 存在
from sqlalchemy import text
with engine.connect() as conn:
    conn.execute(text("CREATE SCHEMA IF NOT EXISTS gis_data"))
    conn.commit()

9.4.5 索引控制

# 写入索引(默认)
gdf.to_postgis("cities", con=engine, index=True)

# 不写入索引
gdf.to_postgis("cities", con=engine, index=False)

# 自定义索引列名
gdf.to_postgis("cities", con=engine, index=True, index_label="gid")

9.4.6 dtype 参数 — 自定义列类型

from sqlalchemy import Integer, String, Float
from geoalchemy2 import Geometry

# 显式指定列类型
gdf.to_postgis(
    "cities",
    con=engine,
    dtype={
        "name": String(100),
        "population": Integer,
        "area_km2": Float,
        "geometry": Geometry("POINT", srid=4326)
    }
)

9.4.7 分块写入

# 对于大数据集,使用分块写入以控制内存和事务大小
gdf.to_postgis(
    "large_table",
    con=engine,
    chunksize=10000,  # 每次写入 10000 行
    if_exists="replace"
)

9.4.8 写入后创建空间索引

from sqlalchemy import text

# 写入数据
gdf.to_postgis("cities", con=engine, if_exists="replace")

# 手动创建空间索引(如果没有自动创建)
with engine.connect() as conn:
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_cities_geometry "
        "ON cities USING GIST (geometry)"
    ))
    conn.commit()

# 分析表以更新统计信息
with engine.connect() as conn:
    conn.execute(text("ANALYZE cities"))
    conn.commit()

9.5 Spatialite 读写

9.5.1 创建 Spatialite 数据库

import sqlite3

# 创建数据库并加载 Spatialite 扩展
conn = sqlite3.connect("my_spatial.db")
conn.enable_load_extension(True)

try:
    conn.load_extension("mod_spatialite")
except Exception as e:
    print(f"加载 Spatialite 扩展失败: {e}")
    print("请确保已安装 libspatialite")

# 初始化空间元数据表
conn.execute("SELECT InitSpatialMetaData(1)")
conn.commit()

9.5.2 使用 GeoPandas 读取 Spatialite

import geopandas as gpd
from sqlalchemy import create_engine, event
import sqlite3

# 创建引擎并加载 Spatialite 扩展
engine = create_engine("sqlite:///my_spatial.db")

@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    dbapi_connection.enable_load_extension(True)
    dbapi_connection.load_extension("mod_spatialite")

# 读取数据
gdf = gpd.read_postgis(
    "SELECT *, AsText(geometry) FROM cities",
    con=engine,
    geom_col="geometry"
)

# 或者直接使用 read_file() 读取 Spatialite
gdf = gpd.read_file("my_spatial.db", layer="cities")

9.5.3 使用 GeoPandas 写入 Spatialite

# 方法1:使用 to_file()(推荐,更简单)
gdf.to_file("output.db", driver="SQLite", layer="cities")

# 方法2:通过 GeoPackage 格式(GeoPackage 本身就是 SQLite)
gdf.to_file("output.gpkg", layer="cities")

# 方法3:使用 SQLAlchemy(需要更多配置)
# 需要 GeoAlchemy2 支持

9.5.4 Spatialite vs GeoPackage

特性 Spatialite GeoPackage
基础 SQLite + SpatiaLite 扩展 SQLite(OGC 标准)
需要扩展 是(mod_spatialite)
空间函数 丰富(类似 PostGIS) 有限
互操作性 一般 优秀(OGC 标准)
GeoPandas 支持 通过 SQLAlchemy 原生支持
推荐场景 需要复杂空间查询 数据存储和交换

9.6 GeoAlchemy2 集成

9.6.1 GeoAlchemy2 简介

GeoAlchemy2 是 SQLAlchemy 的地理空间扩展,提供了 ORM 模式下的空间数据操作能力。

# 安装
# pip install geoalchemy2

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
from geoalchemy2 import Geometry

Base = declarative_base()

class City(Base):
    __tablename__ = "cities"

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    population = Column(Integer)
    geom = Column(Geometry("POINT", srid=4326))

9.6.2 使用 ORM 创建表

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")

# 创建所有表
Base.metadata.create_all(engine)

# 插入数据
with Session(engine) as session:
    city = City(
        name="北京",
        population=21710000,
        geom="SRID=4326;POINT(116.407 39.904)"
    )
    session.add(city)
    session.commit()

9.6.3 ORM 空间查询

from sqlalchemy.orm import Session
from geoalchemy2.functions import ST_DWithin, ST_GeomFromText
from geoalchemy2 import WKTElement

with Session(engine) as session:
    # 查找距离某点 0.1 度范围内的城市
    center = WKTElement("POINT(116.4 39.9)", srid=4326)

    cities = session.query(City).filter(
        ST_DWithin(City.geom, center, 0.1)
    ).all()

    for city in cities:
        print(f"{city.name}: 人口 {city.population}")

9.6.4 将 ORM 查询结果转为 GeoDataFrame

import geopandas as gpd
from sqlalchemy import select

# 使用 SQLAlchemy Core 查询
stmt = select(City).where(City.population > 5000000)

# 通过 read_postgis 执行查询
gdf = gpd.read_postgis(
    stmt,
    con=engine,
    geom_col="geom"
)
print(gdf)

9.7 高级 SQL 空间查询

9.7.1 空间关系函数

PostGIS 提供了丰富的空间关系函数,可以在 SQL 中直接使用:

import geopandas as gpd

# ST_Intersects — 相交
gdf = gpd.read_postgis(
    """
    SELECT a.name, a.geom
    FROM buildings a, flood_zones b
    WHERE ST_Intersects(a.geom, b.geom)
    AND b.risk_level = 'high'
    """,
    con=engine,
    geom_col="geom"
)

# ST_Within — 在…之内
gdf = gpd.read_postgis(
    """
    SELECT p.name, p.geom
    FROM pois p, districts d
    WHERE ST_Within(p.geom, d.geom)
    AND d.name = '海淀区'
    """,
    con=engine,
    geom_col="geom"
)

# ST_Contains — 包含
gdf = gpd.read_postgis(
    """
    SELECT d.name, d.geom, COUNT(p.id) as poi_count
    FROM districts d
    LEFT JOIN pois p ON ST_Contains(d.geom, p.geom)
    GROUP BY d.name, d.geom
    """,
    con=engine,
    geom_col="geom"
)

9.7.2 空间分析函数

# ST_Buffer — 缓冲区分析
gdf = gpd.read_postgis(
    """
    SELECT name,
           ST_Buffer(geom::geography, 1000)::geometry as geom
    FROM schools
    """,
    con=engine,
    geom_col="geom"
)

# ST_Distance — 距离计算
gdf = gpd.read_postgis(
    """
    SELECT a.name AS school_name,
           b.name AS hospital_name,
           ST_Distance(a.geom::geography, b.geom::geography) as distance_m
    FROM schools a
    CROSS JOIN LATERAL (
        SELECT name, geom
        FROM hospitals
        ORDER BY a.geom <-> geom
        LIMIT 1
    ) b
    """,
    con=engine,
    geom_col="geom"  # 注意此查询可能需要调整
)

# ST_Union — 合并几何
gdf = gpd.read_postgis(
    """
    SELECT province,
           ST_Union(geom) as geom,
           SUM(population) as total_pop
    FROM cities
    GROUP BY province
    """,
    con=engine,
    geom_col="geom"
)

9.7.3 空间索引利用

# 使用空间索引优化的查询模式
# PostGIS 会自动利用 GIST 索引

# 最近邻查询(KNN)— 使用 <-> 操作符
gdf = gpd.read_postgis(
    """
    SELECT name, population, geom
    FROM cities
    ORDER BY geom <-> ST_SetSRID(ST_MakePoint(116.4, 39.9), 4326)
    LIMIT 10
    """,
    con=engine,
    geom_col="geom"
)

# 边界框过滤 — 使用 && 操作符
gdf = gpd.read_postgis(
    """
    SELECT *
    FROM buildings
    WHERE geom && ST_MakeEnvelope(115, 39, 117, 41, 4326)
    """,
    con=engine,
    geom_col="geom"
)

9.8 大数据量流式读取

9.8.1 chunksize 参数

对于大表,一次性读取可能导致内存不足。使用 chunksize 参数可以分块读取:

import geopandas as gpd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")

# 分块读取,每次 10000 行
chunks = gpd.read_postgis(
    "SELECT * FROM large_table",
    con=engine,
    geom_col="geom",
    chunksize=10000
)

# chunks 是一个迭代器
results = []
for i, chunk in enumerate(chunks):
    print(f"处理第 {i+1} 块,{len(chunk)} 行")

    # 对每块数据进行处理
    processed = process_chunk(chunk)
    results.append(processed)

# 合并结果
import pandas as pd
final_gdf = gpd.GeoDataFrame(pd.concat(results, ignore_index=True))

9.8.2 服务端游标

# 使用服务端游标减少客户端内存使用
from sqlalchemy import create_engine, text

engine = create_engine(
    "postgresql://user:pass@localhost:5432/gis_db",
    execution_options={
        "stream_results": True  # 启用服务端游标
    }
)

# 配合 chunksize 使用
chunks = gpd.read_postgis(
    "SELECT * FROM very_large_table",
    con=engine,
    geom_col="geom",
    chunksize=50000
)

for chunk in chunks:
    # 处理每个块
    pass

9.8.3 分页读取

def read_paginated(engine, table, geom_col, page_size=50000):
    """使用 OFFSET/LIMIT 分页读取"""
    offset = 0
    all_chunks = []

    while True:
        sql = f"""
            SELECT *
            FROM {table}
            ORDER BY id
            LIMIT {page_size} OFFSET {offset}
        """
        chunk = gpd.read_postgis(sql, con=engine, geom_col=geom_col)

        if len(chunk) == 0:
            break

        all_chunks.append(chunk)
        offset += page_size
        print(f"已读取 {offset} 行...")

    import pandas as pd
    return gpd.GeoDataFrame(pd.concat(all_chunks, ignore_index=True))

# 使用
gdf = read_paginated(engine, "large_table", "geom")

9.9 连接池与性能优化

9.9.1 连接池配置

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# 配置连接池
engine = create_engine(
    "postgresql://user:pass@localhost:5432/gis_db",
    poolclass=QueuePool,
    pool_size=5,           # 常驻连接数
    max_overflow=10,       # 最大溢出连接数
    pool_timeout=30,       # 获取连接的超时时间(秒)
    pool_recycle=1800,     # 连接回收时间(秒)
    pool_pre_ping=True     # 使用前检查连接是否有效
)

# 查看连接池状态
print(engine.pool.status())

9.9.2 批量写入优化

import geopandas as gpd

# 1. 使用适当的 chunksize
gdf.to_postgis("table", con=engine, chunksize=5000)

# 2. 先删除索引,写入后重建(大批量写入时)
from sqlalchemy import text

with engine.connect() as conn:
    # 删除空间索引
    conn.execute(text("DROP INDEX IF EXISTS idx_table_geom"))
    conn.commit()

# 写入数据
gdf.to_postgis("table", con=engine, if_exists="replace")

# 重建索引
with engine.connect() as conn:
    conn.execute(text(
        "CREATE INDEX idx_table_geom ON table USING GIST (geometry)"
    ))
    conn.execute(text("ANALYZE table"))
    conn.commit()

# 3. 使用 COPY 命令(最快的批量写入方式)
# 需要使用 psycopg2 的 copy_expert 功能
# 这超出了 GeoPandas 的直接支持范围

9.9.3 查询优化技巧

# 1. 只选择需要的列
gdf = gpd.read_postgis(
    "SELECT name, population, geom FROM cities",  # 而非 SELECT *
    con=engine,
    geom_col="geom"
)

# 2. 使用空间索引友好的查询
# ✅ 好:使用边界框先过滤
gdf = gpd.read_postgis(
    """
    SELECT *
    FROM buildings
    WHERE geom && ST_MakeEnvelope(115, 39, 117, 41, 4326)
    AND ST_Intersects(geom, some_complex_geometry)
    """,
    con=engine,
    geom_col="geom"
)

# 3. 使用 LIMIT 限制结果集
gdf = gpd.read_postgis(
    "SELECT * FROM pois ORDER BY geom <-> reference_point LIMIT 100",
    con=engine,
    geom_col="geom"
)

9.10 数据库空间索引

9.10.1 空间索引的重要性

空间索引是空间数据库性能的关键。没有空间索引,每次空间查询都需要扫描全表。

-- 创建 GIST 空间索引(PostGIS)
CREATE INDEX idx_cities_geom ON cities USING GIST (geom);

-- 查看索引使用情况
EXPLAIN ANALYZE
SELECT *
FROM cities
WHERE ST_Intersects(geom, ST_MakeEnvelope(115, 39, 117, 41, 4326));

9.10.2 在 Python 中管理空间索引

from sqlalchemy import text, inspect

# 检查表是否有空间索引
def check_spatial_index(engine, table_name, geom_col="geometry"):
    """检查表是否有空间索引"""
    inspector = inspect(engine)
    indexes = inspector.get_indexes(table_name)

    for idx in indexes:
        if geom_col in idx.get("column_names", []):
            print(f"✓ 找到空间索引: {idx['name']}")
            return True

    print(f"⚠️ 表 {table_name} 没有空间索引")
    return False

# 创建空间索引
def create_spatial_index(engine, table_name, geom_col="geometry"):
    """创建空间索引"""
    index_name = f"idx_{table_name}_{geom_col}"
    sql = f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} USING GIST ({geom_col})"

    with engine.connect() as conn:
        conn.execute(text(sql))
        conn.execute(text(f"ANALYZE {table_name}"))
        conn.commit()

    print(f"✓ 已创建空间索引: {index_name}")

9.10.3 索引类型对比

索引类型 说明 适用场景
GIST 通用搜索树,PostGIS 默认 空间查询(推荐)
SP-GiST 空间分区 GiST 非均匀分布的数据
BRIN 块范围索引,体积极小 物理有序的大数据集
R-tree Spatialite 使用 Spatialite 数据库

9.11 常见问题排查

9.11.1 连接问题

# 问题1:连接被拒绝
# 解决:检查数据库是否运行、端口是否正确、防火墙设置

try:
    engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    print("✓ 连接成功")
except Exception as e:
    print(f"✗ 连接失败: {e}")

# 问题2:认证失败
# 解决:检查用户名密码、pg_hba.conf 配置

# 问题3:数据库不存在
# 解决:先创建数据库
# createdb -U postgres gis_db

9.11.2 几何列识别问题

# 问题:read_postgis() 无法识别几何列
# 原因:查询结果中的几何列名与 geom_col 参数不匹配

# 解决1:明确指定几何列名
gdf = gpd.read_postgis(
    "SELECT id, name, the_geom FROM my_table",
    con=engine,
    geom_col="the_geom"  # 必须与 SELECT 中的列名一致
)

# 解决2:使用别名
gdf = gpd.read_postgis(
    "SELECT id, name, shape AS geom FROM my_table",
    con=engine,
    geom_col="geom"
)

9.11.3 CRS 问题

# 问题:读取后 CRS 为 None
# 原因:几何列没有注册 SRID

# 检查 SRID
with engine.connect() as conn:
    result = conn.execute(text(
        "SELECT ST_SRID(geom) FROM my_table LIMIT 1"
    ))
    srid = result.scalar()
    print(f"SRID: {srid}")

# 如果 SRID 为 0 或 -1,需要手动设置
gdf = gpd.read_postgis(
    "SELECT * FROM my_table",
    con=engine,
    geom_col="geom",
    crs="EPSG:4326"  # 显式指定 CRS
)

# 修复数据库中的 SRID
with engine.connect() as conn:
    conn.execute(text(
        "SELECT UpdateGeometrySRID('my_table', 'geom', 4326)"
    ))
    conn.commit()

9.11.4 编码问题

# 问题:中文乱码
# 解决:确保连接使用 UTF-8 编码

engine = create_engine(
    "postgresql://user:pass@localhost:5432/gis_db"
    "?client_encoding=utf8"
)

# 或者在 PostgreSQL 配置中设置
# ALTER DATABASE gis_db SET client_encoding TO 'UTF8';

9.11.5 性能问题

# 问题:查询非常慢
# 排查步骤:

# 1. 检查是否有空间索引
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT indexname
        FROM pg_indexes
        WHERE tablename = 'my_table'
    """))
    indexes = result.fetchall()
    print("索引列表:", indexes)

# 2. 使用 EXPLAIN ANALYZE 分析查询
with engine.connect() as conn:
    result = conn.execute(text("""
        EXPLAIN ANALYZE
        SELECT * FROM my_table
        WHERE ST_Intersects(geom, ST_MakeEnvelope(115, 39, 117, 41, 4326))
    """))
    for row in result:
        print(row[0])

# 3. 检查表大小和行数
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT
            pg_size_pretty(pg_total_relation_size('my_table')) as total_size,
            (SELECT count(*) FROM my_table) as row_count
    """))
    row = result.fetchone()
    print(f"表大小: {row[0]}, 行数: {row[1]}")

9.12 本章小结

本章详细介绍了 GeoPandas 与空间数据库的交互,涵盖了以下核心内容:

主题 关键要点
空间数据库 PostGIS 是最强大的开源选择,Spatialite 适合轻量级场景
连接配置 使用 SQLAlchemy Engine,支持连接池
read_postgis() 支持 SQL 查询、几何列指定、CRS 设置、参数化查询
to_postgis() 支持表创建、追加、替换,自动处理几何类型
Spatialite 零配置嵌入式空间数据库,通过 read_file() 更简单
GeoAlchemy2 SQLAlchemy 的空间扩展,支持 ORM 模式
空间 SQL ST_Intersects、ST_Within、ST_Buffer 等函数
流式读取 chunksize 参数控制内存使用
性能优化 连接池、空间索引、查询优化

核心建议:

  • 大型项目使用 PostGIS,小型项目可用 Spatialite 或 GeoPackage
  • 始终创建空间索引
  • 使用参数化查询防止 SQL 注入
  • 大数据集使用分块读取(chunksize)
  • 批量写入前删除索引,写入后重建
  • 使用环境变量管理数据库凭据,避免硬编码

下一章我们将学习 GeoPandas 中的几何属性与度量功能,了解如何获取几何对象的各种属性信息。