首页 > 编程语言 >Python应用指南:高德交通态势数据(二)

Python应用指南:高德交通态势数据(二)

时间:2025-01-07 21:34:12浏览次数:9  
标签:指南 center Python lon lat lng 矩形 rectangles 高德

本篇文章是对上篇内容的一个深化探讨,通过生成多个矩形来实现一定范围的道路交通态势查询,在上一篇文章中,我们详细介绍了如何利用单个矩形区域查询功能来获取特定区域内的实时交通状况。然而,在实际应用中,城市交通网络复杂多变,单一矩形往往难以覆盖广泛的地理范围或满足更精细的数据需求。

因此,本篇文章进一步提出了一个多矩形查询的方法论,通过将一个大的监测区域细分为若干个小矩形,确保每个子矩形都符合高德地图API对于查询区域尺寸的要求(即对角线不超过10公里)。这种方法不仅能够提高数据采集的准确性和完整性,还能有效应对不同规模和形态的城市交通环境,通过定义中心点与总覆盖范围,来科学地划分多个子矩形,从而实现一定范围的道路交通态势查询。

这是上篇文章的的矩形范围内的交通态势查询边界;

我们通过python脚本生成多个相同大小的矩形区域,以实现对指定范围的全面覆盖。这种方式确保每个矩形都符合API的尺寸要求,同时能够系统性地覆盖整个目标区域;

主要功能:

  • 输入中心点坐标和半径,生成一个矩形网格系统,可以指定要生成的层数(默认2层)
  • 每层矩形围绕中心矩形均匀分布,生成可交互的HTML地图文件
  • 打印每个矩形的坐标范围

完整代码#运行环境 Python 3.11

from math import cos, radians
import folium
from folium.plugins import MeasureControl

def generate_rectangle(center_point, radius_km):
    """
    通过中心点和半径生成矩形范围
    """
    try:
        center_lon, center_lat = map(float, center_point.split(','))
        lat_diff = radius_km / 111.0
        lon_diff = radius_km / (111.0 * cos(radians(center_lat)))

        min_lon = center_lon - lon_diff
        min_lat = center_lat - lat_diff
        max_lon = center_lon + lon_diff
        max_lat = center_lat + lat_diff

        rectangle = f"{min_lon:.6f},{min_lat:.6f};{max_lon:.6f},{max_lat:.6f}"

        return {
            'rectangle': rectangle,
            'center': [center_lat, center_lon],
            'bounds': [[min_lat, min_lon], [max_lat, max_lon]],
            'lat_diff': lat_diff,
            'lon_diff': lon_diff
        }

    except Exception as e:
        print(f"生成矩形范围时出错: {str(e)}")
        return None

def generate_surrounding_rectangles(center_point, radius_km, layers=2):
    """
    生成多层矩形的坐标
    layers: 要生成的层数(默认2层)
    """
    center_coords = generate_rectangle(center_point, radius_km)
    if not center_coords:
        return None

    center_lon, center_lat = map(float, center_point.split(','))
    all_rectangles = []

    # 生成每一层的矩形
    for layer in range(1, layers + 1):
        # 当前层的偏移量
        current_lat_diff = center_coords['lat_diff'] * 2 * layer
        current_lon_diff = center_coords['lon_diff'] * 2 * layer

        layer_centers = []

        # 上下各 2*layer+1 个点
        for i in range(-layer, layer + 1):
            layer_centers.append((center_lon + i * center_coords['lon_diff'] * 2,
                                  center_lat + current_lat_diff))  # 上
            layer_centers.append((center_lon + i * center_coords['lon_diff'] * 2,
                                  center_lat - current_lat_diff))  # 下

        # 左右各 2*(layer-1)+1 个点(不包括已添加的角点)
        for i in range(-(layer - 1), layer):
            layer_centers.append((center_lon - current_lon_diff,
                                  center_lat + i * center_coords['lat_diff'] * 2))  # 左
            layer_centers.append((center_lon + current_lon_diff,
                                  center_lat + i * center_coords['lat_diff'] * 2))  # 右

        # 为当前层的所有中心点生成矩形
        layer_rectangles = []
        for i, (lon, lat) in enumerate(layer_centers):
            rect = generate_rectangle(f"{lon},{lat}", radius_km)
            if rect:
                rect['direction'] = f"第{layer}层{i + 1}号"
                rect['layer'] = layer
                layer_rectangles.append(rect)

        all_rectangles.extend(layer_rectangles)

    return center_coords, all_rectangles

