首页 > 编程语言 >Python应用指南:高德交通态势数据(二)

Python应用指南:高德交通态势数据(二)

时间:2025-01-10 20:03:39浏览次数:3  
标签:指南 center Python lon lat lng 矩形 rectangles 高德

本篇文章是对上篇内容的一个深化探讨,通过生成多个矩形来实现一定范围的道路交通态势查询,在上一篇文章中,我们详细介绍了如何利用单个矩形区域查询功能来获取特定区域内的实时交通状况。然而,在实际应用中,城市交通网络复杂多变,单一矩形往往难以覆盖大面积的地理范围或满足更精细的数据需求。

因此,本篇文章进一步提出了一个多矩形查询的方法论,通过将一个大的监测区域细分为若干个小矩形,确保每个子矩形都符合高德地图API对于查询区域尺寸的要求(即对角线不超过10公里)。这种方法不仅能够提高数据采集的准确性和完整性,还能有效应对不同规模和形态的城市交通环境,通过定义中心点与总覆盖范围,来科学地划分多个子矩形,从而实现一定范围的道路交通态势查询。

这是上篇文章的的矩形范围内的交通态势查询边界;

我们通过python脚本生成多个相同大小的矩形区域,以实现对指定范围的全面覆盖。这种方式确保每个矩形都符合API的尺寸要求,同时能够系统性地覆盖整个目标区域;

主要功能:

  • 输入中心点坐标和半径,生成一个矩形网格系统,可以指定要生成的层数(默认2层)
  • 每层矩形围绕中心矩形均匀分布,生成可交互的HTML地图文件
  • 打印每个矩形的坐标范围

完整代码#运行环境 Python 3.11

from math import cos, radians
import folium
from folium.plugins import MeasureControl

def generate_rectangle(center_point, radius_km):
    """
    通过中心点和半径生成矩形范围
    """
    try:
        center_lon, center_lat = map(float, center_point.split(','))
        lat_diff = radius_km / 111.0
        lon_diff = radius_km / (111.0 * cos(radians(center_lat)))

        min_lon = center_lon - lon_diff
        min_lat = center_lat - lat_diff
        max_lon = center_lon + lon_diff
        max_lat = center_lat + lat_diff

        rectangle = f"{min_lon:.6f},{min_lat:.6f};{max_lon:.6f},{max_lat:.6f}"

        return {
            'rectangle': rectangle,
            'center': [center_lat, center_lon],
            'bounds': [[min_lat, min_lon], [max_lat, max_lon]],
            'lat_diff': lat_diff,
            'lon_diff': lon_diff
        }

    except Exception as e:
        print(f"生成矩形范围时出错: {str(e)}")
        return None

def generate_surrounding_rectangles(center_point, radius_km, layers=2):
    """
    生成多层矩形的坐标
    layers: 要生成的层数(默认2层)
    """
    center_coords = generate_rectangle(center_point, radius_km)
    if not center_coords:
        return None

    center_lon, center_lat = map(float, center_point.split(','))
    all_rectangles = []

    # 生成每一层的矩形
    for layer in range(1, layers + 1):
        # 当前层的偏移量
        current_lat_diff = center_coords['lat_diff'] * 2 * layer
        current_lon_diff = center_coords['lon_diff'] * 2 * layer

        layer_centers = []

        # 上下各 2*layer+1 个点
        for i in range(-layer, layer + 1):
            layer_centers.append((center_lon + i * center_coords['lon_diff'] * 2,
                                  center_lat + current_lat_diff))  # 上
            layer_centers.append((center_lon + i * center_coords['lon_diff'] * 2,
                                  center_lat - current_lat_diff))  # 下

        # 左右各 2*(layer-1)+1 个点(不包括已添加的角点)
        for i in range(-(layer - 1), layer):
            layer_centers.append((center_lon - current_lon_diff,
                                  center_lat + i * center_coords['lat_diff'] * 2))  # 左
            layer_centers.append((center_lon + current_lon_diff,
                                  center_lat + i * center_coords['lat_diff'] * 2))  # 右

        # 为当前层的所有中心点生成矩形
        layer_rectangles = []
        for i, (lon, lat) in enumerate(layer_centers):
            rect = generate_rectangle(f"{lon},{lat}", radius_km)
            if rect:
                rect['direction'] = f"第{layer}层{i + 1}号"
                rect['layer'] = layer
                layer_rectangles.append(rect)

        all_rectangles.extend(layer_rectangles)

    return center_coords, all_rectangles

