znlgis 博客

GIS开发与技术分享

第11章:线性参考

线性参考(Linear Referencing)是 GIS 中一种沿线性要素定位的技术。通过在线上指定距离或比例来确定点的位置,广泛应用于道路里程桩管理、管线定位、GPS 轨迹匹配等场景。Shapely 提供了一套完整的线性参考函数,包括沿线插值、点投影、线段提取、线段合并与分割等功能,本章将逐一详细介绍。


线性参考概述

什么是线性参考

线性参考是一种利用沿线测量值(measure)来标识线性要素上位置的方法。与传统的坐标定位不同,线性参考使用”距起点多远”这样的一维度量来描述位置。

在日常生活中,线性参考无处不在:

  • 道路里程桩:高速公路上的”K120”表示距起点 120 公里
  • 管线定位:天然气管道第 3500 米处的阀门
  • 铁路里程:京沪高铁第 856 公里处
  • 河流水文:长江上游距源头 2400 公里处的水文站

线性参考在 GIS 中的应用

# 线性参考的核心思想:用一维距离描述二维位置
from shapely import Point, LineString

# 一条道路
road = LineString([(0, 0), (10, 0), (10, 10)])
# 道路总长度
print(f"道路总长度: {road.length}")  # 20.0

# "在道路 5 公里处" → 线性参考
# "在坐标 (5, 0) 处" → 坐标定位

Shapely 中的线性参考函数

Shapely 提供了以下线性参考相关函数:

函数 说明
line_interpolate_point 沿线插值点
line_locate_point 点到线的投影距离
line_merge 合并连接的线段
shared_paths 共享路径
shortest_line 两几何体间最短连线
ops.substring 提取线段子串
ops.split 分割几何体
ops.nearest_points 最近点对

line_interpolate_point:沿线插值点

函数签名

shapely.line_interpolate_point(line, distance, normalized=False)

该函数返回沿线在指定距离处的点。distance 为沿线距离,normalized 控制是否使用归一化距离(0 到 1 之间的比例)。

绝对距离插值

from shapely import LineString, line_interpolate_point

# 创建一条简单的线
line = LineString([(0, 0), (10, 0)])

# 在距起点 3 个单位处插值
point = line_interpolate_point(line, 3)
print(point)  # POINT (3 0)

# 在距起点 7 个单位处插值
point2 = line_interpolate_point(line, 7)
print(point2)  # POINT (7 0)

# 超过线长度时,返回终点
point3 = line_interpolate_point(line, 15)
print(point3)  # POINT (10 0)

# 负值时,返回起点
point4 = line_interpolate_point(line, -1)
print(point4)  # POINT (0 0)

归一化距离插值

from shapely import LineString, line_interpolate_point

line = LineString([(0, 0), (10, 0), (10, 10)])
print(f"线长度: {line.length}")  # 20.0

# 归一化距离:0.0 = 起点, 1.0 = 终点
start = line_interpolate_point(line, 0.0, normalized=True)
mid = line_interpolate_point(line, 0.5, normalized=True)
end = line_interpolate_point(line, 1.0, normalized=True)

print(f"起点: {start}")    # POINT (0 0)
print(f"中点: {mid}")      # POINT (10 0)
print(f"终点: {end}")      # POINT (10 10)

# 四分之一处
quarter = line_interpolate_point(line, 0.25, normalized=True)
print(f"1/4处: {quarter}")  # POINT (5 0)

在折线上插值

from shapely import LineString, line_interpolate_point

# L 形折线
line = LineString([(0, 0), (10, 0), (10, 5)])
print(f"总长度: {line.length}")  # 15.0

# 在拐弯处
point_at_10 = line_interpolate_point(line, 10.0)
print(f"距离10处: {point_at_10}")  # POINT (10 0)

# 过了拐弯
point_at_12 = line_interpolate_point(line, 12.0)
print(f"距离12处: {point_at_12}")  # POINT (10 2)

# 等间距采样
import numpy as np
distances = np.linspace(0, line.length, 10)
points = [line_interpolate_point(line, d) for d in distances]
for i, p in enumerate(points):
    print(f"  采样点{i}: {p}")

处理 MultiLineString

