本篇文章是对上篇内容的一个深化探讨,通过生成多个矩形来实现一定范围的道路交通态势查询,在上一篇文章中,我们详细介绍了如何利用单个矩形区域查询功能来获取特定区域内的实时交通状况。然而,在实际应用中,城市交通网络复杂多变,单一矩形往往难以覆盖大面积的地理范围或满足更精细的数据需求。
因此,本篇文章进一步提出了一个多矩形查询的方法论,通过将一个大的监测区域细分为若干个小矩形,确保每个子矩形都符合高德地图API对于查询区域尺寸的要求(即对角线不超过10公里)。这种方法不仅能够提高数据采集的准确性和完整性,还能有效应对不同规模和形态的城市交通环境,通过定义中心点与总覆盖范围,来科学地划分多个子矩形,从而实现一定范围的道路交通态势查询。
这是上篇文章的的矩形范围内的交通态势查询边界;
我们通过python脚本生成多个相同大小的矩形区域,以实现对指定范围的全面覆盖。这种方式确保每个矩形都符合API的尺寸要求,同时能够系统性地覆盖整个目标区域;
主要功能:
- 输入中心点坐标和半径,生成一个矩形网格系统,可以指定要生成的层数(默认2层)
- 每层矩形围绕中心矩形均匀分布,生成可交互的HTML地图文件
- 打印每个矩形的坐标范围
完整代码#运行环境 Python 3.11
from math import cos, radians
import folium
from folium.plugins import MeasureControl
def generate_rectangle(center_point, radius_km):
"""
通过中心点和半径生成矩形范围
"""
try:
center_lon, center_lat = map(float, center_point.split(','))
lat_diff = radius_km / 111.0
lon_diff = radius_km / (111.0 * cos(radians(center_lat)))
min_lon = center_lon - lon_diff
min_lat = center_lat - lat_diff
max_lon = center_lon + lon_diff
max_lat = center_lat + lat_diff
rectangle = f"{min_lon:.6f},{min_lat:.6f};{max_lon:.6f},{max_lat:.6f}"
return {
'rectangle': rectangle,
'center': [center_lat, center_lon],
'bounds': [[min_lat, min_lon], [max_lat, max_lon]],
'lat_diff': lat_diff,
'lon_diff': lon_diff
}
except Exception as e:
print(f"生成矩形范围时出错: {str(e)}")
return None
def generate_surrounding_rectangles(center_point, radius_km, layers=2):
"""
生成多层矩形的坐标
layers: 要生成的层数(默认2层)
"""
center_coords = generate_rectangle(center_point, radius_km)
if not center_coords:
return None
center_lon, center_lat = map(float, center_point.split(','))
all_rectangles = []
# 生成每一层的矩形
for layer in range(1, layers + 1):
# 当前层的偏移量
current_lat_diff = center_coords['lat_diff'] * 2 * layer
current_lon_diff = center_coords['lon_diff'] * 2 * layer
layer_centers = []
# 上下各 2*layer+1 个点
for i in range(-layer, layer + 1):
layer_centers.append((center_lon + i * center_coords['lon_diff'] * 2,
center_lat + current_lat_diff)) # 上
layer_centers.append((center_lon + i * center_coords['lon_diff'] * 2,
center_lat - current_lat_diff)) # 下
# 左右各 2*(layer-1)+1 个点(不包括已添加的角点)
for i in range(-(layer - 1), layer):
layer_centers.append((center_lon - current_lon_diff,
center_lat + i * center_coords['lat_diff'] * 2)) # 左
layer_centers.append((center_lon + current_lon_diff,
center_lat + i * center_coords['lat_diff'] * 2)) # 右
# 为当前层的所有中心点生成矩形
layer_rectangles = []
for i, (lon, lat) in enumerate(layer_centers):
rect = generate_rectangle(f"{lon},{lat}", radius_km)
if rect:
rect['direction'] = f"第{layer}层{i + 1}号"
rect['layer'] = layer
layer_rectangles.append(rect)
all_rectangles.extend(layer_rectangles)
return center_coords, all_rectangles
def visualize_rectangles(center_point, radius_km, layers=2):
"""
可视化中心矩形和周围的矩形,并以特定格式打印坐标
"""
result = generate_surrounding_rectangles(center_point, radius_km, layers)
if not result:
return
center_coords, surrounding_rectangles = result
# 创建地图
m = folium.Map(
location=center_coords['center'],
zoom_start=12,
tiles='https://webst02.is.autonavi.com/appmaptile?style=7&x={x}&y={y}&z={z}',
attr='AutoNavi'
)
# 添加中心矩形
folium.Rectangle(
bounds=center_coords['bounds'],
color='red',
fill=True,
fillColor='red',
fillOpacity=0.2,
popup='中心矩形'
).add_to(m)
# 为每一层设置不同的基础颜色
base_colors = ['blue', 'green', 'purple', 'orange', 'darkred', 'darkblue',
'darkgreen', 'cadetblue', 'lightred', 'lightblue', 'lightgreen',
'gray', 'pink', 'beige', 'lightgray', 'black']
# 添加周围矩形
for rect in surrounding_rectangles:
# 根据层数选择颜色
color = base_colors[(rect['layer'] - 1) % len(base_colors)]
folium.Rectangle(
bounds=rect['bounds'],
color=color,
fill=True,
fillColor=color,
fillOpacity=0.2,
popup=f"{rect['direction']}"
).add_to(m)
# 添加中心点标记
folium.Marker(
rect['center'],
popup=f"{rect['direction']}中心点",
icon=folium.Icon(color='green', icon='info-sign')
).add_to(m)
# 添加中心点标记
folium.Marker(
center_coords['center'],
popup='中心点',
icon=folium.Icon(color='red', icon='info-sign')
).add_to(m)
# 添加测量控件
m.add_child(MeasureControl())
# 保存地图
output_file = 'rectangles_visualization.html'
m.save(output_file)
print(f"可视化结果已保存到: {output_file}")
# 修改后的打印格式
print("\nrectangles = [")
# 首先打印中心矩形
print(f" '{center_coords['rectangle']}',")
# 然后打印周围的矩形
for rect in surrounding_rectangles:
print(f" '{rect['rectangle']}',")
print("]")
return center_coords, surrounding_rectangles
if __name__ == "__main__":
center_point = "121.446433,31.22321" # 中心点坐标
radius = 2.0 # 半径2公里
layers = 2 # 想要生成的层数
visualize_rectangles(center_point, radius, layers)
运行结束会打印出所有矩形是左上右下的对角线坐标的一个字典方便直接复制到下一个脚本使用和一个html的可视化地图方便查看覆盖范围是否满足预期;
接下来,我们把上一步生成的坐标字典替换下个脚本的 rectangles = [ ]的部分,通过高德交通态势api开始遍历每一组坐标,并增加了一个随机延时,防止频繁访问,最后把输出结果整合到一个shp中,且保证输出地理坐标系是WGS84坐标系;
主要功能:
- 获取多个矩形区域的交通数据
- 将所有数据合并到一个文件中,并增加了一个随机延时
- 基于 name、direction 和 angle 进行去重
- 输出一个 shp 文件(去重后的数据、wgs84坐标系)
完整代码#运行环境 Python 3.11
import requests
import geopandas as gpd
import pandas as pd
from shapely.geometry import LineString
from datetime import datetime
import os
import math
import numpy as np
from shapely.ops import transform
from functools import partial
import time
import random
# 坐标转换参数
x_pi = 3.14159265358979324 * 3000.0 / 180.0
pi = 3.1415926535897932384626 # π
a = 6378245.0 # 长半轴
ee = 0.00669342162296594323 # 扁率
def gcj02towgs84(lng, lat):
"""
GCJ02(火星坐标系)转GPS84
:param lng:火星坐标系的经度
:param lat:火星坐标系纬度
:return:
"""
if out_of_china(lng, lat):
return lng, lat
dlat = transformlat(lng - 105.0, lat - 35.0)
dlng = transformlng(lng - 105.0, lat - 35.0)
radlat = lat / 180.0 * pi
magic = math.sin(radlat)
magic = 1 - ee * magic * magic
sqrtmagic = math.sqrt(magic)
dlat = (dlat * 180.0) / ((a * (1 - ee)) / (magic * sqrtmagic) * pi)
dlng = (dlng * 180.0) / (a / sqrtmagic * math.cos(radlat) * pi)
mglat = lat + dlat
mglng = lng + dlng
return [lng * 2 - mglng, lat * 2 - mglat]
def transformlat(lng, lat):
ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + 0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))
ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
ret += (20.0 * math.sin(lat * pi) + 40.0 * math.sin(lat / 3.0 * pi)) * 2.0 / 3.0
ret += (160.0 * math.sin(lat / 12.0 * pi) + 320 * math.sin(lat * pi / 30.0)) * 2.0 / 3.0
return ret
def transformlng(lng, lat):
ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + 0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))
ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
ret += (20.0 * math.sin(lng * pi) + 40.0 * math.sin(lng / 3.0 * pi)) * 2.0 / 3.0
ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 * math.sin(lng / 30.0 * pi)) * 2.0 / 3.0
return ret
def out_of_china(lng, lat):
"""
判断是否在国内,不在国内不做偏移
"""
if lng < 72.004 or lng > 137.8347:
return True
if lat < 0.8293 or lat > 55.8271:
return True
return False
def transform_geometry(geom):
"""转换几何对象的坐标"""
def transform_coords(x, y, z=None):
wgs_x, wgs_y = gcj02towgs84(x, y)
return wgs_x, wgs_y
return transform(transform_coords, geom)
def get_traffic_data(rectangle, key, current_time):
"""获取交通数据并返回GeoDataFrame"""
try:
# 初始化数据字典
data = {
'name': [],
'status': [],
'status_desc': [],
'direction': [],
'angle': [],
'speed': [],
'timestamp': [],
'date': [],
'time': [],
'geometry': []
}
# 状态码映射
status_mapping = {
"0": "未知",
"1": "畅通",
"2": "缓行",
"3": "拥堵"
}
# 构建API请求URL
url = f'https://restapi.amap.com/v3/traffic/status/rectangle?rectangle={rectangle}&output=json&extensions=all&key={key}&level=6'
# 发送请求并获取JSON响应
res = requests.get(url, timeout=10).json()
# 遍历每条道路数据
for road in res['trafficinfo']['roads']:
try:
polylines = [(float(y[0]), float(y[1])) for y in
[x.split(',') for x in road['polyline'].split(';')]]
# 创建线几何对象
line = LineString(polylines)
# 转换为WGS84坐标系
wgs84_line = transform_geometry(line)
status = road.get('status', '0')
data['geometry'].append(wgs84_line)
data['name'].append(road.get('name', ''))
data['status'].append(float(status))
data['status_desc'].append(status_mapping.get(status, '未知'))
data['direction'].append(road.get('direction', ''))
data['angle'].append(float(road.get('angle', 0)))
data['speed'].append(int(road.get('speed', 0)))
data['timestamp'].append(current_time.strftime("%Y-%m-%d %H:%M:%S"))
data['date'].append(current_time.strftime("%Y-%m-%d"))
data['time'].append(current_time.strftime("%H:%M:%S"))
except Exception as e:
print(f"处理单条道路数据时出错: {str(e)}")
continue
# 创建GeoDataFrame对象
gdf = gpd.GeoDataFrame(data, geometry='geometry', crs='EPSG:4326')
gdf['status'] = gdf['status'].astype(np.float64)
return gdf
except Exception as e:
print(f"处理出错: {str(e)}")
return None
def process_rectangles_sequential(rectangles, key='你的key', output_dir='output'):
"""
顺序处理多个矩形区域的交通数据,将所有数据保存在同一个时间段的文件中,并进行去重
增加随机延时,避免请求过于频繁
"""
# 生成统一的时间戳
current_time = datetime.now()
filename = f'traffic_status_{current_time.strftime("%Y%m%d%H%M%S")}_wgs84.shp'
output_path = os.path.join(output_dir, filename)
# 用于存储所有区域的数据
all_gdfs = []
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
total_rectangles = len(rectangles)
for i, rectangle in enumerate(rectangles, 1):
try:
print(f"开始处理区域 [{i}/{total_rectangles}]: {rectangle}")
gdf = get_traffic_data(rectangle, key, current_time)
if gdf is not None:
all_gdfs.append(gdf)
print(f"区域 {rectangle} 处理完成")
# 随机延时,除了最后一个请求
if i < total_rectangles:
delay = random.uniform(1, 3) # 随机 1-3 秒延时
print(f"等待 {delay:.1f} 秒后继续...")
time.sleep(delay)
except Exception as e:
print(f"处理区域 {rectangle} 时出错: {str(e)}")
# 发生错误时也添加延时
time.sleep(random.uniform(2, 4))
# 合并所有数据并去重
if all_gdfs:
print("\n开始合并和去重数据...")
# 合并所有数据
combined_gdf = gpd.GeoDataFrame(pd.concat(all_gdfs, ignore_index=True))
# 去重前的记录数
before_count = len(combined_gdf)
# 基于name、direction和angle进行去重
combined_gdf = combined_gdf.drop_duplicates(
subset=['name', 'direction', 'angle'],
keep='first'
)
# 去重后的记录数
after_count = len(combined_gdf)
# 保存处理后的数据
combined_gdf.to_file(output_path, encoding='utf-8')
print(f"已保存所有数据到: {output_path}")
print(f"去重统计: {before_count} -> {after_count} 条记录 (删除了 {before_count - after_count} 条重复记录)")
if __name__ == "__main__":
rectangles = [
'121.425363,31.205192;121.467503,31.241228',
'121.383215,31.241228;121.425371,31.277264',
'121.383231,31.169156;121.425355,31.205192',
'121.425355,31.241228;121.467511,31.277264',
'121.425371,31.169156;121.467495,31.205192',
] # 替换坐标的部分
process_rectangles_sequential(rectangles)
生成结果如下,我们获取个整个范围的道路的交通态势情况,生成字段包括name(道路名称)、
status(路况)、direction(方向描述)、angle(车行角度,判断道路正反向使用)、speed(平均速度)status_description(路况综述);
通过status字段可视化结果如下,路况结果分四级(0:未知;1:畅通;2:缓行;3:拥堵);
文章仅用于分享个人学习成果与个人存档之用,分享知识,如有侵权,请联系作者进行删除。所有信息均基于作者的个人理解和经验,不代表任何官方立场或权威解读。
标签:指南,center,Python,lon,lat,lng,矩形,rectangles,高德 From: https://blog.csdn.net/weixin_45812624/article/details/145016900