def visualize_rectangles(center_point, radius_km, layers=2):
    """
    可视化中心矩形和周围的矩形,并以特定格式打印坐标
    """
    result = generate_surrounding_rectangles(center_point, radius_km, layers)
    if not result:
        return

    center_coords, surrounding_rectangles = result

    # 创建地图
    m = folium.Map(
        location=center_coords['center'],
        zoom_start=12,
        tiles='https://webst02.is.autonavi.com/appmaptile?style=7&x={x}&y={y}&z={z}',
        attr='AutoNavi'
    )

    # 添加中心矩形
    folium.Rectangle(
        bounds=center_coords['bounds'],
        color='red',
        fill=True,
        fillColor='red',
        fillOpacity=0.2,
        popup='中心矩形'
    ).add_to(m)

    # 为每一层设置不同的基础颜色
    base_colors = ['blue', 'green', 'purple', 'orange', 'darkred', 'darkblue',
                   'darkgreen', 'cadetblue', 'lightred', 'lightblue', 'lightgreen',
                   'gray', 'pink', 'beige', 'lightgray', 'black']

    # 添加周围矩形
    for rect in surrounding_rectangles:
        # 根据层数选择颜色
        color = base_colors[(rect['layer'] - 1) % len(base_colors)]

        folium.Rectangle(
            bounds=rect['bounds'],
            color=color,
            fill=True,
            fillColor=color,
            fillOpacity=0.2,
            popup=f"{rect['direction']}"
        ).add_to(m)

        # 添加中心点标记
        folium.Marker(
            rect['center'],
            popup=f"{rect['direction']}中心点",
            icon=folium.Icon(color='green', icon='info-sign')
        ).add_to(m)

    # 添加中心点标记
    folium.Marker(
        center_coords['center'],
        popup='中心点',
        icon=folium.Icon(color='red', icon='info-sign')
    ).add_to(m)

    # 添加测量控件
    m.add_child(MeasureControl())

    # 保存地图
    output_file = 'rectangles_visualization.html'
    m.save(output_file)
    print(f"可视化结果已保存到: {output_file}")

    # 修改后的打印格式
    print("\nrectangles = [")
    # 首先打印中心矩形
    print(f"    '{center_coords['rectangle']}',")
    
    # 然后打印周围的矩形
    for rect in surrounding_rectangles:
        print(f"    '{rect['rectangle']}',")
    print("]")

    return center_coords, surrounding_rectangles

if __name__ == "__main__":
    center_point = "121.446433,31.22321"  # 中心点坐标
    radius = 2.0  # 半径2公里
    layers = 2  # 想要生成的层数

    visualize_rectangles(center_point, radius, layers)

运行结束会打印出所有矩形是左上右下的对角线坐标的一个字典方便直接复制到下一个脚本使用和一个html的可视化地图方便查看覆盖范围是否满足预期;

接下来,我们把上一步生成的坐标字典替换下个脚本的 rectangles = [ ]的部分,通过高德交通态势api开始遍历每一组坐标,并增加了一个随机延时,防止频繁访问,最后把输出结果整合到一个shp中,且保证输出地理坐标系是WGS84坐标系;

主要功能:

  • 获取多个矩形区域的交通数据
  • 将所有数据合并到一个文件中,并增加了一个随机延时
  •  基于 name、direction 和 angle 进行去重
  • 输出一个 shp 文件(去重后的数据、wgs84坐标系)