def visualize_rectangles(center_point, radius_km, layers=2):
    """
    可视化中心矩形和周围的矩形,并以特定格式打印坐标
    """
    result = generate_surrounding_rectangles(center_point, radius_km, layers)
    if not result:
        return

    center_coords, surrounding_rectangles = result

    # 创建地图
    m = folium.Map(
        location=center_coords['center'],
        zoom_start=12,
        tiles='https://webst02.is.autonavi.com/appmaptile?style=7&x={x}&y={y}&z={z}',
        attr='AutoNavi'
    )

    # 添加中心矩形
    folium.Rectangle(
        bounds=center_coords['bounds'],
        color='red',
        fill=True,
        fillColor='red',
        fillOpacity=0.2,
        popup='中心矩形'
    ).add_to(m)

    # 为每一层设置不同的基础颜色
    base_colors = ['blue', 'green', 'purple', 'orange', 'darkred', 'darkblue',
                   'darkgreen', 'cadetblue', 'lightred', 'lightblue', 'lightgreen',
                   'gray', 'pink', 'beige', 'lightgray', 'black']

    # 添加周围矩形
    for rect in surrounding_rectangles:
        # 根据层数选择颜色
        color = base_colors[(rect['layer'] - 1) % len(base_colors)]

        folium.Rectangle(
            bounds=rect['bounds'],
            color=color,
            fill=True,
            fillColor=color,
            fillOpacity=0.2,
            popup=f"{rect['direction']}"
        ).add_to(m)

        # 添加中心点标记
        folium.Marker(
            rect['center'],
            popup=f"{rect['direction']}中心点",
            icon=folium.Icon(color='green', icon='info-sign')
        ).add_to(m)

    # 添加中心点标记
    folium.Marker(
        center_coords['center'],
        popup='中心点',
        icon=folium.Icon(color='red', icon='info-sign')
    ).add_to(m)

    # 添加测量控件
    m.add_child(MeasureControl())

    # 保存地图
    output_file = 'rectangles_visualization.html'
    m.save(output_file)
    print(f"可视化结果已保存到: {output_file}")

    # 修改后的打印格式
    print("\nrectangles = [")
    # 首先打印中心矩形
    print(f"    '{center_coords['rectangle']}',")
    
    # 然后打印周围的矩形
    for rect in surrounding_rectangles:
        print(f"    '{rect['rectangle']}',")
    print("]")

    return center_coords, surrounding_rectangles

if __name__ == "__main__":
    center_point = "121.446433,31.22321"  # 中心点坐标
    radius = 2.0  # 半径2公里
    layers = 2  # 想要生成的层数

    visualize_rectangles(center_point, radius, layers)

运行结束会打印出所有矩形是左上右下的对角线坐标的一个字典方便直接复制到下一个脚本使用和一个html的可视化地图方便查看覆盖范围是否满足预期;

接下来,我们把上一步生成的坐标字典替换下个脚本的 rectangles = [ ]的部分,通过高德交通态势api开始遍历每一组坐标,并增加了一个随机延时,防止频繁访问,最后把输出结果整合到一个shp中,且保证输出地理坐标系是WGS84坐标系;

主要功能:

  • 获取多个矩形区域的交通数据
  • 将所有数据合并到一个文件中,并增加了一个随机延时
  •  基于 name、direction 和 angle 进行去重
  • 输出一个 shp 文件(去重后的数据、wgs84坐标系)

完整代码#运行环境 Python 3.11

