DeepRacer生成最优路径和速度
获取赛道数据
从github下载:https://github.com/aws-deepracer-community/deepracer-race-data
raw_data里边就是赛道数据,reinvent 2018:是reinvent_base.npy
生成最优路线
这个来源与github,通过提供一个跑道,根据K1999论文输出一个最优路径,这个路径通过PERC_WIDTH参数可以调整与边线的距离,同时可以调整迭代次数,迭代次数越多,路径越好,不过执行时间越长。
生成的跑道在脚本目录下的/racelines目录下,会生成一个py文件,和一个npy文件
import glob
import numpy as np
from shapely.geometry import Point, Polygon
from shapely.geometry.polygon import LinearRing, LineString
import pandas as pd
import matplotlib.pyplot as plt
import os.path
import copy
from datetime import datetime
class Calculate_Race_Line:
def __init__(self, track_file):
self.waypoints = None
self.waypoints_new = None
self.final_race_line = None
self.track_file = track_file
(filepath,tempfilename) = os.path.split(track_file)
(filename,extension) = os.path.splitext(tempfilename)
self.track_name = filename
def dist_2_points(self, x1, x2, y1, y2):
return abs(abs(x1 - x2) ** 2 + abs(y1 - y2) ** 2) ** 0.5
def plot_coords(self, ax, ob):
x, y = ob.xy
ax.plot(x, y, '.', color='#999999', zorder=1)
def plot_bounds(self, ax, ob):
x, y = zip(*list((p.x, p.y) for p in ob.boundary))
ax.plot(x, y, '.', color='#000000', zorder=1)
def plot_line(self, ax, ob):
x, y = ob.xy
ax.plot(x, y, color='cyan', alpha=0.7, linewidth=3, solid_capstyle='round', zorder=2)
def print_border(self, ax, waypoints, inner_border_waypoints, outer_border_waypoints):
line = LineString(waypoints)
self.plot_coords(ax, line)
self.plot_line(ax, line)
line = LineString(inner_border_waypoints)
self.plot_coords(ax, line)
self.plot_line(ax, line)
line = LineString(outer_border_waypoints)
self.plot_coords(ax, line)
self.plot_line(ax, line)
def get_track_data(self):
self.waypoints = np.load("../%s.npy" % self.track_name)
# 减少赛道宽度
def compress_track_width(self, perc_width):
new_waypoint = []
for waypoint in self.waypoints:
center_x, center_y, inner_x, inner_y, outer_x, outer_y = waypoint
width = self.dist_2_points(inner_x, outer_x, inner_y, outer_y)
delta_x = outer_x - inner_x
delta_y = outer_y - inner_y
inner_x_new = inner_x + delta_x / 2 * (1 - perc_width)
outer_x_new = outer_x - delta_x / 2 * (1 - perc_width)
inner_y_new = inner_y + delta_y / 2 * (1 - perc_width)
outer_y_new = outer_y - delta_y / 2 * (1 - perc_width)
new_waypoint.append([center_x, center_y, inner_x_new, inner_y_new, outer_x_new, outer_y_new])
self.waypoints_new = np.asarray(new_waypoint)
# 生成最优轨道,的两个函数,根据K1999论文
def menger_curvature(self, pt1, pt2, pt3, atol=1e-3):
vec21 = np.array([pt1[0]-pt2[0], pt1[1]-pt2[1]])
vec23 = np.array([pt3[0]-pt2[0], pt3[1]-pt2[1]])
norm21 = np.linalg.norm(vec21)
norm23 = np.linalg.norm(vec23)
theta = np.arccos(np.dot(vec21, vec23)/(norm21*norm23))
if np.isclose(theta-np.pi, 0.0, atol=atol):
theta = 0.0
dist13 = np.linalg.norm(vec21-vec23)
return 2*np.sin(theta) / dist13
# Number of times to iterate each new race line point
# keep this at 3-8 for best balance of performance and desired result
# XI_ITERATIONS=8 # default 4
# Number of times to scan the entire race track to iterate
# 500 will get a good start, 1500 will be closer to optimal result
# LINE_ITERATIONS=500 # default 1000
def improve_race_line(self, old_line, inner_border, outer_border, XI_ITERATIONS=8):
'''Use gradient descent, inspired by K1999, to find the racing line'''
# start with the center line
new_line = copy.deepcopy(old_line)
ls_inner_border = Polygon(inner_border)
ls_outer_border = Polygon(outer_border)
for i in range(0,len(new_line)):
xi = new_line[i]
npoints = len(new_line)
prevprev = (i - 2 + npoints) % npoints
prev = (i - 1 + npoints) % npoints
nexxt = (i + 1 + npoints) % npoints
nexxtnexxt = (i + 2 + npoints) % npoints
#print("%d: %d %d %d %d %d" % (npoints, prevprev, prev, i, nexxt, nexxtnexxt))
ci = self.menger_curvature(new_line[prev], xi, new_line[nexxt])
c1 = self.menger_curvature(new_line[prevprev], new_line[prev], xi)
c2 = self.menger_curvature(xi, new_line[nexxt], new_line[nexxtnexxt])
target_ci = (c1 + c2) / 2
#print("i %d ci %f target_ci %f c1 %f c2 %f" % (i, ci, target_ci, c1, c2))
# Calculate prospective new track position, start at half-way (curvature zero)
xi_bound1 = copy.deepcopy(xi)
xi_bound2 = ((new_line[nexxt][0] + new_line[prev][0]) / 2.0, (new_line[nexxt][1] + new_line[prev][1]) / 2.0)
p_xi = copy.deepcopy(xi)
for j in range(0,XI_ITERATIONS):
p_ci = self.menger_curvature(new_line[prev], p_xi, new_line[nexxt])
#print("i: {} iter {} p_ci {} p_xi {} b1 {} b2 {}".format(i,j,p_ci,p_xi,xi_bound1, xi_bound2))
if np.isclose(p_ci, target_ci):
break
if p_ci < target_ci:
# too flat, shrinking track too much
xi_bound2 = copy.deepcopy(p_xi)
new_p_xi = ((xi_bound1[0] + p_xi[0]) / 2.0, (xi_bound1[1] + p_xi[1]) / 2.0)
if Point(new_p_xi).within(ls_inner_border) or not Point(new_p_xi).within(ls_outer_border):
xi_bound1 = copy.deepcopy(new_p_xi)
else:
p_xi = new_p_xi
else:
# too curved, flatten it out
xi_bound1 = copy.deepcopy(p_xi)
new_p_xi = ((xi_bound2[0] + p_xi[0]) / 2.0, (xi_bound2[1] + p_xi[1]) / 2.0)
# If iteration pushes the point beyond the border of the track,
# just abandon the refinement at this point. As adjacent
# points are adjusted within the track the point should gradually
# make its way to a new position. A better way would be to use
# a projection of the point on the border as the new bound. Later.
if Point(new_p_xi).within(ls_inner_border) or not Point(new_p_xi).within(ls_outer_border):
xi_bound2 = copy.deepcopy(new_p_xi)
else:
p_xi = new_p_xi
new_xi = p_xi
# New point which has mid-curvature of prev and next points but may be outside of track
#print((new_line[i], new_xi))
new_line[i] = new_xi
return new_line
# 调用函数生成最优轨道,迭代
def calcu_race_line(self, PERC_WIDTH=0.8, XI_ITERATIONS=8, LINE_ITERATIONS=1500):
self.get_track_data()
self.compress_track_width(PERC_WIDTH)
center_line = self.waypoints[:,0:2]
inner_border = self.waypoints[:,2:4]
outer_border = self.waypoints[:,4:6]
inner_border_new = self.waypoints_new[:,2:4]
outer_border_new = self.waypoints_new[:,4:6]
l_center_line = LineString(center_line)
race_line = copy.deepcopy(center_line[:-1]) # Use this for centerline being outer bound
for i in range(LINE_ITERATIONS):
if i % 20 == 0: print("Iteration %d" % i)
race_line = self.improve_race_line(race_line, inner_border_new, outer_border_new, XI_ITERATIONS) # Remove "_new" for entire track width
self.final_race_line = np.append(race_line, [race_line[0]], axis=0)
print("These should be the same: ", (center_line.shape, self.final_race_line.shape))
print("Original centerline length: %0.2f" % l_center_line.length)
print("New race line length: %0.2f" % LineString(self.final_race_line).length)
if not os.path.exists('./racelines'):
os.makedirs('./racelines')
prefix = './racelines/%s-%d-%d-%.1f-%.2f' % (self.track_name, LINE_ITERATIONS, XI_ITERATIONS, PERC_WIDTH, LineString(self.final_race_line).length)
py_fname = prefix + '.py'
npy_fname = prefix + '.npy'
with open(py_fname, "w") as file:
print("Writing python code to %s" % py_fname)
file.write(np.array_repr(self.final_race_line))
print("Writing numpy binary to %s" % npy_fname)
np.save(npy_fname, self.final_race_line)
fig = plt.figure(1, figsize=(16, 10))
ax = fig.add_subplot(111, facecolor='black')
plt.axis('equal')
self.print_border(ax, center_line, inner_border, outer_border)
self.print_border(ax, self.final_race_line, inner_border_new, outer_border_new)
plt.show()
if __name__ == "__main__":
c = Calculate_Race_Line('./reinvent_base.npy')
c.calcu_race_line(0.9, 12, 3000)
生成路线速度
https://github.com/dgnzlz/Capstone_AWS_DeepRacer/blob/master/Compute_Speed_And_Actions/RaceLine_Speed_ActionSpace.ipynb
同样来源于github,输入一个路线,和最大速度和最小速度就能输出各个点的最优速度,当然所谓的最优还得基于,车能驾驭这个速度才行。
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import glob
class Calculate_Speed:
def __init__(self, race_line_file):
self.race_line_file = race_line_file
(filepath, tempfilename) = os.path.split(race_line_file)
(filename, extension) = os.path.splitext(tempfilename)
self.race_line_name = filename
self.racing_track = np.load(race_line_file)
self.racing_track = self.racing_track.tolist()[:-1]
self.velocity = None
self.total_time = None
# Uses previous and next coords to calculate the radius of the curve
# so you need to pass a list with form [[x1,y1],[x2,y2],[x3,y3]]
# Input 3 coords [[x1,y1],[x2,y2],[x3,y3]]
def circle_radius(self, coords):
# Flatten the list and assign to variables (makes code easier to read later)
x1, y1, x2, y2, x3, y3 = [i for sub in coords for i in sub]
a = x1*(y2-y3) - y1*(x2-x3) + x2*y3 - x3*y2
b = (x1**2+y1**2)*(y3-y2) + (x2**2+y2**2)*(y1-y3) + (x3**2+y3**2)*(y2-y1)
c = (x1**2+y1**2)*(x2-x3) + (x2**2+y2**2)*(x3-x1) + (x3**2+y3**2)*(x1-x2)
d = (x1**2+y1**2)*(x3*y2-x2*y3) + (x2**2+y2**2) * \
(x1*y3-x3*y1) + (x3**2+y3**2)*(x2*y1-x1*y2)
# In case a is zero (so radius is infinity)
try:
r = abs((b**2+c**2-4*a*d) / abs(4*a**2)) ** 0.5
except:
r = 999
return r
# Returns indexes of next index and index+lookfront
# We need this to calculate the radius for next track section.
def circle_indexes(self, mylist, index_car, add_index_1=0, add_index_2=0):
list_len = len(mylist)
# if index >= list_len:
# raise ValueError("Index out of range in circle_indexes()")
# Use modulo to consider that track is cyclical
index_1 = (index_car + add_index_1) % list_len
index_2 = (index_car + add_index_2) % list_len
return [index_car, index_1, index_2]
def optimal_velocity(self, track, min_speed, max_speed, look_ahead_points):
# Calculate the radius for every point of the track
radius = []
for i in range(len(track)):
indexes = self.circle_indexes(track, i, add_index_1=-1, add_index_2=1)
coords = [track[indexes[0]],
track[indexes[1]], track[indexes[2]]]
radius.append(self.circle_radius(coords))
# Get the max_velocity for the smallest radius
# That value should multiplied by a constant multiple
v_min_r = min(radius)**0.5
constant_multiple = min_speed / v_min_r
print(f"Constant multiple for optimal speed: {constant_multiple}")
if look_ahead_points == 0:
# Get the maximal velocity from radius
max_velocity = [(constant_multiple * i**0.5) for i in radius]
# Get velocity from max_velocity (cap at MAX_SPEED)
velocity = [min(v, max_speed) for v in max_velocity]
return velocity
else:
# Looks at the next n radii of points and takes the minimum
# goal: reduce lookahead until car crashes bc no time to break
LOOK_AHEAD_POINTS = look_ahead_points
radius_lookahead = []
for i in range(len(radius)):
next_n_radius = []
for j in range(LOOK_AHEAD_POINTS+1):
index = self.circle_indexes(
mylist=radius, index_car=i, add_index_1=j)[1]
next_n_radius.append(radius[index])
radius_lookahead.append(min(next_n_radius))
max_velocity_lookahead = [(constant_multiple * i**0.5)
for i in radius_lookahead]
velocity_lookahead = [min(v, max_speed)
for v in max_velocity_lookahead]
return velocity_lookahead
# For each point in racing track, check if left curve (returns boolean)
def is_left_curve(self, coords):
# Flatten the list and assign to variables (makes code easier to read later)
x1, y1, x2, y2, x3, y3 = [i for sub in coords for i in sub]
return ((x2-x1)*(y3-y1) - (y2-y1)*(x3-x1)) > 0
# Calculate the distance between 2 points
def dist_2_points(self, x1, x2, y1, y2):
return abs(abs(x1-x2)**2 + abs(y1-y2)**2)**0.5
def calc_speed(self, min_speed=1.3, max_speed=4, look_ahead_points=5):
self.velocity = self.optimal_velocity(self.racing_track, min_speed, max_speed, look_ahead_points)
#for i in range(len(self.velocity)):
# if self.velocity[i] > 4:
# self.velocity[i] = 4
# 计算各个点直接需要的时间,得到总时间
distance_to_prev = []
for i in range(len(self.racing_track)):
indexes = self.circle_indexes(self.racing_track, i, add_index_1=-1, add_index_2=0)[0:2]
coords = [self.racing_track[indexes[0]], self.racing_track[indexes[1]]]
dist_to_prev = self.dist_2_points(x1=coords[0][0], x2=coords[1][0], y1=coords[0][1], y2=coords[1][1])
distance_to_prev.append(dist_to_prev)
time_to_prev = [(distance_to_prev[i] / self.velocity[i]) for i in range(len(self.racing_track))]
self.total_time = sum(time_to_prev)
print(f"Total time for track, if racing line and speeds are followed perfectly: {self.total_time} s")
# 把数据写入到文件中
# Now we have list with columns (x,y,speed,distance,time)
racing_track_everything = []
for i in range(len(self.racing_track)):
racing_track_everything.append([self.racing_track[i][0],
self.racing_track[i][1],
self.velocity[i],
time_to_prev[i]])
# Round to 5 decimals
racing_track_everything = np.around(racing_track_everything, 5).tolist()
racing_track_everything = np.array(racing_track_everything)
if not os.path.exists('./track_with_speed'):
os.makedirs('./track_with_speed')
prefix = './track_with_speed/%s_%.1f-%.1f-%d-%.2f' % (self.race_line_name, min_speed, max_speed, look_ahead_points, self.total_time)
py_fname = prefix + '.py'
npy_fname = prefix + '.npy'
with open(py_fname, "w") as file:
print("Writing python code to %s" % py_fname)
file.write(np.array_repr(racing_track_everything))
print("Writing numpy binary to %s" % npy_fname)
np.save(npy_fname, racing_track_everything)
def get_raceline_files():
racing_lines = np.array([])
available_track_files = glob.glob("./racelines/**.npy")
available_track_names = list(map(lambda x: os.path.basename(x).split('.npy')[0], available_track_files))
for track_name in available_track_names:
racing_lines = np.append(racing_lines, './racelines/' + track_name + '.npy')
return racing_lines
if __name__ == '__main__':
min_speed = 1.2
max_speed = 4
look_ahead_points = 0
race_line_files = get_raceline_files()
plt.figure(figsize=(8, 5))
for race_line_file in race_line_files:
c = Calculate_Speed(race_line_file)
c.calc_speed(min_speed, max_speed, look_ahead_points)
x = [i[0] for i in c.racing_track]
y = [i[1] for i in c.racing_track]
plt.scatter(x, y, s=2, c=c.velocity, label=c.race_line_name+'__'+str(c.total_time))
plt.legend(loc=0)
plt.colorbar()
plt.show()
标签:xi,python,self,track,race,DeepRacer,new,最优,line
From: https://www.cnblogs.com/yuandonghua/p/16779276.html