from shapely import MultiLineString, line_interpolate_point

# MultiLineString 会被视为连续的线
mline = MultiLineString([
    [(0, 0), (5, 0)],
    [(5, 0), (10, 0)]
])

point = line_interpolate_point(mline, 7.0)
print(f"MultiLineString 上距离7处: {point}")  # POINT (7 0)

line_locate_point:点到线的投影距离

函数签名

shapely.line_locate_point(line, point, normalized=False)

该函数将点投影到线上,返回投影点到线起点的距离。这是 line_interpolate_point 的逆操作。

基本用法

from shapely import LineString, Point, line_locate_point

line = LineString([(0, 0), (10, 0)])
point = Point(5, 3)  # 不在线上的点

# 获取投影距离
distance = line_locate_point(line, point)
print(f"投影距离: {distance}")  # 5.0

# 点就在线上
point_on_line = Point(7, 0)
distance2 = line_locate_point(line, point_on_line)
print(f"投影距离: {distance2}")  # 7.0

归一化距离

from shapely import LineString, Point, line_locate_point

line = LineString([(0, 0), (10, 0), (10, 10)])

point = Point(10, 5)
dist = line_locate_point(line, point, normalized=False)
dist_norm = line_locate_point(line, point, normalized=True)

print(f"绝对距离: {dist}")      # 15.0
print(f"归一化距离: {dist_norm}")  # 0.75
print(f"线总长: {line.length}")    # 20.0

互逆验证

from shapely import LineString, Point
from shapely import line_interpolate_point, line_locate_point

line = LineString([(0, 0), (5, 5), (10, 0)])
original_point = Point(7, 4)

# 先投影得到距离
dist = line_locate_point(line, original_point)
print(f"投影距离: {dist:.4f}")

# 再用距离插值得到投影点
projected = line_interpolate_point(line, dist)
print(f"投影点: {projected}")

# 验证投影点在线上(或非常接近)
print(f"投影点到线的距离: {line.distance(projected):.10f}")  # 接近 0

道路里程桩定位应用

from shapely import LineString, Point, line_locate_point

# 模拟一条道路
road = LineString([
    (116.0, 39.0), (116.5, 39.2),
    (117.0, 39.5), (117.5, 39.3),
    (118.0, 39.8)
])

# 某事故发生地点
accident = Point(117.2, 39.4)

# 计算事故在道路上的里程
mileage = line_locate_point(road, accident)
mileage_norm = line_locate_point(road, accident, normalized=True)

print(f"事故距道路起点: {mileage:.4f} 单位")
print(f"事故在道路的 {mileage_norm*100:.1f}% 处")

line_merge:合并连接的线段

函数签名

shapely.line_merge(multilinestring, directed=False)

将多条首尾相连的线段合并为更少的线段。

基本合并

from shapely import MultiLineString, line_merge

# 三条首尾相连的线段
lines = MultiLineString([
    [(0, 0), (1, 1)],
    [(1, 1), (2, 2)],
    [(2, 2), (3, 3)]
])

merged = line_merge(lines)
print(f"合并前: {lines.geom_type}, {len(lines.geoms)} 条")
print(f"合并后: {merged.geom_type}")
print(f"合并结果: {merged}")
# LINESTRING (0 0, 1 1, 2 2, 3 3)

部分连接的线段

from shapely import MultiLineString, line_merge

# 两组不连接的线段
lines = MultiLineString([
    [(0, 0), (1, 0)],
    [(1, 0), (2, 0)],   # 与第一条连接
    [(5, 5), (6, 6)],   # 独立的线段
    [(6, 6), (7, 7)]    # 与第三条连接
])

merged = line_merge(lines)
print(f"合并结果: {merged}")
print(f"类型: {merged.geom_type}")
# MultiLineString,包含两条合并后的线
if merged.geom_type == 'MultiLineString':
    for i, geom in enumerate(merged.geoms):
        print(f"  线{i}: {geom}")

directed 参数的作用

from shapely import MultiLineString, line_merge

# 方向不一致的线段
lines = MultiLineString([
    [(0, 0), (1, 0)],
    [(2, 0), (1, 0)]   # 注意:方向相反,终点都是 (1, 0)
])