完整代码#运行环境 Python 3.11

import requests
import geopandas as gpd
import pandas as pd
from shapely.geometry import LineString
from datetime import datetime
import os
import math
import numpy as np
from shapely.ops import transform
from functools import partial
import time
import random

# 坐标转换参数
x_pi = 3.14159265358979324 * 3000.0 / 180.0
pi = 3.1415926535897932384626  # π
a = 6378245.0  # 长半轴
ee = 0.00669342162296594323  # 扁率


def gcj02towgs84(lng, lat):
    """
    GCJ02(火星坐标系)转GPS84
    :param lng:火星坐标系的经度
    :param lat:火星坐标系纬度
    :return:
    """
    if out_of_china(lng, lat):
        return lng, lat
    dlat = transformlat(lng - 105.0, lat - 35.0)
    dlng = transformlng(lng - 105.0, lat - 35.0)
    radlat = lat / 180.0 * pi
    magic = math.sin(radlat)
    magic = 1 - ee * magic * magic
    sqrtmagic = math.sqrt(magic)
    dlat = (dlat * 180.0) / ((a * (1 - ee)) / (magic * sqrtmagic) * pi)
    dlng = (dlng * 180.0) / (a / sqrtmagic * math.cos(radlat) * pi)
    mglat = lat + dlat
    mglng = lng + dlng
    return [lng * 2 - mglng, lat * 2 - mglat]


def transformlat(lng, lat):
    ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + 0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))
    ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
    ret += (20.0 * math.sin(lat * pi) + 40.0 * math.sin(lat / 3.0 * pi)) * 2.0 / 3.0
    ret += (160.0 * math.sin(lat / 12.0 * pi) + 320 * math.sin(lat * pi / 30.0)) * 2.0 / 3.0
    return ret


def transformlng(lng, lat):
    ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + 0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))
    ret += (20.0 * math.sin(6.0 * lng * pi) + 20.0 * math.sin(2.0 * lng * pi)) * 2.0 / 3.0
    ret += (20.0 * math.sin(lng * pi) + 40.0 * math.sin(lng / 3.0 * pi)) * 2.0 / 3.0
    ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 * math.sin(lng / 30.0 * pi)) * 2.0 / 3.0
    return ret


def out_of_china(lng, lat):
    """
    判断是否在国内,不在国内不做偏移
    """
    if lng < 72.004 or lng > 137.8347:
        return True
    if lat < 0.8293 or lat > 55.8271:
        return True
    return False


def transform_geometry(geom):
    """转换几何对象的坐标"""

    def transform_coords(x, y, z=None):
        wgs_x, wgs_y = gcj02towgs84(x, y)
        return wgs_x, wgs_y

    return transform(transform_coords, geom)


def get_traffic_data(rectangle, key, current_time):
    """获取交通数据并返回GeoDataFrame"""
    try:
        # 初始化数据字典
        data = {
            'name': [],
            'status': [],
            'status_desc': [],
            'direction': [],
            'angle': [],
            'speed': [],
            'timestamp': [],
            'date': [],
            'time': [],
            'geometry': []
        }

        # 状态码映射
        status_mapping = {
            "0": "未知",
            "1": "畅通",
            "2": "缓行",
            "3": "拥堵"
        }

        # 构建API请求URL
        url = f'https://restapi.amap.com/v3/traffic/status/rectangle?rectangle={rectangle}&output=json&extensions=all&key={key}&level=6'

        # 发送请求并获取JSON响应
        res = requests.get(url, timeout=10).json()

        # 遍历每条道路数据
        for road in res['trafficinfo']['roads']:
            try:
                polylines = [(float(y[0]), float(y[1])) for y in
                             [x.split(',') for x in road['polyline'].split(';')]]

                # 创建线几何对象
                line = LineString(polylines)

                # 转换为WGS84坐标系
                wgs84_line = transform_geometry(line)

                status = road.get('status', '0')

                data['geometry'].append(wgs84_line)
                data['name'].append(road.get('name', ''))
                data['status'].append(float(status))
                data['status_desc'].append(status_mapping.get(status, '未知'))
                data['direction'].append(road.get('direction', ''))
                data['angle'].append(float(road.get('angle', 0)))
                data['speed'].append(int(road.get('speed', 0)))
                data['timestamp'].append(current_time.strftime("%Y-%m-%d %H:%M:%S"))
                data['date'].append(current_time.strftime("%Y-%m-%d"))
                data['time'].append(current_time.strftime("%H:%M:%S"))
            except Exception as e:
                print(f"处理单条道路数据时出错: {str(e)}")
                continue

        # 创建GeoDataFrame对象
        gdf = gpd.GeoDataFrame(data, geometry='geometry', crs='EPSG:4326')
        gdf['status'] = gdf['status'].astype(np.float64)

        return gdf

    except Exception as e:
        print(f"处理出错: {str(e)}")
        return None


