(9)reset(self, det_state=None):用于初始化环境的状态,可以选择指定明确的初始状态(det_state),否则将生成一个随机状态。返回初始状态和对应的零控制动作。
def reset(self, det_state = None):
state = []
action = []
self.action_hist = []
self.robust_parameters.reset()
self.solved = 0
self.done = False
self.i = 0
self.prev_shaping = None
self.previous_state = np.zeros(self.state_size)
self.abs_sum = 0
if det_state is not None:
self.previous_state = det_state
q = np.array([self.previous_state[6:10]]).T
self.ang = quat_euler(q)
else:
self.ang = np.random.rand(3)-0.5
Q_in = euler_quat(self.ang)
self.previous_state[0:5:2] = np.clip((np.random.normal([0, 0, 0], 2)), -BB_POS/2, BB_POS/2)
self.previous_state[1:6:2] = np.clip((np.random.normal([0, 0, 0], 2)), -BB_VEL/2, BB_VEL/2)
self.previous_state[6:10] = Q_in.T
self.previous_state[10:13] = np.clip((np.random.normal([0, 0, 0], 2)), -BB_VEL*1.5, BB_POS*1.5)
for i in range(self.T):
self.action = self.zero_control
self.action_hist.append(self.action)
state_t, reward, done = self.step(self.action)
state.append(state_t.flatten())
action.append(self.zero_control)
return np.array(state), np.array(action)
(10)step(self, action):用于在环境中执行一步动作。如果使用直接控制标志,将执行直接控制;否则,将使用电机输入计算四个电机的角速度,然后通过积分求解微分方程更新环境状态。返回更新后的状态、奖励和标志指示是否完成。
def step(self, action):
self.i += 1
if self.direct_control_flag:
self.action = np.clip(action,-1,1)
u = self.action
self.clipped_action = self.action
self.step_effort = self.action
else:
self.action = action
self.step_effort, self.w, f_in, m_action = self.f2w(action[0], np.array([action[1::]]).T)
self.clipped_action = np.append([f_in], m_action)
u = self.clipped_action
self.action_hist.append(self.clipped_action)
self.y = (integrate.solve_ivp(self.drone_eq, (0, self.t_step), self.previous_state, args=(u, ))).y
self.state = np.transpose(self.y[:, -1])
self.quat_state = np.array([np.concatenate((self.state[0:10], self.V_q))])
q = np.array([self.state[6:10]]).T
q = q/np.linalg.norm(q)
self.ang = quat_euler(q)
self.ang_vel = (self.ang - self.prev_ang)/self.t_step
self.prev_ang = self.ang
self.previous_state = self.state
self.done_condition()
self.reward_function()
self.control_effort()
return self.state, self.reward, self.done
(11)done_condition(self):用于检查环境是否满足终止条件,根据状态中的位置、角度等信息,判断是否满足预定条件,如果满足则将标志 done 设置为 True。
def done_condition(self):
cond_x = np.concatenate((self.state[1:6:2], self.ang, self.state[-3:]))
for x, c in zip(np.abs(cond_x), self.bb_cond):
if x >= c:
self.done = True
(12)reward_function(self, debug=0):用于计算当前状态下的奖励。能够根据速度、角度等信息计算 shaping 奖励,并根据控制动作的绝对值计算控制努力奖励。如果环境状态达到预定目标,给予额外的解决奖励。
def reward_function(self, debug=0):
self.reward = 0
velocity = self.state[1:6:2]
euler_angles = self.ang
psi = self.ang[2]
body_ang_vel = self.state[-3:]
action = self.action
shaping = -SHAPING_WEIGHT/np.sum(SHAPING_INTERNAL_WEIGHTS)*(SHAPING_INTERNAL_WEIGHTS[0]*norm(velocity/BB_VEL)+
SHAPING_INTERNAL_WEIGHTS[1]*norm(psi/4)+
SHAPING_INTERNAL_WEIGHTS[2]*norm(euler_angles[0:2]/BB_ANG))
#CASCADING REWARDS
r_state = np.concatenate((velocity, [psi]))
for TR_i, TR_Pi in zip(TR, TR_P):
if norm(r_state) < norm(np.ones(len(r_state))*TR_i):
shaping += TR_Pi
if norm(euler_angles[0:2]) < norm(np.ones(2)*TR_i*4):
shaping += TR_Pi
break
if self.prev_shaping is not None:
self.reward = shaping - self.prev_shaping
self.prev_shaping = shaping
## TOTAL REWARD SHAPING ##
abs_control = -np.sum(np.square(action - self.zero_control)) * P_C
self.reward += + abs_control
#SOLUTION ACHIEVED?
self.target_state = 9*(TR[0]**2)
self.current_state = np.sum(np.square(np.concatenate((velocity, euler_angles, body_ang_vel))))
if self.current_state < self.target_state:
self.reward += SOLVED_REWARD
self.solved = 1
if self.ppo_training:
self.done = True
elif self.i >= self.n:
self.reward = self.reward
self.solved = 0
self.done=True
elif self.done:
self.reward += BROKEN_REWARD
self.solved = 0
(13)control_effort(self):用于计算控制努力。通过计算当前步骤的电机输入与零控制的差异来度量控制的努力,并累积到 abs_sum 变量中。
def control_effort(self):
instant_effort = np.sqrt(np.sum(np.square(self.step_effort-np.array([0*M*G, 0, 0, 0]))))
self.abs_sum += instant_effort
(14)定义类sensor,表示一个虚拟传感器集合,模拟了在飞行器(或其他飞行器)环境中可能使用的传感器的行为。这些传感器包括加速度计、陀螺仪、磁力计和GPS等,用于感知飞行器的状态和环境信息。函数__init__(self, env, accel_std=0.1, ...)用于初始化传感器对象。接收一个环境对象和一系列传感器参数,包括加速度计、陀螺仪、磁力计和GPS的噪声标准差和漂移。初始化传感器误差标志,并调用bias_reset方法进行偏差初始化。
class sensor():
def __init__(self, env,
accel_std = 0.1, accel_bias_drift = 0.0005,
gyro_std = 0.035, gyro_bias_drift = 0.00015,
magnet_std = 15, magnet_bias_drift = 0.075,
gps_std_p = 1.71, gps_std_v=0.5):
self.std = [accel_std, gyro_std, magnet_std, gps_std_p, gps_std_v]
self.b_d = [accel_bias_drift, gyro_bias_drift, magnet_bias_drift]
self.quad = env
self.error = True
self.bias_reset()
(15)bias_reset(self):用于重置传感器偏差。根据设定的标准差和漂移,生成加速度计、陀螺仪、磁力计、GPS的随机偏差。初始化旋转矩阵R为单位矩阵。
def bias_reset(self):
self.a_std = self.std[0]*self.error
self.a_b_d = (np.random.random()-0.5)*2*self.b_d[0]*self.error
self.g_std = self.std[1]*self.error
self.g_b_d = (np.random.random()-0.5)*2*self.b_d[1]*self.error
self.m_std = self.std[2]*self.error
self.m_b_d = (np.random.random()-0.5)*2*self.b_d[2]*self.error
self.gps_std_p = self.std[3]*self.error
self.gps_std_v = self.std[4]*self.error
self.R = np.eye(3)
(16)accel(self):用于获取模拟加速度计的读数。考虑了加速度计漂移,返回带有随机误差的加速度计读数。
def accel(self):
self.a_b_accel = self.a_b_accel + self.a_b_d*self.quad.t_step
read_error = np.random.normal(self.a_b_accel, self.a_std, 3)
read_accel_body = self.quad.accelerometer_read.flatten()
return read_accel_body+read_error
(17)accel_grav(self, norm=True):用于模拟获取包含重力的加速度计读数。考虑了加速度计漂移和随机误差,返回读取的加速度计与重力矢量的合成。
def accel_grav(self, norm=True):
gravity_vec = np.array([0, 0, -9.81])
self.a_b_grav = self.a_b_grav + self.a_b_d*self.quad.t_step
#Gravity vector as read from body sensor
gravity_body = np.dot(self.quad.mat_rot.T, gravity_vec)+ np.random.normal(np.random.random(3)*self.a_b_grav, self.a_std, 3)
if norm:
ax = gravity_body[0]
ay = gravity_body[1]
az = gravity_body[2]
if ax !=0 and ay != 0 and az != 0:
# Normalise accelerometer measurement
recipNorm = 1/(math.sqrt(ax * ax + ay * ay + az * az))
ax *= recipNorm
ay *= recipNorm
az *= recipNorm
gravity_body = np.array([[ax, ay, az]], dtype='float32')
return gravity_body
(18)mag_gauss(self, norm=True):用于模拟获取包含磁场信息的磁力计读数。考虑了磁力计漂移和随机误差,返回带有随机误差的磁力计读数。
def mag_gauss(self, norm=True):
magnet_vec = np.array([-65.269, 165.115, -144.205])
self.m_b = self.m_b + self.m_b_d*self.quad.t_step
magnet_body = np.dot(self.quad.mat_rot.T, magnet_vec) + np.random.normal(np.random.random(3)*self.m_b, self.m_std, 3)
mx = magnet_body[0]
my = magnet_body[1]
mz = magnet_body[2]
if norm:
recipNorm = 1/math.sqrt(mx * mx + my * my + mz * mz)
mx *= recipNorm
my *= recipNorm
mz *= recipNorm
magnet_body = np.array([mx, my, mz])
return magnet_body
(19)gyro(self):用于模拟获取陀螺仪读数。考虑了陀螺仪漂移,返回了带有随机误差的陀螺仪读数。
def gyro(self):
self.g_b = self.g_b + self.g_b_d*self.quad.t_step
read_error = np.random.normal(self.g_b, self.g_std, 3)
read_gyro = self.quad.state[-3:].flatten()
return np.array([read_error+read_gyro], dtype='float32').T
(20) reset(self):用于重置传感器的状态。将传感器的各种偏差和初始状态重置为零,用于环境的重启。
def reset(self):
self.a_b_grav = 0
self.a_b_accel = 0
self.m_b = 0
self.g_b = 0
self.acceleration_t0 = np.zeros(3)
self.position_t0 = self.quad.state[0:5:2]
self.velocity_t0 = self.quad.state[1:6:2]
self.quaternion_t0 = self.quad.state[6:10]
self.bias_reset()
(21)gps(self):用于模拟获取GPS的读数。考虑了GPS位置和速度的噪声,返回带有随机误差的GPS读数。
def gps(self):
read_error_pos = np.random.normal(0, self.gps_std_p, 3)
read_error_vel = np.random.normal(0, self.gps_std_v, 3)
gps_pos = self.quad.state[0:5:2].flatten()
gps_vel = self.quad.state[1:6:2].flatten()
return read_error_pos+gps_pos, read_error_vel+gps_vel
(22)triad(self):功能是使用TRIAD算法估计姿态。使用加速度计和磁力计的读数,结合环境的旋转矩阵,返回四元数和估计的旋转矩阵。
def triad(self):
gravity_vec = np.array([0, 0, -G])
magnet_vec = np.array([-65.269, 165.115,-144.205]) #mGauss
#Magnetic Vector of Santo André - Brasil in MiliGauss
#https://www.ngdc.noaa.gov/geomag/calculators/magcalc.shtml#igrfwmm
#Gravity vector as read from body sensor
induced_acceleration = self.quad.f_in.flatten()/M - (self.R @ np.array([[0, 0, -G]]).T).flatten()
gravity_body = self.accel() - induced_acceleration
#Magnetic Field vector as read from body sensor
magnet_body = self.quad.mat_rot.T @ (np.random.normal(magnet_vec, self.m_std))
#Accel vector is more accurate
#Body Coordinates
gravity_body = gravity_body / np.linalg.norm(gravity_body)
magnet_body = magnet_body / np.linalg.norm(magnet_body)
t1b = gravity_body/np.linalg.norm(gravity_body)
t2b = np.cross(gravity_body, magnet_body)
t2b = t2b/np.linalg.norm(t2b)
t3b = np.cross(t1b, t2b)
t3b = t3b/np.linalg.norm(t3b)
tb = np.vstack((t1b, t2b, t3b)).T
#Inertial Coordinates
gravity_vec = gravity_vec/np.linalg.norm(gravity_vec)
magnet_vec = magnet_vec / np.linalg.norm(magnet_vec)
t1i = gravity_vec/np.linalg.norm(gravity_vec)
t2i = np.cross(gravity_vec, magnet_vec)
t2i = t2i/np.linalg.norm(t2i)
t3i = np.cross(t1i, t2i)
t3i = t3i/np.linalg.norm(t3i)
ti = np.vstack((t1i, t2i, t3i)).T
self.R = tb @ ti.T
q = Rotation.from_matrix(self.R.T).as_quat()
q = np.concatenate(([q[3]], q[0:3]))
return q, self.R
(23)Madgwick_AHRS_Only_Acce:功能是通过Madgwick算法仅使用加速度计的姿态估计。通过加速度计和陀螺仪的测量值,以及前一时刻的四元数,返回通过Madgwick算法估计的新四元数。
def Madgwick_AHRS_Only_Accel(self, gyro_meas, accel_meas, q_ant):
q0 = q_ant[0]
q1 = q_ant[1]
q2 = q_ant[2]
q3 = q_ant[3]
gx = gyro_meas[0]
gy = gyro_meas[1]
gz = gyro_meas[2]
ax = accel_meas[0]
ay = accel_meas[1]
az = accel_meas[2]
dt = 0.01
beta = 0.1
# Rate of change of quaternion from gyroscope
qDot1 = 0.5 * (-q1 * gx - q2 * gy - q3 * gz)
qDot2 = 0.5 * (q0 * gx + q2 * gz - q3 * gy)
qDot3 = 0.5 * (q0 * gy - q1 * gz + q3 * gx)
qDot4 = 0.5 * (q0 * gz + q1 * gy - q2 * gx)
# Compute feedback only if accelerometer measurement valid (avoids NaN in accelerometer normalisation)
if ax !=0 and ay != 0 and az != 0:
# Normalise accelerometer measurement
recipNorm = 1/(math.sqrt(ax * ax + ay * ay + az * az))
ax *= recipNorm
ay *= recipNorm
az *= recipNorm
# Auxiliary variables to avoid repeated arithmetic
_2q0 = 2.0 * q0
_2q1 = 2.0 * q1
_2q2 = 2.0 * q2
_2q3 = 2.0 * q3
_4q0 = 4.0 * q0
_4q1 = 4.0 * q1
_4q2 = 4.0 * q2
_4q3 = 4.0 * q3
_8q1 = 8.0 * q1
_8q2 = 8.0 * q2
q0q0 = q0 * q0
q1q1 = q1 * q1
q2q2 = q2 * q2
q3q3 = q3 * q3
f_g1 = 2*(q0*q2 - q1*q3) - ax
f_g2 = -2*(q0*q1 + q2*q3) - ay
f_g3 = 2*(q1q1 + q2q2 - 0.5) - az
# Gradient decent algorithm corrective step
s0 = _2q2*f_g1 - _2q1*f_g2
s1 = -_2q3*f_g1 - _2q0*f_g2 + _4q1*f_g3
s2 = _2q0*f_g1 - _2q3*f_g2 + _4q2*f_g3
s3 = -_2q1*f_g1 - _2q2*f_g2
recipNorm = 1/ (math.sqrt(s0 * s0 + s1 * s1 + s2 * s2 + s3 * s3)) ## normalise step magnitude
s0 *= recipNorm
s1 *= recipNorm
s2 *= recipNorm
s3 *= recipNorm
# Apply feedback step
qDot1 -= beta * s0
qDot2 -= beta * s1
qDot3 -= beta * s2
qDot4 -= beta * s3
# Integrate rate of change of quaternion to yield quaternion
q0 += qDot1 * dt
q1 += qDot2 * dt
q2 += qDot3 * dt
q3 += qDot4 * dt
# Normalise quaternion
recipNorm = 1 / (math.sqrt(q0 * q0 + q1 * q1 + q2 * q2 + q3 * q3))
q0 *= recipNorm
q1 *= recipNorm
q2 *= recipNorm
q3 *= recipNorm
q = np.array([q0, q1, q2, q3])
# print('q0: {0}, q1: {1}, q2: {2}, q3: {3}'.format(q[0], q[1], q[2], q[3]))p
return q
(24)accel_int(self):用于模拟加速度计的积分功能。根据加速度计的读数,使用TRIAD算法得到的旋转矩阵,对加速度进行积分,返回加速度、速度和位置。
def accel_int(self):
accel_body = self.accel()
_, R = self.triad()
acceleration = np.dot(R, accel_body)
velocity = self.velocity_t0 + acceleration*self.quad.t_step
position = self.position_t0 + velocity*self.quad.t_step
self.acceleration_t0 = acceleration
self.velocity_t0 = velocity
self.position_t0 = position
return acceleration, velocity, position
(25)gyro_int(self):用于模拟陀螺仪的积分功能。通过获取陀螺仪的读数和前一时刻的四元数,使用四元数微分方程对四元数进行积分,返回更新后的四元数。
def gyro_int(self):
w = self.gyro()
q = self.quaternion_t0
V_q = deriv_quat(w, q).flatten()
for i in range(len(q)):
q[i] = q[i] + V_q[i]*self.quad.t_step
self.quaternion_t0 = q/np.linalg.norm(q)
return q
(26)类plotter用于绘制仿真环境中飞行器的状态和控制信息。其中函数__init__(self, env, velocity_plot=False, depth_plot=False)用于初始化方法接受仿真环境对象以及可选的参数,例如是否绘制速度信息(velocity_plot)和是否绘制深度图(depth_plot),然后根据输入参数设置绘图的相关属性。
class plotter():
def __init__(self, env, velocity_plot = False, depth_plot=False):
plt.close('all')
self.figure = plt.figure('States')
self.depth_plot = depth_plot
self.env = env
self.states = []
self.times = []
self.print_list = range(13)
if velocity_plot:
self.plot_labels = ['$\dot X$', '$\dot Y$', '$\dot Z$',
'', '', '',
'$\phi$', '$\\theta$', '$\psi$',
'$T_{MH,1}$', '$T_{MH,2}$', '$T_{MH,3}$', '$T_{MH,4}$']
self.axis_labels = ['Velocidade (ms)', 'Velocidade (ms)', 'Velocidade (ms)',
'Velocidade (ms)', 'Velocidade (ms)', 'Velocidade (ms)',
'Atitude (rad)', 'Atitude (rad)', 'Atitude (rad)',
'Empuxo (N)', 'Empuxo (N)', 'Empuxo (N)', 'Empuxo (N)']
self.depth_plot = False
else:
self.plot_labels = ['x', 'y', 'z',
'$\phi$', '$\theta$', '$\psi$',
'$u_1$', '$u_2$', '$u_3$', '$u_4$']
self.line_styles = ['-', '-', '-',
'--', '--', '--',
'--', '--', '--',
':', ':', ':', ':']
self.color = ['r', 'g', 'b',
'r', 'g', 'b',
'r', 'g', 'b',
'r', 'g', 'b', 'c']
self.plot_place = [0, 0, 0,
0, 0, 0,
1, 1, 1,
2, 2, 2, 2]
self.velocity_plot = velocity_plot
(27)add(self, target):功能是接受飞行器的状态信息,并将其添加到绘图队列中。状态信息包括位置、姿态和控制输入等。根据初始化时的设置,可能包括速度信息。
def add(self, target):
if self.velocity_plot:
# state = np.concatenate((self.env.state[1:6:2].flatten(), target[1:6:2], self.env.ang.flatten(), self.env.clipped_action.flatten()))
state = np.concatenate((self.env.state[1:6:2].flatten(), target[1:6:2], self.env.ang.flatten(), (self.env.step_effort.flatten()+1)*T2WR*M*G/8 ))
else:
state = np.concatenate((self.env.state[0:5:2].flatten(), self.env.ang.flatten(), self.env.clipped_action.flatten()))
self.states.append(state)
self.times.append(self.env.i*self.env.t_step)
(28)clear(self):功能是清空绘图队列,准备绘制新的数据。
def clear(self,):
self.states = []
self.times = []
(29)plot(self, nome='padrao'):功能是绘制累积的状态信息。根据初始化时的设置,可能绘制位置、姿态、速度等信息的时变曲线。如果启用了深度图,还会生成一个三维散点图。
def plot(self, nome='padrao'):
P = 0.7
fig, self.axs = plt.subplots(3, figsize = (7, 7*1.414), dpi=300)
# fig.suptitle(nome)
self.states = np.array(self.states)
self.times = np.array(self.times)
for print_state, label, line_style, axis_place, color, name in zip(self.print_list, self.plot_labels, self.line_styles, self.plot_place, self.color, self.axis_labels):
self.axs[axis_place].plot(self.times, self.states[:,print_state], label = label, ls=line_style, lw=0.8, color = color)
self.axs[axis_place].legend()
self.axs[axis_place].grid(True)
self.axs[axis_place].set(ylabel=name)
self.axs[2].axhline(y=T2WR*M*G/4)
self.axs[2].axhline(y=0)
# plt.xlabel('tempo (s)')
# plt.savefig(nome+'.pgf', bbox_inches='tight')
# plt.savefig(nome+'.png', bbox_inches='tight')
# plt.title(nome[-40:])
plt.show()
if self.depth_plot:
fig3d = plt.figure('3D map')
ax = Axes3D(fig3d)
ax.set_xlabel('x (m)')
ax.set_ylabel('y (m)')
ax.set_zlabel('z (m)')
states = np.array(self.states)
times = np.array(self.times)
xs = self.states[:,0]
ys = self.states[:,1]
zs = self.states[:,2]
t = self.times
ax.scatter(xs,ys,zs,c=plt.cm.jet(t/max(t)))
ax.plot3D(xs,ys,zs,linewidth=0.5)
ax.set_xlim(-BB_POS, BB_POS)
ax.set_ylim(-BB_POS, BB_POS)
ax.set_zlim(-BB_POS, BB_POS)
plt.grid(True)
plt.show()
self.clear()