首页 > 编程语言 >python DeepRacer生成最优路径和速度

python DeepRacer生成最优路径和速度

时间:2022-10-20 10:44:35浏览次数:90  
标签:xi python self track race DeepRacer new 最优 line

DeepRacer生成最优路径和速度

获取赛道数据

从github下载:https://github.com/aws-deepracer-community/deepracer-race-data
raw_data里边就是赛道数据,reinvent 2018:是reinvent_base.npy

生成最优路线

https://github.com/dgnzlz/Capstone_AWS_DeepRacer/blob/master/Compute_Speed_And_Actions/Race-Line-Calculation.ipynb

这个来源与github,通过提供一个跑道,根据K1999论文输出一个最优路径,这个路径通过PERC_WIDTH参数可以调整与边线的距离,同时可以调整迭代次数,迭代次数越多,路径越好,不过执行时间越长。
生成的跑道在脚本目录下的/racelines目录下,会生成一个py文件,和一个npy文件

import glob
import numpy as np
from shapely.geometry import Point, Polygon
from shapely.geometry.polygon import LinearRing, LineString
import pandas as pd
import matplotlib.pyplot as plt
import os.path
import copy
from datetime import datetime

class Calculate_Race_Line:

    def __init__(self, track_file):
        self.waypoints = None
        self.waypoints_new = None
        self.final_race_line = None
        self.track_file = track_file
        (filepath,tempfilename) = os.path.split(track_file)
        (filename,extension) = os.path.splitext(tempfilename)
        self.track_name = filename

    def dist_2_points(self, x1, x2, y1, y2):
        return abs(abs(x1 - x2) ** 2 + abs(y1 - y2) ** 2) ** 0.5

    def plot_coords(self, ax, ob):
        x, y = ob.xy
        ax.plot(x, y, '.', color='#999999', zorder=1)

    def plot_bounds(self, ax, ob):
        x, y = zip(*list((p.x, p.y) for p in ob.boundary))
        ax.plot(x, y, '.', color='#000000', zorder=1)

    def plot_line(self, ax, ob):
        x, y = ob.xy
        ax.plot(x, y, color='cyan', alpha=0.7, linewidth=3, solid_capstyle='round', zorder=2)

    def print_border(self, ax, waypoints, inner_border_waypoints, outer_border_waypoints):
        line = LineString(waypoints)
        self.plot_coords(ax, line)
        self.plot_line(ax, line)

        line = LineString(inner_border_waypoints)
        self.plot_coords(ax, line)
        self.plot_line(ax, line)

        line = LineString(outer_border_waypoints)
        self.plot_coords(ax, line)
        self.plot_line(ax, line)

    def get_track_data(self):
        self.waypoints = np.load("../%s.npy" % self.track_name)

    # 减少赛道宽度
    def compress_track_width(self, perc_width):
        new_waypoint = []
        for waypoint in self.waypoints:
            center_x, center_y, inner_x, inner_y, outer_x, outer_y = waypoint

            width = self.dist_2_points(inner_x, outer_x, inner_y, outer_y)

            delta_x = outer_x - inner_x
            delta_y = outer_y - inner_y

            inner_x_new = inner_x + delta_x / 2 * (1 - perc_width)
            outer_x_new = outer_x - delta_x / 2 * (1 - perc_width)
            inner_y_new = inner_y + delta_y / 2 * (1 - perc_width)
            outer_y_new = outer_y - delta_y / 2 * (1 - perc_width)

            new_waypoint.append([center_x, center_y, inner_x_new, inner_y_new, outer_x_new, outer_y_new])
            self.waypoints_new = np.asarray(new_waypoint)



    # 生成最优轨道,的两个函数,根据K1999论文
    def menger_curvature(self, pt1, pt2, pt3, atol=1e-3):

        vec21 = np.array([pt1[0]-pt2[0], pt1[1]-pt2[1]])
        vec23 = np.array([pt3[0]-pt2[0], pt3[1]-pt2[1]])

        norm21 = np.linalg.norm(vec21)
        norm23 = np.linalg.norm(vec23)

        theta = np.arccos(np.dot(vec21, vec23)/(norm21*norm23))
        if np.isclose(theta-np.pi, 0.0, atol=atol):
            theta = 0.0

        dist13 = np.linalg.norm(vec21-vec23)

        return 2*np.sin(theta) / dist13


    # Number of times to iterate each new race line point
    # keep this at 3-8 for best balance of performance and desired result
    # XI_ITERATIONS=8 # default 4

    # Number of times to scan the entire race track to iterate
    # 500 will get a good start, 1500 will be closer to optimal result
    # LINE_ITERATIONS=500 # default 1000

    def improve_race_line(self, old_line, inner_border, outer_border, XI_ITERATIONS=8):
        '''Use gradient descent, inspired by K1999, to find the racing line'''
        # start with the center line
        new_line = copy.deepcopy(old_line)
        ls_inner_border = Polygon(inner_border)
        ls_outer_border = Polygon(outer_border)
        for i in range(0,len(new_line)):
            xi = new_line[i]
            npoints = len(new_line)
            prevprev = (i - 2 + npoints) % npoints
            prev = (i - 1 + npoints) % npoints
            nexxt = (i + 1 + npoints) % npoints
            nexxtnexxt = (i + 2 + npoints) % npoints
            #print("%d: %d %d %d %d %d" % (npoints, prevprev, prev, i, nexxt, nexxtnexxt))
            ci = self.menger_curvature(new_line[prev], xi, new_line[nexxt])
            c1 = self.menger_curvature(new_line[prevprev], new_line[prev], xi)
            c2 = self.menger_curvature(xi, new_line[nexxt], new_line[nexxtnexxt])
            target_ci = (c1 + c2) / 2
            #print("i %d ci %f target_ci %f c1 %f c2 %f" % (i, ci, target_ci, c1, c2))

            # Calculate prospective new track position, start at half-way (curvature zero)
            xi_bound1 = copy.deepcopy(xi)
            xi_bound2 = ((new_line[nexxt][0] + new_line[prev][0]) / 2.0, (new_line[nexxt][1] + new_line[prev][1]) / 2.0)
            p_xi = copy.deepcopy(xi)
            for j in range(0,XI_ITERATIONS):
                p_ci = self.menger_curvature(new_line[prev], p_xi, new_line[nexxt])
                #print("i: {} iter {} p_ci {} p_xi {} b1 {} b2 {}".format(i,j,p_ci,p_xi,xi_bound1, xi_bound2))
                if np.isclose(p_ci, target_ci):
                    break
                if p_ci < target_ci:
                    # too flat, shrinking track too much
                    xi_bound2 = copy.deepcopy(p_xi)
                    new_p_xi = ((xi_bound1[0] + p_xi[0]) / 2.0, (xi_bound1[1] + p_xi[1]) / 2.0)
                    if Point(new_p_xi).within(ls_inner_border) or not Point(new_p_xi).within(ls_outer_border):
                        xi_bound1 = copy.deepcopy(new_p_xi)
                    else:
                        p_xi = new_p_xi
                else:
                    # too curved, flatten it out
                    xi_bound1 = copy.deepcopy(p_xi)
                    new_p_xi = ((xi_bound2[0] + p_xi[0]) / 2.0, (xi_bound2[1] + p_xi[1]) / 2.0)

                    # If iteration pushes the point beyond the border of the track,
                    # just abandon the refinement at this point.  As adjacent
                    # points are adjusted within the track the point should gradually
                    # make its way to a new position.  A better way would be to use
                    # a projection of the point on the border as the new bound.  Later.
                    if Point(new_p_xi).within(ls_inner_border) or not Point(new_p_xi).within(ls_outer_border):
                        xi_bound2 = copy.deepcopy(new_p_xi)
                    else:
                        p_xi = new_p_xi
            new_xi = p_xi
            # New point which has mid-curvature of prev and next points but may be outside of track
            #print((new_line[i], new_xi))
            new_line[i] = new_xi
        return new_line

    # 调用函数生成最优轨道,迭代
    def calcu_race_line(self, PERC_WIDTH=0.8, XI_ITERATIONS=8, LINE_ITERATIONS=1500):
        self.get_track_data()
        self.compress_track_width(PERC_WIDTH)

        center_line = self.waypoints[:,0:2]
        inner_border = self.waypoints[:,2:4]
        outer_border = self.waypoints[:,4:6]
        inner_border_new = self.waypoints_new[:,2:4]
        outer_border_new = self.waypoints_new[:,4:6]
        l_center_line = LineString(center_line)

        race_line = copy.deepcopy(center_line[:-1])  # Use this for centerline being outer bound

        for i in range(LINE_ITERATIONS):
            if i % 20 == 0: print("Iteration %d" % i)
            race_line = self.improve_race_line(race_line, inner_border_new, outer_border_new, XI_ITERATIONS)  # Remove "_new" for entire track width

        self.final_race_line = np.append(race_line, [race_line[0]], axis=0)

        print("These should be the same: ", (center_line.shape, self.final_race_line.shape))
        print("Original centerline length: %0.2f" % l_center_line.length)
        print("New race line length: %0.2f" % LineString(self.final_race_line).length)

        if not os.path.exists('./racelines'):
            os.makedirs('./racelines')

        prefix = './racelines/%s-%d-%d-%.1f-%.2f' % (self.track_name, LINE_ITERATIONS, XI_ITERATIONS, PERC_WIDTH, LineString(self.final_race_line).length)
        py_fname = prefix + '.py'
        npy_fname = prefix + '.npy'
        with open(py_fname, "w") as file:
            print("Writing python code to %s" % py_fname)
            file.write(np.array_repr(self.final_race_line))

        print("Writing numpy binary to %s" % npy_fname)
        np.save(npy_fname, self.final_race_line)

        fig = plt.figure(1, figsize=(16, 10))
        ax = fig.add_subplot(111, facecolor='black')
        plt.axis('equal')
        self.print_border(ax, center_line, inner_border, outer_border)
        self.print_border(ax, self.final_race_line, inner_border_new, outer_border_new)
        plt.show()