def process_rectangles_sequential(rectangles, key='你的key', output_dir='output'):
    """
    顺序处理多个矩形区域的交通数据,将所有数据保存在同一个时间段的文件中,并进行去重
    增加随机延时,避免请求过于频繁
    """
    # 生成统一的时间戳
    current_time = datetime.now()
    filename = f'traffic_status_{current_time.strftime("%Y%m%d%H%M%S")}_wgs84.shp'
    output_path = os.path.join(output_dir, filename)

    # 用于存储所有区域的数据
    all_gdfs = []

    # 创建输出目录
    os.makedirs(output_dir, exist_ok=True)

    total_rectangles = len(rectangles)
    for i, rectangle in enumerate(rectangles, 1):
        try:
            print(f"开始处理区域 [{i}/{total_rectangles}]: {rectangle}")
            gdf = get_traffic_data(rectangle, key, current_time)
            if gdf is not None:
                all_gdfs.append(gdf)
            print(f"区域 {rectangle} 处理完成")

            # 随机延时,除了最后一个请求
            if i < total_rectangles:
                delay = random.uniform(1, 3)  # 随机 1-3 秒延时
                print(f"等待 {delay:.1f} 秒后继续...")
                time.sleep(delay)

        except Exception as e:
            print(f"处理区域 {rectangle} 时出错: {str(e)}")
            # 发生错误时也添加延时
            time.sleep(random.uniform(2, 4))

    # 合并所有数据并去重
    if all_gdfs:
        print("\n开始合并和去重数据...")
        # 合并所有数据
        combined_gdf = gpd.GeoDataFrame(pd.concat(all_gdfs, ignore_index=True))

        # 去重前的记录数
        before_count = len(combined_gdf)

        # 基于name、direction和angle进行去重
        combined_gdf = combined_gdf.drop_duplicates(
            subset=['name', 'direction', 'angle'],
            keep='first'
        )

        # 去重后的记录数
        after_count = len(combined_gdf)

        # 保存处理后的数据
        combined_gdf.to_file(output_path, encoding='utf-8')
        print(f"已保存所有数据到: {output_path}")
        print(f"去重统计: {before_count} -> {after_count} 条记录 (删除了 {before_count - after_count} 条重复记录)")


if __name__ == "__main__":
    rectangles = [
        '121.425363,31.205192;121.467503,31.241228',
        '121.383215,31.241228;121.425371,31.277264',
        '121.383231,31.169156;121.425355,31.205192',
        '121.425355,31.241228;121.467511,31.277264',
        '121.425371,31.169156;121.467495,31.205192',
   
    ]  # 替换坐标的部分

    process_rectangles_sequential(rectangles) 

生成结果如下,我们获取个整个范围的道路的交通态势情况,生成字段包括name(道路名称)、
status(路况)、direction(方向描述)、angle(车行角度,判断道路正反向使用)、speed(平均速度)status_description(路况综述);

通过status字段可视化结果如下,路况结果分四级(0:未知;1:畅通;2:缓行;3:拥堵);