# 非有向合并:忽略方向,可以合并
merged_undirected = line_merge(lines, directed=False)
print(f"无向合并: {merged_undirected}")

# 有向合并:保持方向,无法合并
merged_directed = line_merge(lines, directed=True)
print(f"有向合并: {merged_directed}")
print(f"有向合并类型: {merged_directed.geom_type}")

复杂路网合并

from shapely import MultiLineString, line_merge

# 模拟碎片化的路网数据
road_segments = MultiLineString([
    [(0, 0), (1, 0)],
    [(1, 0), (2, 0)],
    [(2, 0), (3, 0)],
    [(3, 0), (3, 1)],
    [(3, 1), (3, 2)],
])

merged_road = line_merge(road_segments)
print(f"合并前: {len(road_segments.geoms)} 段")
print(f"合并后: {merged_road.geom_type}")
print(f"合并结果: {merged_road}")
# 所有段首尾相连,合并为一条 LineString

shared_paths:共享路径

函数签名

shapely.shared_paths(line1, line2)

找出两条线的共享路径段,返回一个 GeometryCollection,包含同向共享段和反向共享段。

基本用法

from shapely import LineString, shared_paths

line1 = LineString([(0, 0), (5, 0), (10, 0)])
line2 = LineString([(3, 0), (7, 0), (7, 5)])

result = shared_paths(line1, line2)
print(f"结果类型: {result.geom_type}")
print(f"结果: {result}")

# 返回 GeometryCollection
# 第一个元素:同向共享段(方向相同)
# 第二个元素:反向共享段(方向相反)
same_direction = result.geoms[0]
opposite_direction = result.geoms[1]

print(f"同向共享: {same_direction}")
print(f"反向共享: {opposite_direction}")

反向共享路径

from shapely import LineString, shared_paths

# 两条线在 (2,0)-(5,0) 段重叠,但方向相反
line1 = LineString([(0, 0), (2, 0), (5, 0), (8, 0)])
line2 = LineString([(6, 1), (5, 0), (2, 0), (0, 1)])

result = shared_paths(line1, line2)
same_dir = result.geoms[0]
opp_dir = result.geoms[1]

print(f"同向共享段: {same_dir}")
print(f"反向共享段: {opp_dir}")
# 由于 line2 中 (5,0)→(2,0) 与 line1 的 (2,0)→(5,0) 方向相反
# 所以出现在反向共享段中

道路重叠检测

from shapely import LineString, shared_paths

# 两条公交线路
bus_route_1 = LineString([
    (0, 0), (2, 0), (4, 0), (6, 0), (8, 0), (10, 0)
])
bus_route_2 = LineString([
    (3, 2), (4, 0), (6, 0), (8, 0), (9, 2)
])

result = shared_paths(bus_route_1, bus_route_2)
shared_same = result.geoms[0]

if not shared_same.is_empty:
    print(f"两条公交线路共享路段: {shared_same}")
    if hasattr(shared_same, 'geoms'):
        for seg in shared_same.geoms:
            print(f"  共享段长度: {seg.length:.2f}")
    else:
        print(f"  共享段长度: {shared_same.length:.2f}")

shortest_line:两几何体间最短连线

函数签名

shapely.shortest_line(geom1, geom2)

返回连接两个几何体最近点的线段。

基本用法

from shapely import Point, LineString, Polygon, shortest_line

# 点到线的最短连线
point = Point(5, 5)
line = LineString([(0, 0), (10, 0)])

result = shortest_line(point, line)
print(f"最短连线: {result}")
print(f"最短距离: {result.length}")  # 5.0

几何体之间的最短连线

from shapely import LineString, Polygon, shortest_line

# 两条不相交的线
line1 = LineString([(0, 0), (5, 0)])
line2 = LineString([(3, 3), (8, 3)])

short = shortest_line(line1, line2)
print(f"线到线最短连线: {short}")
print(f"最短距离: {short.length:.4f}")

# 两个不相交的多边形
poly1 = Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])
poly2 = Polygon([(5, 5), (7, 5), (7, 7), (5, 7)])

short2 = shortest_line(poly1, poly2)
print(f"面到面最短连线: {short2}")
print(f"最短距离: {short2.length:.4f}")