if __name__ == "__main__":
    c = Calculate_Race_Line('./reinvent_base.npy')
    c.calcu_race_line(0.9, 12, 3000)

生成路线速度

https://github.com/dgnzlz/Capstone_AWS_DeepRacer/blob/master/Compute_Speed_And_Actions/RaceLine_Speed_ActionSpace.ipynb
同样来源于github,输入一个路线,和最大速度和最小速度就能输出各个点的最优速度,当然所谓的最优还得基于,车能驾驭这个速度才行。

import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import glob

class Calculate_Speed:

    def __init__(self, race_line_file):
        self.race_line_file = race_line_file
        (filepath, tempfilename) = os.path.split(race_line_file)
        (filename, extension) = os.path.splitext(tempfilename)
        self.race_line_name = filename

        self.racing_track = np.load(race_line_file)
        self.racing_track = self.racing_track.tolist()[:-1]
        self.velocity = None
        self.total_time = None

    # Uses previous and next coords to calculate the radius of the curve
    # so you need to pass a list with form [[x1,y1],[x2,y2],[x3,y3]]
    # Input 3 coords [[x1,y1],[x2,y2],[x3,y3]]
    def circle_radius(self, coords):

        # Flatten the list and assign to variables (makes code easier to read later)
        x1, y1, x2, y2, x3, y3 = [i for sub in coords for i in sub]

        a = x1*(y2-y3) - y1*(x2-x3) + x2*y3 - x3*y2
        b = (x1**2+y1**2)*(y3-y2) + (x2**2+y2**2)*(y1-y3) + (x3**2+y3**2)*(y2-y1)
        c = (x1**2+y1**2)*(x2-x3) + (x2**2+y2**2)*(x3-x1) + (x3**2+y3**2)*(x1-x2)
        d = (x1**2+y1**2)*(x3*y2-x2*y3) + (x2**2+y2**2) * \
            (x1*y3-x3*y1) + (x3**2+y3**2)*(x2*y1-x1*y2)

        # In case a is zero (so radius is infinity)
        try:
            r = abs((b**2+c**2-4*a*d) / abs(4*a**2)) ** 0.5
        except:
            r = 999

        return r


    # Returns indexes of next index and index+lookfront
    # We need this to calculate the radius for next track section.
    def circle_indexes(self, mylist, index_car, add_index_1=0, add_index_2=0):

        list_len = len(mylist)

        # if index >= list_len:
        #     raise ValueError("Index out of range in circle_indexes()")

        # Use modulo to consider that track is cyclical
        index_1 = (index_car + add_index_1) % list_len
        index_2 = (index_car + add_index_2) % list_len

        return [index_car, index_1, index_2]


    def optimal_velocity(self, track, min_speed, max_speed, look_ahead_points):

        # Calculate the radius for every point of the track
        radius = []
        for i in range(len(track)):
            indexes = self.circle_indexes(track, i, add_index_1=-1, add_index_2=1)
            coords = [track[indexes[0]],
                      track[indexes[1]], track[indexes[2]]]
            radius.append(self.circle_radius(coords))

        # Get the max_velocity for the smallest radius
        # That value should multiplied by a constant multiple
        v_min_r = min(radius)**0.5
        constant_multiple = min_speed / v_min_r
        print(f"Constant multiple for optimal speed: {constant_multiple}")

        if look_ahead_points == 0:
            # Get the maximal velocity from radius
            max_velocity = [(constant_multiple * i**0.5) for i in radius]
            # Get velocity from max_velocity (cap at MAX_SPEED)
            velocity = [min(v, max_speed) for v in max_velocity]
            return velocity

        else:
            # Looks at the next n radii of points and takes the minimum
            # goal: reduce lookahead until car crashes bc no time to break
            LOOK_AHEAD_POINTS = look_ahead_points
            radius_lookahead = []
            for i in range(len(radius)):
                next_n_radius = []
                for j in range(LOOK_AHEAD_POINTS+1):
                    index = self.circle_indexes(
                        mylist=radius, index_car=i, add_index_1=j)[1]
                    next_n_radius.append(radius[index])
                radius_lookahead.append(min(next_n_radius))
            max_velocity_lookahead = [(constant_multiple * i**0.5)
                                      for i in radius_lookahead]
            velocity_lookahead = [min(v, max_speed)
                                  for v in max_velocity_lookahead]
            return velocity_lookahead


    # For each point in racing track, check if left curve (returns boolean)
    def is_left_curve(self, coords):

        # Flatten the list and assign to variables (makes code easier to read later)
        x1, y1, x2, y2, x3, y3 = [i for sub in coords for i in sub]

        return ((x2-x1)*(y3-y1) - (y2-y1)*(x3-x1)) > 0


    # Calculate the distance between 2 points
    def dist_2_points(self, x1, x2, y1, y2):
            return abs(abs(x1-x2)**2 + abs(y1-y2)**2)**0.5

    def calc_speed(self, min_speed=1.3, max_speed=4, look_ahead_points=5):
        self.velocity = self.optimal_velocity(self.racing_track, min_speed, max_speed, look_ahead_points)

        #for i in range(len(self.velocity)):
        #    if self.velocity[i] > 4:
        #        self.velocity[i] = 4

        # 计算各个点直接需要的时间,得到总时间
        distance_to_prev = []
        for i in range(len(self.racing_track)):
            indexes = self.circle_indexes(self.racing_track, i, add_index_1=-1, add_index_2=0)[0:2]
            coords = [self.racing_track[indexes[0]], self.racing_track[indexes[1]]]
            dist_to_prev = self.dist_2_points(x1=coords[0][0], x2=coords[1][0], y1=coords[0][1], y2=coords[1][1])
            distance_to_prev.append(dist_to_prev)

        time_to_prev = [(distance_to_prev[i] / self.velocity[i]) for i in range(len(self.racing_track))]

        self.total_time = sum(time_to_prev)
        print(f"Total time for track, if racing line and speeds are followed perfectly: {self.total_time} s")

        # 把数据写入到文件中
        # Now we have list with columns (x,y,speed,distance,time)
        racing_track_everything = []
        for i in range(len(self.racing_track)):
            racing_track_everything.append([self.racing_track[i][0],
                                            self.racing_track[i][1],
                                            self.velocity[i],
                                            time_to_prev[i]])
        # Round to 5 decimals
        racing_track_everything = np.around(racing_track_everything, 5).tolist()
        racing_track_everything = np.array(racing_track_everything)

        if not os.path.exists('./track_with_speed'):
            os.makedirs('./track_with_speed')

        prefix = './track_with_speed/%s_%.1f-%.1f-%d-%.2f' % (self.race_line_name, min_speed, max_speed, look_ahead_points, self.total_time)
        py_fname = prefix + '.py'
        npy_fname = prefix + '.npy'
        with open(py_fname, "w") as file:
            print("Writing python code to %s" % py_fname)
            file.write(np.array_repr(racing_track_everything))

        print("Writing numpy binary to %s" % npy_fname)
        np.save(npy_fname, racing_track_everything)