文章仅用于分享个人学习成果与个人存档之用,分享知识,如有侵权,请联系作者进行删除。所有信息均基于作者的个人理解和经验,不代表任何官方立场或权威解读。

标签:指南,center,Python,lon,lat,lng,矩形,rectangles,高德
From: https://blog.csdn.net/weixin_45812624/article/details/144956319

相关文章

  • Docker:安装 XXL-JOB 分布式调度任务的技术指南
    1、简述XXL-JOB是一个分布式任务调度平台,提供简单易用的任务调度功能。它支持分布式调度、失败重试、任务监控和报警等功能。XXL-JOB采用了服务端与执行器的架构,任务调度在服务端进行,而任务的实际执行则由各个执行器完成。XXL-JOB的核心功能包括:支持分布式任务调度和......
  • python中的队列
    在Python中,队列(Queue)通常使用collections.deque来实现,因其提供了高效的从两端添加和删除元素的操作。队列通常遵循先进先出(FIFO)的原则,也就是最先插入的元素最先被移除。队列的基本操作:append(x):将元素x加入队列的尾部。popleft():移除并返回队列的头部元素。appen......
  • Python3 学习指南与资料分享
    Python3学习资料https://pan.quark.cn/s/4f79eee15bf9Python3学习资料https://pan.quark.cn/s/4f79eee15bf9Python3学习资料https://pan.quark.cn/s/4f79eee15bf9在如今这个科技飞速发展的时代,掌握Python3编程技能无疑为你打开了一扇通往无限可能的大门。无论你是想......
  • 基于Python的大语言模型词嵌入技术
    文章目录一、词嵌入技术概述1.1词嵌入的基本概念1.2词嵌入的主要方法二、使用Python实现词嵌入2.1使用Gensim实现Word2Vec2.2使用GloVe进行词嵌入2.3使用FastText进行词嵌入三、词嵌入在大语言模型中的应用3.1使用Transformers库实现BERT嵌入3.2在大语言模型训......
  • 如何通过Python优化大语言模型的参数效率
    文章目录一、大语言模型参数效率优化的必要性1.1参数效率的重要性1.2优化技术的概述二、Python实现参数优化技术2.1模型压缩2.2模型剪枝2.3知识蒸馏2.4模型量化三、优化技术的技术细节3.1模型压缩技术3.2模型剪枝技术3.3知识蒸馏技术3.4模型量化技术四、参......
  • 2025毕设python游泳馆管理系统程序+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于游泳馆管理系统的研究,现有研究主要集中在传统管理模式或者单一功能模块的优化上,专门针对使用Python构建综合多功能游泳馆管理系统......
  • 开启 Python3 学习之旅
    Python3学习资料Python3学习资料Python3学习资料在当今数字化的时代,编程技能愈发成为个人竞争力的关键组成部分,而Python3作为一门备受青睐的编程语言,以其简洁优雅、功能强大的特性,吸引着无数初学者与专业人士投身其中。一、轻松入门:环境搭建与基础了解开启Python3学......
  • Python 模块,包(详解)
    一.引用变量        引用变量:值的传递通常可以分为两种方式,一种是值的传递,一种是引用地址传递,在Python中一般都是用引用地址传递        变量名和对象:变量名(如a)和它指向的对象(如整数5)是分开的。变量名本身没有存储任何数据,它只是指向数据的一个标签(或者......
  • Python数据结构与常用操作方法汇总
     在Python中,数据结构是程序中用来存储、组织和操作数据的基本方式。常见的数据结构有列表(list)、元组(tuple)、字典(dict)、集合(set),每种数据结构有自己特定的操作方法。目录一:列表(list)二: 元组(tuple) 三:字典(dict) 四:集合(set)一:列表(list)  列表是......
  • python毕设 学院图书管理系统的设计与实现程序+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于学院图书管理系统的设计与实现这一问题的研究,现有研究主要以商业图书管理系统为主,专门针对学院这种特定环境下的图书管理系统的研......