最近设施查找

from shapely import Point, Polygon, shortest_line

# 用户位置
user = Point(5, 5)

# 几个建筑物(多边形)
buildings = [
    Polygon([(0, 0), (2, 0), (2, 2), (0, 2)]),
    Polygon([(8, 3), (10, 3), (10, 5), (8, 5)]),
    Polygon([(3, 7), (5, 7), (5, 9), (3, 9)]),
]
names = ["超市", "医院", "学校"]

# 查找最近的建筑物
min_dist = float('inf')
nearest_name = None
nearest_line = None

for name, building in zip(names, buildings):
    sl = shortest_line(user, building)
    if sl.length < min_dist:
        min_dist = sl.length
        nearest_name = name
        nearest_line = sl

print(f"最近的设施: {nearest_name}")
print(f"距离: {min_dist:.2f}")
print(f"连线: {nearest_line}")

shapely.ops.substring:提取线段子串

函数签名

shapely.ops.substring(geometry, start_dist, end_dist, normalized=False)

从线上提取指定起止距离之间的子串。

基本用法

from shapely import LineString
from shapely.ops import substring

line = LineString([(0, 0), (10, 0)])

# 提取 2 到 7 之间的子串
sub = substring(line, 2, 7)
print(f"子串: {sub}")  # LINESTRING (2 0, 7 0)
print(f"子串长度: {sub.length}")  # 5.0

归一化距离提取

from shapely import LineString
from shapely.ops import substring

line = LineString([(0, 0), (5, 0), (10, 0), (10, 10)])
print(f"总长: {line.length}")  # 20.0

# 提取前半段
first_half = substring(line, 0, 0.5, normalized=True)
print(f"前半段: {first_half}")
print(f"前半段长度: {first_half.length}")  # 10.0

# 提取后半段
second_half = substring(line, 0.5, 1.0, normalized=True)
print(f"后半段: {second_half}")
print(f"后半段长度: {second_half.length}")  # 10.0

折线子串提取

from shapely import LineString
from shapely.ops import substring

# L 形折线
line = LineString([(0, 0), (10, 0), (10, 10)])

# 提取跨越拐角的子串
sub = substring(line, 5, 15)
print(f"跨拐角子串: {sub}")
# LINESTRING (5 0, 10 0, 10 5)

# 等分线段
n_parts = 4
length = line.length
for i in range(n_parts):
    start = i * length / n_parts
    end = (i + 1) * length / n_parts
    part = substring(line, start, end)
    print(f"  第{i+1}段: {part}, 长度={part.length:.2f}")

反向提取

from shapely import LineString
from shapely.ops import substring

line = LineString([(0, 0), (10, 0)])

# start_dist > end_dist 时,返回反向子串
reversed_sub = substring(line, 7, 2)
print(f"反向子串: {reversed_sub}")
# LINESTRING (7 0, 2 0) — 方向反转

shapely.ops.split:分割几何体

函数签名

shapely.ops.split(geometry, splitter)

用一个几何体分割另一个几何体,返回 GeometryCollection。

用点分割线

from shapely import LineString, Point
from shapely.ops import split, snap

line = LineString([(0, 0), (10, 0)])

# 点必须在线上才能分割
split_point = Point(5, 0)
result = split(line, split_point)

print(f"分割结果数量: {len(result.geoms)}")
for i, geom in enumerate(result.geoms):
    print(f"  片段{i}: {geom}")

处理点不在线上的情况

from shapely import LineString, Point
from shapely.ops import split, snap

line = LineString([(0, 0), (10, 0)])
point = Point(5, 0.001)  # 不完全在线上

# 先将点 snap 到线上
snapped = snap(line, point, tolerance=0.01)
result = split(snapped, point)

print(f"分割结果: {len(result.geoms)} 段")
for i, geom in enumerate(result.geoms):
    print(f"  片段{i}: {geom}")

用线分割面

from shapely import Polygon, LineString
from shapely.ops import split

polygon = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])
splitter = LineString([(-1, 5), (11, 5)])  # 水平切割线

result = split(polygon, splitter)
print(f"分割结果数量: {len(result.geoms)}")
for i, geom in enumerate(result.geoms):
    print(f"  片段{i}: {geom}")
    print(f"    面积: {geom.area}")