def get_raceline_files():
    racing_lines = np.array([])
    available_track_files = glob.glob("./racelines/**.npy")
    available_track_names = list(map(lambda x: os.path.basename(x).split('.npy')[0], available_track_files))
    for track_name in available_track_names:
        racing_lines = np.append(racing_lines, './racelines/' + track_name + '.npy')

    return racing_lines


if __name__ == '__main__':
    min_speed = 1.2
    max_speed = 4
    look_ahead_points = 0

    race_line_files = get_raceline_files()
    plt.figure(figsize=(8, 5))
    for race_line_file in race_line_files:
        c = Calculate_Speed(race_line_file)
        c.calc_speed(min_speed, max_speed, look_ahead_points)

        x = [i[0] for i in c.racing_track]
        y = [i[1] for i in c.racing_track]
        plt.scatter(x, y, s=2, c=c.velocity, label=c.race_line_name+'__'+str(c.total_time))
        plt.legend(loc=0)
    plt.colorbar()
    plt.show()

img

标签:xi,python,self,track,race,DeepRacer,new,最优,line
From: https://www.cnblogs.com/yuandonghua/p/16779276.html

相关文章

  • python-opencv cv.imshow 错误
    本文平台windows报错信息: cv2.imshow('imshow',img))Thefunctionisnotimplemented.RebuildthelibrarywithWindows,GTK+2.xorCocoasupport.Ifyouare......
  • python实验报告(第六周)
    一、实验目的1.掌握如何创建并调用一个函数,以及如何进行参数传递和指定函数的返回值等。2.掌握变量的作用域和匿名函数。二、实验环境python版本:3.10(64-bit)三、实验内......
  • python使用selenium操作浏览器
    重复的操作令手工测试苦不堪言,于是自动化测试出现了!作为web应用里最出名的自动化测试工具,selenium让web应用的测试轻松了很多。今天我们就来简单的介绍一下一些简单的sele......
  • DAPR in Python
    最近在部署项目时,用到了DAPR,记录一下。一、安装     本地安装  https://docs.dapr.io/zh-hans/getting-started/install-dapr-cli/      docker内安......
  • 二、Python入门
    一、第一句Python代码编辑/scripts目录下编辑hello.py输入1print("hello,word")执行hello.py 即python3/scripts/hello.pypython内部执行过程如下: ......
  • 【Python】判断列表或字典中是否存在key值
    第一种in方法,即列出所有key值查询是否在里面a={"name":"1","value":"2"}if"name"ina.keys():print("存在")else:print("不存......
  • Python3中Super函数的使用
    Super函数用法主要用于调用父类函数代码演示classA:def__init__(self):self.n=2print('此时执行A的自定义函数,self的n值为',self.n)d......
  • 盘点一个Python自动化办公的实战案例(word文件处理)
    大家好,我是Python进阶者。一、前言前几天在Python铂金交流群【JethroShen】问了一个Python自动化办公的问题,提问截图如下:代码运行后的结果:他预期的效果是选项和答案......
  • python对列表里的元组,列表,字典进行排序
    python对列表里的元组,列表,字典进行排序其实很简单就是利用列表对象自带的sort方法和方法内的key关键字参数和lambda匿名函数搭配即可。若有:L=[('b',6),('a',1),('c',3)......
  • 6、最后一篇,小白看的Python基础教程,详细得很
    13、Python标准库Python标准库是随Pthon附带安装的,包含了大量极其有用的模块。我们主要了解下sys和os就够了。13.1sys模块sys模块主要是针对与Python解释器相关的变......