import requests
import geopandas as gpd
import pandas as pd
from shapely.geometry import LineString
from datetime import datetime
import os
import math
import numpy as np
from shapely.ops import transform
from functools import partial
import time
import random

# 坐标转换参数
x_pi = 3.14159265358979324 * 3000.0 / 180.0
pi = 3.1415926535897932384626  # π
a = 6378245.0  # 长半轴
ee = 0.00669342162296594323  # 扁率


def gcj02towgs84(lng, lat):
    """
    GCJ02(火星坐标系)转GPS84
    :param lng:火星坐标系的经度
    :param lat:火星坐标系纬度
    :return:
    """
    if out_of_china(lng, lat):
        return lng, lat
    dlat = transformlat(lng - 105.0, lat - 35.0)
    dlng = transformlng(lng - 105.0, lat - 35.0)
    radlat = lat / 180.0 * pi
    magic = math.sin(radlat)
    magic = 1 - ee * magic * magic
    sqrtmagic = math.sqrt(magic)
    dlat = (dlat * 180.0) / ((a * (1 - ee)) / (magic * sqrtmagic) * pi)
    dlng = (dlng * 180.0) / (a / sqrtmagic * math.cos(radlat) * pi)
    mglat = lat + dlat
    mglng = lng + dlng
    return [lng * 2 - mglng, lat * 2 - mglat]


def transformlat(lng, lat):
    ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + 0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))
    ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
    ret += (20.0 * math.sin(lat * pi) + 40.0 * math.sin(lat / 3.0 * pi)) * 2.0 / 3.0
    ret += (160.0 * math.sin(lat / 12.0 * pi) + 320 * math.sin(lat * pi / 30.0)) * 2.0 / 3.0
    return ret


def transformlng(lng, lat):
    ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + 0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))
    ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
    ret += (20.0 * math.sin(lng * pi) + 40.0 * math.sin(lng / 3.0 * pi)) * 2.0 / 3.0
    ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 * math.sin(lng / 30.0 * pi)) * 2.0 / 3.0
    return ret


def out_of_china(lng, lat):
    """
    判断是否在国内,不在国内不做偏移
    """
    if lng < 72.004 or lng > 137.8347:
        return True
    if lat < 0.8293 or lat > 55.8271:
        return True
    return False


def transform_geometry(geom):
    """转换几何对象的坐标"""

    def transform_coords(x, y, z=None):
        wgs_x, wgs_y = gcj02towgs84(x, y)
        return wgs_x, wgs_y

    return transform(transform_coords, geom)


def get_traffic_data(rectangle, key, current_time):
    """获取交通数据并返回GeoDataFrame"""
    try:
        # 初始化数据字典
        data = {
            'name': [],
            'status': [],
            'status_desc': [],
            'direction': [],
            'angle': [],
            'speed': [],
            'timestamp': [],
            'date': [],
            'time': [],
            'geometry': []
        }

        # 状态码映射
        status_mapping = {
            "0": "未知",
            "1": "畅通",
            "2": "缓行",
            "3": "拥堵"
        }

        # 构建API请求URL
        url = f'https://restapi.amap.com/v3/traffic/status/rectangle?rectangle={rectangle}&output=json&extensions=all&key={key}&level=6'

        # 发送请求并获取JSON响应
        res = requests.get(url, timeout=10).json()

        # 遍历每条道路数据
        for road in res['trafficinfo']['roads']:
            try:
                polylines = [(float(y[0]), float(y[1])) for y in
                             [x.split(',') for x in road['polyline'].split(';')]]

                # 创建线几何对象
                line = LineString(polylines)

                # 转换为WGS84坐标系
                wgs84_line = transform_geometry(line)

                status = road.get('status', '0')

                data['geometry'].append(wgs84_line)
                data['name'].append(road.get('name', ''))
                data['status'].append(float(status))
                data['status_desc'].append(status_mapping.get(status, '未知'))
                data['direction'].append(road.get('direction', ''))
                data['angle'].append(float(road.get('angle', 0)))
                data['speed'].append(int(road.get('speed', 0)))
                data['timestamp'].append(current_time.strftime("%Y-%m-%d %H:%M:%S"))
                data['date'].append(current_time.strftime("%Y-%m-%d"))
                data['time'].append(current_time.strftime("%H:%M:%S"))
            except Exception as e:
                print(f"处理单条道路数据时出错: {str(e)}")
                continue

        # 创建GeoDataFrame对象
        gdf = gpd.GeoDataFrame(data, geometry='geometry', crs='EPSG:4326')
        gdf['status'] = gdf['status'].astype(np.float64)

        return gdf

    except Exception as e:
        print(f"处理出错: {str(e)}")
        return None