用多条线分割

from shapely import Polygon, LineString, MultiLineString
from shapely.ops import split

polygon = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])

# 十字切割
splitter_h = LineString([(-1, 5), (11, 5)])
splitter_v = LineString([(5, -1), (5, 11)])

# 先水平切割
result1 = split(polygon, splitter_h)
print(f"第一次切割: {len(result1.geoms)} 块")

# 再对每块进行垂直切割
all_parts = []
for geom in result1.geoms:
    result2 = split(geom, splitter_v)
    all_parts.extend(result2.geoms)

print(f"最终: {len(all_parts)} 块")
for i, part in enumerate(all_parts):
    print(f"  块{i}: 面积={part.area}")

shapely.ops.nearest_points:最近点对

函数签名

shapely.ops.nearest_points(geom1, geom2)

返回两个几何体上最近的一对点。

基本用法

from shapely import Point, LineString
from shapely.ops import nearest_points

point = Point(5, 5)
line = LineString([(0, 0), (10, 0)])

# 返回一个包含两个点的元组
p1, p2 = nearest_points(point, line)
print(f"几何体1上的最近点: {p1}")  # POINT (5 5)
print(f"几何体2上的最近点: {p2}")  # POINT (5 0)
print(f"距离: {p1.distance(p2)}")   # 5.0

多边形之间的最近点

from shapely import Polygon
from shapely.ops import nearest_points

poly1 = Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])
poly2 = Polygon([(5, 5), (7, 5), (7, 7), (5, 7)])

p1, p2 = nearest_points(poly1, poly2)
print(f"多边形1上最近点: {p1}")
print(f"多边形2上最近点: {p2}")
print(f"两多边形最短距离: {p1.distance(p2):.4f}")

查找多个几何体中最近的

from shapely import Point, Polygon
from shapely.ops import nearest_points

target = Point(5, 5)
geometries = [
    Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]),
    Polygon([(3, 4), (5, 4), (5, 6), (3, 6)]),
    Polygon([(8, 8), (10, 8), (10, 10), (8, 10)]),
]
labels = ["A区", "B区", "C区"]

min_dist = float('inf')
nearest_label = None
nearest_pair = None

for label, geom in zip(labels, geometries):
    p1, p2 = nearest_points(target, geom)
    dist = p1.distance(p2)
    if dist < min_dist:
        min_dist = dist
        nearest_label = label
        nearest_pair = (p1, p2)

print(f"最近区域: {nearest_label}")
print(f"最短距离: {min_dist:.4f}")
print(f"目标上最近点: {nearest_pair[0]}")
print(f"区域上最近点: {nearest_pair[1]}")

实际应用案例

案例一:道路里程桩定位

from shapely import LineString, Point
from shapely import line_interpolate_point, line_locate_point

# 定义一条道路(简化的坐标)
road = LineString([
    (0, 0), (10, 0), (20, 5), (30, 5), (40, 10)
])
road_length = road.length
print(f"道路总长: {road_length:.2f}")

# 设置里程桩(每 10 个单位一个)
milestones = {}
dist = 0
while dist <= road_length:
    point = line_interpolate_point(road, dist)
    milestones[dist] = point
    print(f"里程桩 K{dist:.0f}: ({point.x:.2f}, {point.y:.2f})")
    dist += 10

# 报告事故位置
accident_location = Point(15, 3)
accident_mileage = line_locate_point(road, accident_location)
print(f"\n事故发生在里程 K{accident_mileage:.1f} 处")

