第9章:数据读写——数据库 IO
在企业级 GIS 应用和大规模空间数据管理中,空间数据库是不可或缺的基础设施。GeoPandas 提供了与 PostGIS、Spatialite 等空间数据库的无缝集成,支持直接从数据库读取空间数据,也支持将处理结果写回数据库。本章将详细介绍 GeoPandas 的数据库 IO 功能。
9.1 空间数据库概述
9.1.1 什么是空间数据库
空间数据库是在传统关系型数据库的基础上,添加了空间数据类型和空间查询功能的数据库系统。它能够存储、索引和查询地理空间数据。
9.1.2 常见空间数据库
| 数据库 | 说明 | 空间扩展 | 开源 |
|---|---|---|---|
| PostgreSQL + PostGIS | 最强大的开源空间数据库 | PostGIS | ✅ |
| SQLite + Spatialite | 轻量级嵌入式空间数据库 | SpatiaLite | ✅ |
| MySQL | 内置基本空间功能 | 内置 | ✅ |
| Oracle Spatial | 企业级空间数据库 | Oracle Spatial | ❌ |
| SQL Server | 微软空间数据库 | 内置 | ❌ |
| DuckDB + Spatial | 新兴的分析型空间数据库 | Spatial 扩展 | ✅ |
9.1.3 PostGIS 简介
PostGIS 是 PostgreSQL 数据库的空间扩展,是目前最功能完善、使用最广泛的开源空间数据库。
PostGIS 的核心能力:
- 存储和索引各种几何类型(点、线、面、多几何等)
- 空间关系查询(包含、相交、邻近等)
- 空间分析函数(缓冲区、叠加、最近邻等)
- 坐标系转换
- 拓扑分析
- 栅格数据支持
- 3D 和 4D 几何支持
-- PostGIS 基本操作示例
-- 创建空间表
CREATE TABLE cities (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
population INTEGER,
geom GEOMETRY(Point, 4326)
);
-- 创建空间索引
CREATE INDEX idx_cities_geom ON cities USING GIST (geom);
-- 插入数据
INSERT INTO cities (name, population, geom)
VALUES ('北京', 21710000, ST_SetSRID(ST_MakePoint(116.407, 39.904), 4326));
-- 空间查询
SELECT name, population
FROM cities
WHERE ST_DWithin(
geom,
ST_SetSRID(ST_MakePoint(116.4, 39.9), 4326),
0.1
);
9.1.4 Spatialite 简介
Spatialite 是 SQLite 的空间扩展,是一个轻量级的嵌入式空间数据库,无需安装服务器即可使用。
# Spatialite 的优势:
# - 零配置,单文件数据库
# - 无需安装数据库服务器
# - 适合桌面应用和小型项目
# - 可用于测试和原型开发
9.2 数据库连接配置
9.2.1 SQLAlchemy Engine
GeoPandas 使用 SQLAlchemy 作为数据库连接的抽象层。首先需要创建一个数据库引擎(Engine)。
from sqlalchemy import create_engine
# PostgreSQL/PostGIS 连接
engine = create_engine(
"postgresql://username:password@hostname:5432/database_name"
)
# 连接字符串格式:
# postgresql://用户名:密码@主机:端口/数据库名
9.2.2 连接字符串格式
from sqlalchemy import create_engine
# 1. 基本连接
engine = create_engine("postgresql://user:pass@localhost:5432/mydb")
# 2. 使用 psycopg2 驱动(明确指定)
engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/mydb")
# 3. 使用 psycopg(v3)驱动
engine = create_engine("postgresql+psycopg://user:pass@localhost:5432/mydb")
# 4. 带额外参数的连接
engine = create_engine(
"postgresql://user:pass@localhost:5432/mydb",
echo=False, # 不打印 SQL 语句
pool_size=5, # 连接池大小
max_overflow=10, # 最大溢出连接数
pool_timeout=30, # 连接超时
pool_recycle=1800 # 连接回收时间(秒)
)
9.2.3 使用环境变量管理凭据
import os
from sqlalchemy import create_engine
# 从环境变量读取数据库配置
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASS = os.environ.get("DB_PASS", "password")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "gis_db")
engine = create_engine(
f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
9.2.4 使用 psycopg 直接连接
import psycopg2
# 直接使用 psycopg2 连接
conn = psycopg2.connect(
host="localhost",
port=5432,
dbname="gis_db",
user="postgres",
password="password"
)
# 注意:read_postgis() 也支持直接传入 psycopg2 连接
# 但推荐使用 SQLAlchemy Engine
9.2.5 SQLite/Spatialite 连接
from sqlalchemy import create_engine
# SQLite 连接(无 Spatialite 扩展)
engine = create_engine("sqlite:///my_database.db")
# Spatialite 连接需要特殊处理
import sqlite3
def load_spatialite(dbapi_conn, connection_record):
"""加载 Spatialite 扩展"""
dbapi_conn.enable_load_extension(True)
dbapi_conn.load_extension("mod_spatialite")
from sqlalchemy import event
engine = create_engine("sqlite:///my_spatial_db.db")
event.listen(engine, "connect", load_spatialite)
9.3 read_postgis() 详解
9.3.1 基本语法
geopandas.read_postgis(
sql, # SQL 查询语句或表名
con, # 数据库连接(SQLAlchemy Engine 或连接字符串)
geom_col="geom", # 几何列名称
crs=None, # 坐标参考系统
index_col=None, # 用作索引的列
coerce_float=True, # 是否将数值转为浮点数
parse_dates=None, # 日期列解析
params=None, # SQL 参数
chunksize=None # 分块读取大小
)
9.3.2 基本读取
import geopandas as gpd
from sqlalchemy import create_engine
# 创建连接
engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")
# 方法1:读取整张表
gdf = gpd.read_postgis(
"SELECT * FROM provinces",
con=engine,
geom_col="geom"
)
print(f"读取 {len(gdf)} 条记录")
# 方法2:使用表名(自动生成 SELECT *)
gdf = gpd.read_postgis(
"provinces",
con=engine,
geom_col="geom"
)
9.3.3 指定几何列
# 当表中有多个几何列时,需要明确指定
gdf = gpd.read_postgis(
"SELECT id, name, geom, centroid_geom FROM regions",
con=engine,
geom_col="geom" # 指定使用哪个几何列
)
# 如果几何列名不是默认的 "geom"
gdf = gpd.read_postgis(
"SELECT * FROM my_table",
con=engine,
geom_col="the_geom" # 或 "geometry"、"shape" 等
)
9.3.4 指定 CRS
# 方法1:从数据库自动获取 CRS(如果表定义了 SRID)
gdf = gpd.read_postgis(
"SELECT * FROM provinces",
con=engine,
geom_col="geom"
# CRS 会从 geometry_columns 表中自动获取
)
print(gdf.crs) # 自动识别
# 方法2:显式指定 CRS
gdf = gpd.read_postgis(
"SELECT * FROM provinces",
con=engine,
geom_col="geom",
crs="EPSG:4326" # 显式指定
)
9.3.5 使用 SQL 查询进行过滤
# 属性过滤
gdf = gpd.read_postgis(
"""
SELECT name, population, area_km2, geom
FROM cities
WHERE population > 5000000
ORDER BY population DESC
""",
con=engine,
geom_col="geom"
)
# 空间过滤(使用 PostGIS 函数)
gdf = gpd.read_postgis(
"""
SELECT c.name, c.population, c.geom
FROM cities c
WHERE ST_Intersects(
c.geom,
ST_MakeEnvelope(115, 39, 117, 41, 4326)
)
""",
con=engine,
geom_col="geom"
)
# 空间连接查询
gdf = gpd.read_postgis(
"""
SELECT c.name AS city_name,
p.name AS province_name,
c.population,
c.geom
FROM cities c
JOIN provinces p ON ST_Within(c.geom, p.geom)
WHERE p.name = '广东省'
""",
con=engine,
geom_col="geom"
)
9.3.6 参数化查询
# 使用参数化查询防止 SQL 注入
from sqlalchemy import text
# 方法1:使用 params 参数
gdf = gpd.read_postgis(
text("SELECT * FROM cities WHERE province = :prov AND population > :pop"),
con=engine,
geom_col="geom",
params={"prov": "广东省", "pop": 1000000}
)
# 方法2:使用 SQLAlchemy text() 的 bindparams
from sqlalchemy import text, bindparam
sql = text(
"SELECT * FROM cities WHERE name = :name"
).bindparams(bindparam("name", type_=str))
gdf = gpd.read_postgis(sql, con=engine, geom_col="geom", params={"name": "广州"})
9.4 to_postgis() 详解
9.4.1 基本语法
GeoDataFrame.to_postgis(
name, # 目标表名
con, # 数据库连接
schema=None, # 数据库 schema
if_exists="fail", # 表已存在时的行为
index=True, # 是否写入索引
index_label=None, # 索引列名
chunksize=None, # 分块写入大小
dtype=None # 列类型映射
)
9.4.2 基本写入
import geopandas as gpd
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")
# 写入新表
gdf.to_postgis("cities", con=engine)
# 写入时会自动:
# 1. 创建表结构
# 2. 设置几何列类型和 SRID
# 3. 写入数据
9.4.3 if_exists 参数
# 表已存在时的处理策略
# "fail"(默认):如果表存在则报错
gdf.to_postgis("cities", con=engine, if_exists="fail")
# "replace":删除旧表,创建新表
gdf.to_postgis("cities", con=engine, if_exists="replace")
# "append":追加到已有表
gdf.to_postgis("cities", con=engine, if_exists="append")
9.4.4 指定 schema
# 写入到特定 schema
gdf.to_postgis("cities", con=engine, schema="public")
gdf.to_postgis("cities", con=engine, schema="gis_data")
# 确保 schema 存在
from sqlalchemy import text
with engine.connect() as conn:
conn.execute(text("CREATE SCHEMA IF NOT EXISTS gis_data"))
conn.commit()
9.4.5 索引控制
# 写入索引(默认)
gdf.to_postgis("cities", con=engine, index=True)
# 不写入索引
gdf.to_postgis("cities", con=engine, index=False)
# 自定义索引列名
gdf.to_postgis("cities", con=engine, index=True, index_label="gid")
9.4.6 dtype 参数 — 自定义列类型
from sqlalchemy import Integer, String, Float
from geoalchemy2 import Geometry
# 显式指定列类型
gdf.to_postgis(
"cities",
con=engine,
dtype={
"name": String(100),
"population": Integer,
"area_km2": Float,
"geometry": Geometry("POINT", srid=4326)
}
)
9.4.7 分块写入
# 对于大数据集,使用分块写入以控制内存和事务大小
gdf.to_postgis(
"large_table",
con=engine,
chunksize=10000, # 每次写入 10000 行
if_exists="replace"
)
9.4.8 写入后创建空间索引
from sqlalchemy import text
# 写入数据
gdf.to_postgis("cities", con=engine, if_exists="replace")
# 手动创建空间索引(如果没有自动创建)
with engine.connect() as conn:
conn.execute(text(
"CREATE INDEX IF NOT EXISTS idx_cities_geometry "
"ON cities USING GIST (geometry)"
))
conn.commit()
# 分析表以更新统计信息
with engine.connect() as conn:
conn.execute(text("ANALYZE cities"))
conn.commit()
9.5 Spatialite 读写
9.5.1 创建 Spatialite 数据库
import sqlite3
# 创建数据库并加载 Spatialite 扩展
conn = sqlite3.connect("my_spatial.db")
conn.enable_load_extension(True)
try:
conn.load_extension("mod_spatialite")
except Exception as e:
print(f"加载 Spatialite 扩展失败: {e}")
print("请确保已安装 libspatialite")
# 初始化空间元数据表
conn.execute("SELECT InitSpatialMetaData(1)")
conn.commit()
9.5.2 使用 GeoPandas 读取 Spatialite
import geopandas as gpd
from sqlalchemy import create_engine, event
import sqlite3
# 创建引擎并加载 Spatialite 扩展
engine = create_engine("sqlite:///my_spatial.db")
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
dbapi_connection.enable_load_extension(True)
dbapi_connection.load_extension("mod_spatialite")
# 读取数据
gdf = gpd.read_postgis(
"SELECT *, AsText(geometry) FROM cities",
con=engine,
geom_col="geometry"
)
# 或者直接使用 read_file() 读取 Spatialite
gdf = gpd.read_file("my_spatial.db", layer="cities")
9.5.3 使用 GeoPandas 写入 Spatialite
# 方法1:使用 to_file()(推荐,更简单)
gdf.to_file("output.db", driver="SQLite", layer="cities")
# 方法2:通过 GeoPackage 格式(GeoPackage 本身就是 SQLite)
gdf.to_file("output.gpkg", layer="cities")
# 方法3:使用 SQLAlchemy(需要更多配置)
# 需要 GeoAlchemy2 支持
9.5.4 Spatialite vs GeoPackage
| 特性 | Spatialite | GeoPackage |
|---|---|---|
| 基础 | SQLite + SpatiaLite 扩展 | SQLite(OGC 标准) |
| 需要扩展 | 是(mod_spatialite) | 否 |
| 空间函数 | 丰富(类似 PostGIS) | 有限 |
| 互操作性 | 一般 | 优秀(OGC 标准) |
| GeoPandas 支持 | 通过 SQLAlchemy | 原生支持 |
| 推荐场景 | 需要复杂空间查询 | 数据存储和交换 |
9.6 GeoAlchemy2 集成
9.6.1 GeoAlchemy2 简介
GeoAlchemy2 是 SQLAlchemy 的地理空间扩展,提供了 ORM 模式下的空间数据操作能力。
# 安装
# pip install geoalchemy2
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
from geoalchemy2 import Geometry
Base = declarative_base()
class City(Base):
__tablename__ = "cities"
id = Column(Integer, primary_key=True)
name = Column(String(100))
population = Column(Integer)
geom = Column(Geometry("POINT", srid=4326))
9.6.2 使用 ORM 创建表
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")
# 创建所有表
Base.metadata.create_all(engine)
# 插入数据
with Session(engine) as session:
city = City(
name="北京",
population=21710000,
geom="SRID=4326;POINT(116.407 39.904)"
)
session.add(city)
session.commit()
9.6.3 ORM 空间查询
from sqlalchemy.orm import Session
from geoalchemy2.functions import ST_DWithin, ST_GeomFromText
from geoalchemy2 import WKTElement
with Session(engine) as session:
# 查找距离某点 0.1 度范围内的城市
center = WKTElement("POINT(116.4 39.9)", srid=4326)
cities = session.query(City).filter(
ST_DWithin(City.geom, center, 0.1)
).all()
for city in cities:
print(f"{city.name}: 人口 {city.population}")
9.6.4 将 ORM 查询结果转为 GeoDataFrame
import geopandas as gpd
from sqlalchemy import select
# 使用 SQLAlchemy Core 查询
stmt = select(City).where(City.population > 5000000)
# 通过 read_postgis 执行查询
gdf = gpd.read_postgis(
stmt,
con=engine,
geom_col="geom"
)
print(gdf)
9.7 高级 SQL 空间查询
9.7.1 空间关系函数
PostGIS 提供了丰富的空间关系函数,可以在 SQL 中直接使用:
import geopandas as gpd
# ST_Intersects — 相交
gdf = gpd.read_postgis(
"""
SELECT a.name, a.geom
FROM buildings a, flood_zones b
WHERE ST_Intersects(a.geom, b.geom)
AND b.risk_level = 'high'
""",
con=engine,
geom_col="geom"
)
# ST_Within — 在…之内
gdf = gpd.read_postgis(
"""
SELECT p.name, p.geom
FROM pois p, districts d
WHERE ST_Within(p.geom, d.geom)
AND d.name = '海淀区'
""",
con=engine,
geom_col="geom"
)
# ST_Contains — 包含
gdf = gpd.read_postgis(
"""
SELECT d.name, d.geom, COUNT(p.id) as poi_count
FROM districts d
LEFT JOIN pois p ON ST_Contains(d.geom, p.geom)
GROUP BY d.name, d.geom
""",
con=engine,
geom_col="geom"
)
9.7.2 空间分析函数
# ST_Buffer — 缓冲区分析
gdf = gpd.read_postgis(
"""
SELECT name,
ST_Buffer(geom::geography, 1000)::geometry as geom
FROM schools
""",
con=engine,
geom_col="geom"
)
# ST_Distance — 距离计算
gdf = gpd.read_postgis(
"""
SELECT a.name AS school_name,
b.name AS hospital_name,
ST_Distance(a.geom::geography, b.geom::geography) as distance_m
FROM schools a
CROSS JOIN LATERAL (
SELECT name, geom
FROM hospitals
ORDER BY a.geom <-> geom
LIMIT 1
) b
""",
con=engine,
geom_col="geom" # 注意此查询可能需要调整
)
# ST_Union — 合并几何
gdf = gpd.read_postgis(
"""
SELECT province,
ST_Union(geom) as geom,
SUM(population) as total_pop
FROM cities
GROUP BY province
""",
con=engine,
geom_col="geom"
)
9.7.3 空间索引利用
# 使用空间索引优化的查询模式
# PostGIS 会自动利用 GIST 索引
# 最近邻查询(KNN)— 使用 <-> 操作符
gdf = gpd.read_postgis(
"""
SELECT name, population, geom
FROM cities
ORDER BY geom <-> ST_SetSRID(ST_MakePoint(116.4, 39.9), 4326)
LIMIT 10
""",
con=engine,
geom_col="geom"
)
# 边界框过滤 — 使用 && 操作符
gdf = gpd.read_postgis(
"""
SELECT *
FROM buildings
WHERE geom && ST_MakeEnvelope(115, 39, 117, 41, 4326)
""",
con=engine,
geom_col="geom"
)
9.8 大数据量流式读取
9.8.1 chunksize 参数
对于大表,一次性读取可能导致内存不足。使用 chunksize 参数可以分块读取:
import geopandas as gpd
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")
# 分块读取,每次 10000 行
chunks = gpd.read_postgis(
"SELECT * FROM large_table",
con=engine,
geom_col="geom",
chunksize=10000
)
# chunks 是一个迭代器
results = []
for i, chunk in enumerate(chunks):
print(f"处理第 {i+1} 块,{len(chunk)} 行")
# 对每块数据进行处理
processed = process_chunk(chunk)
results.append(processed)
# 合并结果
import pandas as pd
final_gdf = gpd.GeoDataFrame(pd.concat(results, ignore_index=True))
9.8.2 服务端游标
# 使用服务端游标减少客户端内存使用
from sqlalchemy import create_engine, text
engine = create_engine(
"postgresql://user:pass@localhost:5432/gis_db",
execution_options={
"stream_results": True # 启用服务端游标
}
)
# 配合 chunksize 使用
chunks = gpd.read_postgis(
"SELECT * FROM very_large_table",
con=engine,
geom_col="geom",
chunksize=50000
)
for chunk in chunks:
# 处理每个块
pass
9.8.3 分页读取
def read_paginated(engine, table, geom_col, page_size=50000):
"""使用 OFFSET/LIMIT 分页读取"""
offset = 0
all_chunks = []
while True:
sql = f"""
SELECT *
FROM {table}
ORDER BY id
LIMIT {page_size} OFFSET {offset}
"""
chunk = gpd.read_postgis(sql, con=engine, geom_col=geom_col)
if len(chunk) == 0:
break
all_chunks.append(chunk)
offset += page_size
print(f"已读取 {offset} 行...")
import pandas as pd
return gpd.GeoDataFrame(pd.concat(all_chunks, ignore_index=True))
# 使用
gdf = read_paginated(engine, "large_table", "geom")
9.9 连接池与性能优化
9.9.1 连接池配置
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# 配置连接池
engine = create_engine(
"postgresql://user:pass@localhost:5432/gis_db",
poolclass=QueuePool,
pool_size=5, # 常驻连接数
max_overflow=10, # 最大溢出连接数
pool_timeout=30, # 获取连接的超时时间(秒)
pool_recycle=1800, # 连接回收时间(秒)
pool_pre_ping=True # 使用前检查连接是否有效
)
# 查看连接池状态
print(engine.pool.status())
9.9.2 批量写入优化
import geopandas as gpd
# 1. 使用适当的 chunksize
gdf.to_postgis("table", con=engine, chunksize=5000)
# 2. 先删除索引,写入后重建(大批量写入时)
from sqlalchemy import text
with engine.connect() as conn:
# 删除空间索引
conn.execute(text("DROP INDEX IF EXISTS idx_table_geom"))
conn.commit()
# 写入数据
gdf.to_postgis("table", con=engine, if_exists="replace")
# 重建索引
with engine.connect() as conn:
conn.execute(text(
"CREATE INDEX idx_table_geom ON table USING GIST (geometry)"
))
conn.execute(text("ANALYZE table"))
conn.commit()
# 3. 使用 COPY 命令(最快的批量写入方式)
# 需要使用 psycopg2 的 copy_expert 功能
# 这超出了 GeoPandas 的直接支持范围
9.9.3 查询优化技巧
# 1. 只选择需要的列
gdf = gpd.read_postgis(
"SELECT name, population, geom FROM cities", # 而非 SELECT *
con=engine,
geom_col="geom"
)
# 2. 使用空间索引友好的查询
# ✅ 好:使用边界框先过滤
gdf = gpd.read_postgis(
"""
SELECT *
FROM buildings
WHERE geom && ST_MakeEnvelope(115, 39, 117, 41, 4326)
AND ST_Intersects(geom, some_complex_geometry)
""",
con=engine,
geom_col="geom"
)
# 3. 使用 LIMIT 限制结果集
gdf = gpd.read_postgis(
"SELECT * FROM pois ORDER BY geom <-> reference_point LIMIT 100",
con=engine,
geom_col="geom"
)
9.10 数据库空间索引
9.10.1 空间索引的重要性
空间索引是空间数据库性能的关键。没有空间索引,每次空间查询都需要扫描全表。
-- 创建 GIST 空间索引(PostGIS)
CREATE INDEX idx_cities_geom ON cities USING GIST (geom);
-- 查看索引使用情况
EXPLAIN ANALYZE
SELECT *
FROM cities
WHERE ST_Intersects(geom, ST_MakeEnvelope(115, 39, 117, 41, 4326));
9.10.2 在 Python 中管理空间索引
from sqlalchemy import text, inspect
# 检查表是否有空间索引
def check_spatial_index(engine, table_name, geom_col="geometry"):
"""检查表是否有空间索引"""
inspector = inspect(engine)
indexes = inspector.get_indexes(table_name)
for idx in indexes:
if geom_col in idx.get("column_names", []):
print(f"✓ 找到空间索引: {idx['name']}")
return True
print(f"⚠️ 表 {table_name} 没有空间索引")
return False
# 创建空间索引
def create_spatial_index(engine, table_name, geom_col="geometry"):
"""创建空间索引"""
index_name = f"idx_{table_name}_{geom_col}"
sql = f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} USING GIST ({geom_col})"
with engine.connect() as conn:
conn.execute(text(sql))
conn.execute(text(f"ANALYZE {table_name}"))
conn.commit()
print(f"✓ 已创建空间索引: {index_name}")
9.10.3 索引类型对比
| 索引类型 | 说明 | 适用场景 |
|---|---|---|
| GIST | 通用搜索树,PostGIS 默认 | 空间查询(推荐) |
| SP-GiST | 空间分区 GiST | 非均匀分布的数据 |
| BRIN | 块范围索引,体积极小 | 物理有序的大数据集 |
| R-tree | Spatialite 使用 | Spatialite 数据库 |
9.11 常见问题排查
9.11.1 连接问题
# 问题1:连接被拒绝
# 解决:检查数据库是否运行、端口是否正确、防火墙设置
try:
engine = create_engine("postgresql://user:pass@localhost:5432/gis_db")
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
print("✓ 连接成功")
except Exception as e:
print(f"✗ 连接失败: {e}")
# 问题2:认证失败
# 解决:检查用户名密码、pg_hba.conf 配置
# 问题3:数据库不存在
# 解决:先创建数据库
# createdb -U postgres gis_db
9.11.2 几何列识别问题
# 问题:read_postgis() 无法识别几何列
# 原因:查询结果中的几何列名与 geom_col 参数不匹配
# 解决1:明确指定几何列名
gdf = gpd.read_postgis(
"SELECT id, name, the_geom FROM my_table",
con=engine,
geom_col="the_geom" # 必须与 SELECT 中的列名一致
)
# 解决2:使用别名
gdf = gpd.read_postgis(
"SELECT id, name, shape AS geom FROM my_table",
con=engine,
geom_col="geom"
)
9.11.3 CRS 问题
# 问题:读取后 CRS 为 None
# 原因:几何列没有注册 SRID
# 检查 SRID
with engine.connect() as conn:
result = conn.execute(text(
"SELECT ST_SRID(geom) FROM my_table LIMIT 1"
))
srid = result.scalar()
print(f"SRID: {srid}")
# 如果 SRID 为 0 或 -1,需要手动设置
gdf = gpd.read_postgis(
"SELECT * FROM my_table",
con=engine,
geom_col="geom",
crs="EPSG:4326" # 显式指定 CRS
)
# 修复数据库中的 SRID
with engine.connect() as conn:
conn.execute(text(
"SELECT UpdateGeometrySRID('my_table', 'geom', 4326)"
))
conn.commit()
9.11.4 编码问题
# 问题:中文乱码
# 解决:确保连接使用 UTF-8 编码
engine = create_engine(
"postgresql://user:pass@localhost:5432/gis_db"
"?client_encoding=utf8"
)
# 或者在 PostgreSQL 配置中设置
# ALTER DATABASE gis_db SET client_encoding TO 'UTF8';
9.11.5 性能问题
# 问题:查询非常慢
# 排查步骤:
# 1. 检查是否有空间索引
with engine.connect() as conn:
result = conn.execute(text("""
SELECT indexname
FROM pg_indexes
WHERE tablename = 'my_table'
"""))
indexes = result.fetchall()
print("索引列表:", indexes)
# 2. 使用 EXPLAIN ANALYZE 分析查询
with engine.connect() as conn:
result = conn.execute(text("""
EXPLAIN ANALYZE
SELECT * FROM my_table
WHERE ST_Intersects(geom, ST_MakeEnvelope(115, 39, 117, 41, 4326))
"""))
for row in result:
print(row[0])
# 3. 检查表大小和行数
with engine.connect() as conn:
result = conn.execute(text("""
SELECT
pg_size_pretty(pg_total_relation_size('my_table')) as total_size,
(SELECT count(*) FROM my_table) as row_count
"""))
row = result.fetchone()
print(f"表大小: {row[0]}, 行数: {row[1]}")
9.12 本章小结
本章详细介绍了 GeoPandas 与空间数据库的交互,涵盖了以下核心内容:
| 主题 | 关键要点 |
|---|---|
| 空间数据库 | PostGIS 是最强大的开源选择,Spatialite 适合轻量级场景 |
| 连接配置 | 使用 SQLAlchemy Engine,支持连接池 |
| read_postgis() | 支持 SQL 查询、几何列指定、CRS 设置、参数化查询 |
| to_postgis() | 支持表创建、追加、替换,自动处理几何类型 |
| Spatialite | 零配置嵌入式空间数据库,通过 read_file() 更简单 |
| GeoAlchemy2 | SQLAlchemy 的空间扩展,支持 ORM 模式 |
| 空间 SQL | ST_Intersects、ST_Within、ST_Buffer 等函数 |
| 流式读取 | chunksize 参数控制内存使用 |
| 性能优化 | 连接池、空间索引、查询优化 |
核心建议:
- 大型项目使用 PostGIS,小型项目可用 Spatialite 或 GeoPackage
- 始终创建空间索引
- 使用参数化查询防止 SQL 注入
- 大数据集使用分块读取(chunksize)
- 批量写入前删除索引,写入后重建
- 使用环境变量管理数据库凭据,避免硬编码
下一章我们将学习 GeoPandas 中的几何属性与度量功能,了解如何获取几何对象的各种属性信息。