def process_rectangles_sequential(rectangles, key='你的key', output_dir='output'):
    """
    顺序处理多个矩形区域的交通数据,将所有数据保存在同一个时间段的文件中,并进行去重
    增加随机延时,避免请求过于频繁
    """
    # 生成统一的时间戳
    current_time = datetime.now()
    filename = f'traffic_status_{current_time.strftime("%Y%m%d%H%M%S")}_wgs84.shp'
    output_path = os.path.join(output_dir, filename)

    # 用于存储所有区域的数据
    all_gdfs = []

    # 创建输出目录
    os.makedirs(output_dir, exist_ok=True)

    total_rectangles = len(rectangles)
    for i, rectangle in enumerate(rectangles, 1):
        try:
            print(f"开始处理区域 [{i}/{total_rectangles}]: {rectangle}")
            gdf = get_traffic_data(rectangle, key, current_time)
            if gdf is not None:
                all_gdfs.append(gdf)
            print(f"区域 {rectangle} 处理完成")

            # 随机延时,除了最后一个请求
            if i < total_rectangles:
                delay = random.uniform(1, 3)  # 随机 1-3 秒延时
                print(f"等待 {delay:.1f} 秒后继续...")
                time.sleep(delay)

        except Exception as e:
            print(f"处理区域 {rectangle} 时出错: {str(e)}")
            # 发生错误时也添加延时
            time.sleep(random.uniform(2, 4))

    # 合并所有数据并去重
    if all_gdfs:
        print("\n开始合并和去重数据...")
        # 合并所有数据
        combined_gdf = gpd.GeoDataFrame(pd.concat(all_gdfs, ignore_index=True))

        # 去重前的记录数
        before_count = len(combined_gdf)

        # 基于name、direction和angle进行去重
        combined_gdf = combined_gdf.drop_duplicates(
            subset=['name', 'direction', 'angle'],
            keep='first'
        )

        # 去重后的记录数
        after_count = len(combined_gdf)

        # 保存处理后的数据
        combined_gdf.to_file(output_path, encoding='utf-8')
        print(f"已保存所有数据到: {output_path}")
        print(f"去重统计: {before_count} -> {after_count} 条记录 (删除了 {before_count - after_count} 条重复记录)")


if __name__ == "__main__":
    rectangles = [
        '121.425363,31.205192;121.467503,31.241228',
        '121.383215,31.241228;121.425371,31.277264',
        '121.383231,31.169156;121.425355,31.205192',
        '121.425355,31.241228;121.467511,31.277264',
        '121.425371,31.169156;121.467495,31.205192',
   
    ]  # 替换坐标的部分

    process_rectangles_sequential(rectangles) 

生成结果如下,我们获取个整个范围的道路的交通态势情况,生成字段包括name(道路名称)、
status(路况)、direction(方向描述)、angle(车行角度,判断道路正反向使用)、speed(平均速度)status_description(路况综述);

通过status字段可视化结果如下,路况结果分四级(0:未知;1:畅通;2:缓行;3:拥堵);

文章仅用于分享个人学习成果与个人存档之用,分享知识,如有侵权,请联系作者进行删除。所有信息均基于作者的个人理解和经验,不代表任何官方立场或权威解读。

标签:指南,center,Python,lon,lat,lng,矩形,rectangles,高德
From: https://blog.csdn.net/weixin_45812624/article/details/145016900

