首页 > 编程问答 >改进 A* 寻路,考虑有临时障碍的替代路线

改进 A* 寻路,考虑有临时障碍的替代路线

时间:2024-07-21 02:55:10浏览次数:11  
标签:python

我正在为导航系统实现 A* 算法,但在处理临时障碍物时遇到了问题。当前设置通过最短路径导航,并在必要时尝试对角线移动。但是,当直接路径被临时障碍物阻挡时,它无法考虑替代路线。

给定一个网格,其中:

A is the starting point,
B, C are waypoints that we have to reach.
M is a temporary obstacle,
D is a valid diagonal move, and
N is a walkable alternative route
0 is non walkable path
1 is walkable path

我希望算法在最短路径被阻挡时考虑替代路线。这是我的网格的直观表示:

[0,0,0,0,0,0,0,0,M,1,1,1,0,0,0]
[0,0,0,0,0,0,0,1,D,0,0,1,0,0,0]
[0,0,0,0,0,1,1,1,0,0,0,B,0,0,0]
[0,0,0,0,C,1,0,0,0,1,1,1,0,0,0]
[0,0,0,0,0,0,0,A,M,N,1,0,0,0,0]
[0,0,0,0,0,0,0,N,N,N,1,0,0,0,0]
[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
import heapq
from typing import List, Tuple
import numpy as np
import pyautogui

def is_walkable(pos: Tuple[int, int, int], binary_matrices: List[np.ndarray]) -> bool:
    x, y, z = pos
    if 0 <= z < len(binary_matrices) and 0 <= y < binary_matrices[z].shape[0] and 0 <= x < binary_matrices[z].shape[1]:
        return binary_matrices[z][y, x] == 1
    return False

def get_neighbors(current: Tuple[int, int, int], binary_matrices: List[np.ndarray], use_diagonals: bool = False) -> List[Tuple[int, int, int]]:
    x, y, z = current
    neighbors = []
    directions = [(1, 0), (-1, 0), (0, 1), (0, -1)]  # Cardinal directions: right, left, down, up

    if use_diagonals:
        directions += [(1, 1), (1, -1), (-1, 1), (-1, -1)]  # Diagonal directions

    for dx, dy in directions:
        nx, ny = x + dx, y + dy
        if is_walkable((nx, ny, z), binary_matrices):
            neighbors.append((nx, ny, z))
    return neighbors

def find_path(start: Tuple[int, int, int], end: Tuple[int, int, int], binary_matrices: List[np.ndarray], search_range: int = 5, use_diagonals: bool = False) -> List[Tuple[int, int, int]]:
    def heuristic(a: Tuple[int, int, int], b: Tuple[int, int, int]) -> float:
        return abs(a[0] - b[0]) + abs(a[1] - b[1])  # Manhattan distance

    start_pixel = get_pixel_from_coordinate(start)
    end_pixel = get_pixel_from_coordinate(end)
    z = start[2]

    start_pixel = (*start_pixel, z)
    end_pixel = (*end_pixel, z)

    open_set = []
    heapq.heappush(open_set, (0, start_pixel))
    came_from = {}
    g_score = {start_pixel: 0}
    f_score = {start_pixel: heuristic(start_pixel, end_pixel)}

    print(f"Start pixel: {start_pixel}, End pixel: {end_pixel}")

    attempt_counter = 0
    max_attempts = 30
    used_diagonal = False

    while open_set:
        current = heapq.heappop(open_set)[1]
        print(f"Exploring: {current}")

        if current == end_pixel:
            path_found = True
            break

        neighbors = get_neighbors(current, binary_matrices, use_diagonals)
        for neighbor in neighbors:
            tentative_g_score = g_score[current] + 1
            if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g_score
                f_score[neighbor] = tentative_g_score + heuristic(neighbor, end_pixel)
                if neighbor not in [i[1] for i in open_set]:
                    heapq.heappush(open_set, (f_score[neighbor], neighbor))
                    print(f"  Added neighbor: {neighbor}")

        if len(open_set) == 0:
            attempt_counter += 1
            print(attempt_counter)
            if attempt_counter >= max_attempts and not use_diagonals:
                print(f"Attempted {attempt_counter} times without reaching the goal. Switching to diagonal movements.")
                use_diagonals = True
                used_diagonal = True
                attempt_counter = 0
                open_set = []
                heapq.heappush(open_set, (0, start_pixel))
                came_from = {}
                g_score = {start_pixel: 0}
                f_score = {start_pixel: heuristic(start_pixel, end_pixel)}
            else:
                print(f"No progress made after {attempt_counter} attempts. Retrying with the current settings.")
        else:
            attempt_counter = 0
            
    if not path_found:
        print("No path found")
        return []

    path = []
    while current in came_from:
        path.append(get_coordinate_from_pixel(current[:2], current[2]))
        current = came_from[current]
    path.append(start)
    path = list(reversed(path))

    if used_diagonal:
        filtered_path = [path[0]]
        for i in range(1, len(path)):
            dx = path[i][0] - filtered_path[-1][0]
            dy = path[i][1] - filtered_path[-1][1]
            if dx != 0 and dy != 0:
                continue
            filtered_path.append(path[i])
        path = filtered_path
    
    return path

def move_character(current_pos: Tuple[int, int, int], next_pos: Tuple[int, int, int], use_diagonal: bool = False):
    dx = next_pos[0] - current_pos[0]
    dy = next_pos[1] - current_pos[1]

    if use_diagonal:
        if dx > 0 and dy > 0:
            pyautogui.press('num3')
        elif dx > 0 and dy < 0:
            pyautogui.press('num9')
        elif dx < 0 and dy > 0:
            pyautogui.press('num1')
        elif dx < 0 and dy < 0:
            pyautogui.press('num7')
    else:
        if dx > 0:
            pyautogui.press('right')
        elif dx < 0:
            pyautogui.press('left')

        if dy > 0:
            pyautogui.press('down')
        elif dy < 0:
            pyautogui.press('up')

pathfinding.py

def navigate_to_waypoint(target_waypoint: Tuple[int, Tuple[int, int, int]], binary_matrices: List[np.ndarray], timeout: int = CONFIG['timeout']) -> bool:
start_time = time.time()
waypoint_id, waypoint_coords = target_waypoint
max_attempts = CONFIG['max_attempts']
attempt_counter = 0

while True:
    if time.time() - start_time > timeout:
        print(f"Timeout reached for waypoint {waypoint_id}. Moving to next waypoint.")
        return False
    try:
        current_pos = get_absolute_coordinates()
        print(f"Current position (game coordinates): {current_pos}")
        current_pixel = get_pixel_from_coordinate(current_pos)
        print(f"Current position (pixel coordinates): {current_pixel}")

        if current_pos[0] == waypoint_coords[0] and current_pos[1] == waypoint_coords[1] and current_pos[2] == waypoint_coords[2]:
            print(f"Reached waypoint {waypoint_id}: {waypoint_coords}")
            return True
        
        path = find_path(current_pos, waypoint_coords, binary_matrices)
        
        if not path:
            print("No valid path found. Retrying...")
            time.sleep(1)
            continue
        
        next_pos = path[1] if len(path) > 1 else path[0]
        print(f"Moving to next position: {next_pos}")
        move_character(current_pos, next_pos)

        if len(path) > 2 and path[1] != next_pos:
            attempt_counter = 0
        else:
            attempt_counter += 1

        if attempt_counter >= max_attempts:
            print(f"Attempted {attempt_counter} times without reaching the goal.")
            attempt_counter = 0

        time.sleep(0.1)
    except Exception as e:
        print(f"An error occurred: {e}")

我尝试了不同的“启发式邻居探索”。 我想要一个导航系统,如果存在临时障碍,该系统能够采取可步行的替代路线到达航路点。


改进 A* 算法应对临时障碍

的代码已经实现了基本的 A* 算法,并尝试使用对角移动来绕过障碍物。但它无法有效处理临时障碍物的原因是,它只在所有方向都不可行时才切换到对角移动。

以下是如何改进 A* 算法以更好地处理临时障碍物的步骤:

1. 动态障碍物处理:

  • 实时更新地图: 的代码应该能够实时更新地图,以反映临时障碍物的存在。这可以通过定期扫描环境或接收传感器数据来实现。
  • 动态成本计算: 考虑将临时障碍物纳入成本计算中。例如,可以为经过临时障碍物附近的路径增加成本,从而鼓励算法选择更安全的路线。

2. 改进启发式函数:

  • 考虑临时障碍物: 当前的曼哈顿距离启发式函数没有考虑临时障碍物。可以尝试使用更复杂的启发式函数,例如:
    • 距离变换: 预先计算从每个可行走单元格到目标单元格的最短距离,并将临时障碍物考虑在内。
    • 障碍感知启发式: 在计算曼哈顿距离时,将经过临时障碍物附近的路径的成本增加。
  • 多重目标启发式: 对于必须按顺序到达的航路点(B 和 C),可以使用多重目标启发式函数,该函数考虑了到达当前目标和后续目标的成本。

3. 优化搜索策略:

  • 动态切换对角移动: 不要等到所有方向都不可行时才切换到对角移动。如果直线路径被临时障碍物阻挡,则立即考虑对角移动。
  • 路径平滑: 在找到路径后,可以使用路径平滑算法(例如,梯度下降)来消除不必要的转弯,从而创建更自然、更有效的路径。

代码示例:

以下代码示例展示了如何实现动态成本计算和障碍感知启发式函数:

def calculate_cost(current: Tuple[int, int, int], neighbor: Tuple[int, int, int], binary_matrices: List[np.ndarray]) -> float:
    """
    计算从当前单元格移动到相邻单元格的成本。
    如果相邻单元格靠近临时障碍物,则增加成本。
    """
    cost = 1  # 默认成本

    # 检查邻居是否靠近临时障碍物
    if is_near_obstacle(neighbor, binary_matrices):
        cost += 5  # 增加成本

    return cost

def heuristic(a: Tuple[int, int, int], b: Tuple[int, int, int], binary_matrices: List[np.ndarray]) -> float:
    """
    使用曼哈顿距离作为启发式函数,并考虑临时障碍物。
    """
    distance = abs(a[0] - b[0]) + abs(a[1] - b[1])

    # 如果路径经过临时障碍物附近,则增加成本
    if is_near_obstacle(a, binary_matrices) or is_near_obstacle(b, binary_matrices):
        distance += 2

    return distance

def is_near_obstacle(pos: Tuple[int, int, int], binary_matrices: List[np.ndarray], radius: int = 2) -> bool:
    """
    检查给定位置附近是否有临时障碍物。
    """
    x, y, z = pos

    for dx in range(-radius, radius + 1):
        for dy in range(-radius, radius + 1):
            nx, ny = x + dx, y + dy
            if is_walkable((nx, ny, z), binary_matrices) and binary_matrices[z][ny, nx] == 'M':
                return True

    return False

其他建议:

  • 考虑使用更高级的寻路算法,例如 D Lite 或 Anytime A ,它们专门用于处理动态环境。
  • 如果的环境非常复杂或计算资源有限,可以考虑使用路径规划库,例如 ROS Navigation 或 MoveIt。

通过实施这些改进,的 A* 算法将能够更有效地处理临时障碍物,并在导航过程中做出更智能的决策。

标签:python
From: 78767645

相关文章

  • Python 是一种选择性解释语言吗?为什么下面的代码不起作用?
    由于程序是从上到下运行的,为什么下面的代码不执行块中的第一行就直接抛出错误?if5>2:print("TwoislessthanFive!")print("Fiveisgreaterthantwo!")错误:文件“/Users/____/Desktop/Pythonpractise/practise.py”,第3行print("五比二大!")Indentati......
  • 裁剪时间变量 Python Matplotlib Xarray
    我不确定这是否是一个愚蠢的问题,但我想按时间变量剪辑.nc文件。我在xarray中打开了数据集,但以下ds.sel行(之前已运行)仅返回错误。ds=xr.open_dataset('/Users/mia/Desktop/RMP/data/tracking/mcs_tracks_2015_11.nc')selected_days=ds.sel(time=slice('2015-11-22',......
  • 用于匹配两个数据列表中的项目的高效数据结构 - python
    我有两个列表,其中一个列表填充ID,另一个列表填充进程名称。多个进程名称可以共享一个ID。我希望能够创建一个可以使用特定ID的数据结构,然后返回与该ID关联的进程列表。我还希望能够使用特定的进程名称并返回与其连接的ID列表。我知道我可以为此创建一个字典,但是I......
  • 有人可以解决我的代码中的问题吗?而且我无法在我的电脑上安装 nsetools。如何在 python
    从nsetools导入Nseimportpandasaspdnse=Nse()all_stock_codes=nse.get_stock_codes()companies_with_low_pe=[]对于all_stock_codes中的代码:如果代码=='符号':继续尝试:stock_quote=nse.get_quote(代码)pe_ratio=stock_quote.get('priceT......
  • 将 python 脚本的 stdin 重定向到 fifo 会导致 RuntimeError: input():lost sys.stdin
    我有这个python脚本,它的作用是充当服务器,它从重定向到fifo的stdin读取命令:test.py:whileTrue:try:line=input()exceptEOFError:breakprint(f'Received:{line}')在bash中运行命令:mkfifotestfifotest.py<testfifo......
  • Python/Flask mysql 游标:为什么它不起作用?
    fromflaskimportFlaskfromflask_mysqldbimportMySQLapp=Flask(__name__)app.config['MYSQL_HOST']='localhost'app.config['MYSQL_USER']='root'app.config['MYSQL_PASSWORD']='password'a......
  • Python pandas to_csv 导致 OSError: [Errno 22] 参数无效
    我的代码如下:importpandasaspdimportnumpyasnpdf=pd.read_csv("path/to/my/infile.csv")df=df.sort_values(['distance','time'])df.to_csv("path/to/my/outfile.csv")此代码成功从infile.csv(一个3GBcsv文件)读取数据,对其进行排......
  • 从 python 中的字符串列表中提取 def 定义函数的标签
    我想使用Python中的正常def过程创建函数,并将标签分配给从字符串列表中提取的命名空间。如何实现这一点?这个问题的动机:我正在创建一个与sympy兼容的python函数库,供数学家用于符号计算实验。许多函数需要初始化具有相关标签的多个对象的系统,这些标签分别由用户提供的字......
  • 在 Raspberry Pi 4 上使用 Python 从具有 SPI 连接的 MT6816 磁性编码器读取
    我对这个领域完全陌生,并不真正知道自己在做什么并且需要帮助。我正在尝试使用MT681614位磁性编码器通过RaspberryPi的SPI连接读取绝对角度。我有以下问题:在硬件方面,是否只是简单地连接必要的连接(3.3V、MOSI、MISO、SCK、GND、CE01)?对于编码......
  • PythonW 不运行脚本。严重地
    因此,使用Windows10和Python3.6。我创建了一个.py脚本,它可以使用命令pythonmyscript.py在命令提示符下正常运行,但是当我制作该脚本的精确副本并为其赋予扩展名.pyw,并尝试使用pythonw运行它时命令pythonwmyscript.pyw,什么也没有发生......