# 找到最近的前后里程桩
prev_milestone = int(accident_mileage // 10) * 10
next_milestone = prev_milestone + 10
print(f"位于 K{prev_milestone} 和 K{next_milestone} 之间")
print(f"距 K{prev_milestone}{accident_mileage - prev_milestone:.1f} 单位")

案例二:管线事故定位

from shapely import LineString, Point
from shapely import line_interpolate_point, line_locate_point
from shapely.ops import substring

# 天然气管线
pipeline = LineString([
    (0, 0), (100, 0), (100, 50),
    (200, 50), (200, 100), (300, 100)
])
print(f"管线总长: {pipeline.length:.0f} 米")

# 定义管线设施位置(按里程)
facilities = {
    "阀门A": 50,
    "阀门B": 150,
    "阀门C": 280,
    "压力站1": 100,
    "压力站2": 250,
}

# 泄漏检测到的位置
leak_point = Point(150, 50)
leak_mileage = line_locate_point(pipeline, leak_point)
print(f"泄漏点里程: {leak_mileage:.1f} 米")

# 查找泄漏点两侧最近的阀门
valves = {k: v for k, v in facilities.items() if "阀门" in k}
upstream_valve = None
downstream_valve = None

for name, mileage in sorted(valves.items(), key=lambda x: x[1]):
    if mileage <= leak_mileage:
        upstream_valve = (name, mileage)
    elif downstream_valve is None:
        downstream_valve = (name, mileage)

print(f"上游阀门: {upstream_valve[0]} (K{upstream_valve[1]})")
print(f"下游阀门: {downstream_valve[0]} (K{downstream_valve[1]})")

# 提取需要关闭的管段
affected_section = substring(
    pipeline,
    upstream_valve[1],
    downstream_valve[1]
)
print(f"受影响管段长度: {affected_section.length:.1f} 米")

案例三:GPS 轨迹匹配

from shapely import LineString, Point
from shapely import line_interpolate_point, line_locate_point, shortest_line

# 道路网络(简化为两条路)
road_a = LineString([(0, 0), (100, 0)])
road_b = LineString([(0, 0), (0, 100)])

roads = {"东西路": road_a, "南北路": road_b}

# GPS 采集的轨迹点(可能有误差)
gps_points = [
    Point(10, 2),   # 接近东西路
    Point(25, -1),  # 接近东西路
    Point(50, 3),   # 接近东西路
    Point(75, 1),   # 接近东西路
]

print("GPS 轨迹匹配结果:")
print("-" * 60)

for i, gps_pt in enumerate(gps_points):
    best_road = None
    best_dist = float('inf')
    best_proj_dist = 0

    for name, road in roads.items():
        sl = shortest_line(gps_pt, road)
        if sl.length < best_dist:
            best_dist = sl.length
            best_road = name
            best_proj_dist = line_locate_point(road, gps_pt)

    matched_point = line_interpolate_point(
        roads[best_road], best_proj_dist
    )

    print(f"GPS点{i}: ({gps_pt.x:.0f}, {gps_pt.y:.0f})")
    print(f"  匹配道路: {best_road}")
    print(f"  匹配里程: {best_proj_dist:.1f}")
    print(f"  匹配点: ({matched_point.x:.1f}, {matched_point.y:.1f})")
    print(f"  偏差: {best_dist:.2f}")

案例四:等距采样与平滑

from shapely import LineString
from shapely import line_interpolate_point
import math

# 原始不规则线
original = LineString([
    (0, 0), (3, 4), (7, 2), (12, 8), (15, 3), (20, 6)
])

# 等距采样:每 1 个单位采样一次
sample_interval = 1.0
total_length = original.length
num_samples = int(total_length / sample_interval) + 1

sampled_coords = []
for i in range(num_samples):
    dist = min(i * sample_interval, total_length)
    pt = line_interpolate_point(original, dist)
    sampled_coords.append((pt.x, pt.y))

resampled = LineString(sampled_coords)
print(f"原始顶点数: {len(original.coords)}")
print(f"重采样顶点数: {len(resampled.coords)}")
print(f"原始长度: {original.length:.4f}")
print(f"重采样长度: {resampled.length:.4f}")

小结

本章介绍了 Shapely 中的线性参考功能,包括:

  • line_interpolate_point:根据距离在线上插值点
  • line_locate_point:计算点在线上的投影距离
  • line_merge:合并首尾相连的线段
  • shared_paths:查找两条线的共享路径
  • shortest_line:计算两个几何体间的最短连线
  • ops.substring:提取线上指定范围的子串
  • ops.split:用几何体分割另一个几何体
  • ops.nearest_points:查找两个几何体间最近的点对

这些函数在道路管理、管线维护、GPS 定位等实际应用中有着广泛的用途。掌握线性参考技术,可以高效解决沿线定位和分析问题。