相关文章

  • 基于python的网页表格数据下载--转excel
    基于Python的网页表格数据爬取与下载:以维基百科为例目录基于Python的网页表格数据爬取与下载:以维基百科为例1.背景介绍2.工具与环境3.操作步骤1.获取网页内容2.定位表格元素3.表格变身PandasDataFrame4.检查数据,收工!5.进阶玩法与优化6.完......
  • CSV库写数据 生成Excel表格数据 Python
    一维数据1.代码importcsvlist1=["name","age","school","address"]filew=open('asheet.csv',"w")filew.write(",".join(list1))filew.close()filew=open('asheet.csv','r')lin......
  • 毕业设计-可白嫖源码-基于python的零食小铺管理系统(案例分析)
       摘 要从上世纪末到目前,计算机科学技术已经被尝试应用在各个职业各个领域,在商业贸易上,使用比重较高。计算机科学技术包括软件技术、网络技术、硬件技术等,越来越多的商家使用计算机来进行营业,出售、收购、宣传各类商品,各类商业系统、软件解放了商家的双手,使商家把利益最......
  • python学opencv|读取图像(三十一)缩放图像的三种方法
    【1】引言前序学习进程中,我们至少掌握了两种方法,可以实现对图像实现缩放。第一种方法是调用cv2.resize()函数实现,相关学习链接为:python学opencv|读取图像(三)放大和缩小图像_pythonopencv读取图片缩放-CSDN博客第二种方法是在cv2.getRotationMatrix2D()函数旋转缩放图像时,......
  • python学opencv|读取图像(三十)使用cv2.getAffineTransform()函数倾斜拉伸图像
    【1】引言前序已经学习了如何平移和旋转缩放图像,相关文章链接为:python学opencv|读取图像(二十七)使用cv2.warpAffine()函数平移图像-CSDN博客python学opencv|读取图像(二十八)使用cv2.getRotationMatrix2D()函数旋转缩放图像-CSDN博客在此基础上,我们尝试倾斜拉伸图【2】核心代码......
  • 使用Azure OpenAI实现检索代理的实践指南
    在当今的信息爆炸时代,如何高效地从巨量的数据中提取出有用的信息成为了技术领域的一个重要挑战。AzureOpenAI提供了一种强大的检索代理架构,能够在Arxiv等学术领域进行高效检索。在本篇文章中,我们将深入探讨如何利用Retrieval-Agent包,结合AzureOpenAI和LangChain,构建一个......
  • 用python调用AlistClient 批量递归下载百度网盘指定目录文件,基于Alist
    importosimportrequestsfromalistimportAlistClientfromurllib.parseimportunquote,urlparsedefdownload_file(url,local_path):response=requests.get(url,stream=True)total_size=int(response.headers.get('content-length',0))......
  • 使用 Taro 开发鸿蒙原生应用 —— 快速上手,鸿蒙应用开发指南
    作者:京东零售利齐诺随着鸿蒙系统的不断完善,许多应用厂商都希望将自己的应用移植到鸿蒙平台上。最近,Taro发布了v4.0.0-beta.x版本,支持使用Taro快速开发鸿蒙原生应用,也可将现有的小程序转换为鸿蒙原生应用。在《使用Taro开发鸿蒙原生应用》系列文章中,我们已经介绍了鸿......
  • Python LangChain入门教程 1-使用LangChain和AI对话
    LangChain 是一个用于开发由大型语言模型(LLMs)驱动的应用程序的框架。在使用LangChain框架前,先导入LangChain#这里根据你使用的AI进行引入,我使用的是智谱清言的AIfromlangchain_community.chat_modelsimportChatZhipuAI#这里导入的是消息类型fromlangchain_co......
  • streamlit实现聊天机器人应用,掌握使用Python构建好看web的页面
     第一个可视化的大模型应用。实现一个带有可视化界面的聊天机器人应用,可以将我们之前实现的聊天机器人转化为一个更加直观、用户友好的,我们的第一个可视化的大模型应用。通过使用Streamlit,我们借助st.columns、st.container、st.chat_input和st.chatmessage等streamlitAPl......