目录
常用函数
- 添加usd资产
prim_path='/World/warehouse'
usd_path = '/home/linhai/app/isaac/Collected_warehouse_with_forklifts/warehouse_with_forklifts.usd'
stage_utils.add_reference_to_stage(usd_path=usd_path,prim_path=prim_path)
常用demo
task类
from abc import ABC, abstractmethod
from typing import Optional
import numpy as np
from omni.isaac.core.objects import DynamicCuboid
from omni.isaac.core.scenes.scene import Scene
from omni.isaac.core.tasks import BaseTask
from omni.isaac.core.utils.prims import is_prim_path_valid
from omni.isaac.core.utils.stage import get_stage_units
from omni.isaac.core.utils.string import find_unique_string_name
class PickPlace(ABC, BaseTask):
"""[summary]
Args:
name (str): [description]
cube_initial_position (Optional[np.ndarray], optional): [description]. Defaults to None.
cube_initial_orientation (Optional[np.ndarray], optional): [description]. Defaults to None.
target_position (Optional[np.ndarray], optional): [description]. Defaults to None.
cube_size (Optional[np.ndarray], optional): [description]. Defaults to None.
offset (Optional[np.ndarray], optional): [description]. Defaults to None.
"""
def __init__(
self,
name: str,
cube_initial_position: Optional[np.ndarray] = None,
cube_initial_orientation: Optional[np.ndarray] = None,
target_position: Optional[np.ndarray] = None,
cube_size: Optional[np.ndarray] = None,
offset: Optional[np.ndarray] = None,
) -> None:
BaseTask.__init__(self, name=name, offset=offset)
self._robot = None
self._target_cube = None
self._cube = None
self._cube_initial_position = cube_initial_position
self._cube_initial_orientation = cube_initial_orientation
self._target_position = target_position
self._cube_size = cube_size
if self._cube_size is None:
self._cube_size = np.array([0.0515, 0.0515, 0.0515]) / get_stage_units()
if self._cube_initial_position is None:
self._cube_initial_position = np.array([0.3, 0.3, 0.3]) / get_stage_units()
if self._cube_initial_orientation is None:
self._cube_initial_orientation = np.array([1, 0, 0, 0])
if self._target_position is None:
self._target_position = np.array([-0.3, -0.3, 0]) / get_stage_units()
self._target_position[2] = self._cube_size[2] / 2.0
self._target_position = self._target_position + self._offset
return
def set_up_scene(self, scene: Scene) -> None:
"""[summary]
Args:
scene (Scene): [description]
"""
super().set_up_scene(scene)
scene.add_default_ground_plane()
cube_prim_path = find_unique_string_name(
initial_name="/World/Cube", is_unique_fn=lambda x: not is_prim_path_valid(x)
)
cube_name = find_unique_string_name(initial_name="cube", is_unique_fn=lambda x: not self.scene.object_exists(x))
self._cube = scene.add(
DynamicCuboid(
name=cube_name,
position=self._cube_initial_position,
orientation=self._cube_initial_orientation,
prim_path=cube_prim_path,
scale=self._cube_size,
size=1.0,
color=np.array([0, 0, 1]),
)
)
self._task_objects[self._cube.name] = self._cube
self._robot = self.set_robot()
scene.add(self._robot)
self._task_objects[self._robot.name] = self._robot
self._move_task_objects_to_their_frame()
return
@abstractmethod
def set_robot(self) -> None:
raise NotImplementedError
def set_params(
self,
cube_position: Optional[np.ndarray] = None,
cube_orientation: Optional[np.ndarray] = None,
target_position: Optional[np.ndarray] = None,
) -> None:
if target_position is not None:
self._target_position = target_position
if cube_position is not None or cube_orientation is not None:
self._cube.set_local_pose(translation=cube_position, orientation=cube_orientation)
return
def get_params(self) -> dict:
params_representation = dict()
position, orientation = self._cube.get_local_pose()
params_representation["cube_position"] = {"value": position, "modifiable": True}
params_representation["cube_orientation"] = {"value": orientation, "modifiable": True}
params_representation["target_position"] = {"value": self._target_position, "modifiable": True}
params_representation["cube_name"] = {"value": self._cube.name, "modifiable": False}
params_representation["robot_name"] = {"value": self._robot.name, "modifiable": False}
return params_representation
def get_observations(self) -> dict:
"""[summary]
Returns:
dict: [description]
"""
joints_state = self._robot.get_joints_state()
cube_position, cube_orientation = self._cube.get_local_pose()
end_effector_position, _ = self._robot.end_effector.get_local_pose()
return {
self._cube.name: {
"position": cube_position,
"orientation": cube_orientation,
"target_position": self._target_position,
},
self._robot.name: {
"joint_positions": joints_state.positions,
"end_effector_position": end_effector_position,
},
}
def pre_step(self, time_step_index: int, simulation_time: float) -> None:
"""[summary]
Args:
time_step_index (int): [description]
simulation_time (float): [description]
"""
return
def post_reset(self) -> None:
from omni.isaac.manipulators.grippers.parallel_gripper import ParallelGripper
if isinstance(self._robot.gripper, ParallelGripper):
self._robot.gripper.set_joint_positions(self._robot.gripper.joint_opened_positions)
return
def calculate_metrics(self) -> dict:
"""[summary]"""
raise NotImplementedError
def is_done(self) -> bool:
"""[summary]"""
raise NotImplementedError
controller类
import typing
import numpy as np
from omni.isaac.core.controllers.base_controller import BaseController
from omni.isaac.core.utils.rotations import euler_angles_to_quat
from omni.isaac.core.utils.stage import get_stage_units
from omni.isaac.core.utils.types import ArticulationAction
from omni.isaac.manipulators.grippers.gripper import Gripper
class PickPlaceController(BaseController):
"""
A simple pick and place state machine for tutorials
Each phase runs for 1 second, which is the internal time of the state machine
Dt of each phase/ event step is defined
- Phase 0: Move end_effector above the cube center at the 'end_effector_initial_height'.
- Phase 1: Lower end_effector down to encircle the target cube
- Phase 2: Wait for Robot's inertia to settle.
- Phase 3: close grip.
- Phase 4: Move end_effector up again, keeping the grip tight (lifting the block).
- Phase 5: Smoothly move the end_effector toward the goal xy, keeping the height constant.
- Phase 6: Move end_effector vertically toward goal height at the 'end_effector_initial_height'.
- Phase 7: loosen the grip.
- Phase 8: Move end_effector vertically up again at the 'end_effector_initial_height'
- Phase 9: Move end_effector towards the old xy position.
Args:
name (str): Name id of the controller
cspace_controller (BaseController): a cartesian space controller that returns an ArticulationAction type
gripper (Gripper): a gripper controller for open/ close actions.
end_effector_initial_height (typing.Optional[float], optional): end effector initial picking height to start from (more info in phases above). If not defined, set to 0.3 meters. Defaults to None.
events_dt (typing.Optional[typing.List[float]], optional): Dt of each phase/ event step. 10 phases dt has to be defined. Defaults to None.
Raises:
Exception: events dt need to be list or numpy array
Exception: events dt need have length of 10
"""
def __init__(
self,
name: str,
cspace_controller: BaseController,
gripper: Gripper,
end_effector_initial_height: typing.Optional[float] = None,
events_dt: typing.Optional[typing.List[float]] = None,
) -> None:
BaseController.__init__(self, name=name)
self._event = 0
self._t = 0
self._h1 = end_effector_initial_height
if self._h1 is None:
self._h1 = 0.3 / get_stage_units()
self._h0 = None
self._events_dt = events_dt
if self._events_dt is None:
self._events_dt = [0.008, 0.005, 0.1, 0.1, 0.0025, 0.001, 0.0025, 1, 0.008, 0.08]
else:
if not isinstance(self._events_dt, np.ndarray) and not isinstance(self._events_dt, list):
raise Exception("events dt need to be list or numpy array")
elif isinstance(self._events_dt, np.ndarray):
self._events_dt = self._events_dt.tolist()
if len(self._events_dt) > 10:
raise Exception("events dt length must be less than 10")
self._cspace_controller = cspace_controller
self._gripper = gripper
self._pause = False
return
def is_paused(self) -> bool:
"""
Returns:
bool: True if the state machine is paused. Otherwise False.
"""
return self._pause
def get_current_event(self) -> int:
"""
Returns:
int: Current event/ phase of the state machine
"""
return self._event
def forward(
self,
picking_position: np.ndarray,
placing_position: np.ndarray,
current_joint_positions: np.ndarray,
end_effector_offset: typing.Optional[np.ndarray] = None,
end_effector_orientation: typing.Optional[np.ndarray] = None,
) -> ArticulationAction:
"""Runs the controller one step.
Args:
picking_position (np.ndarray): The object's position to be picked in local frame.
placing_position (np.ndarray): The object's position to be placed in local frame.
current_joint_positions (np.ndarray): Current joint positions of the robot.
end_effector_offset (typing.Optional[np.ndarray], optional): offset of the end effector target. Defaults to None.
end_effector_orientation (typing.Optional[np.ndarray], optional): end effector orientation while picking and placing. Defaults to None.
Returns:
ArticulationAction: action to be executed by the ArticulationController
"""
if end_effector_offset is None:
end_effector_offset = np.array([0, 0, 0])
if self._pause or self.is_done():
self.pause()
target_joint_positions = [None] * current_joint_positions.shape[0]
return ArticulationAction(joint_positions=target_joint_positions)
if self._event == 2:
target_joint_positions = ArticulationAction(joint_positions=[None] * current_joint_positions.shape[0])
elif self._event == 3:
target_joint_positions = self._gripper.forward(action="close")
elif self._event == 7:
target_joint_positions = self._gripper.forward(action="open")
else:
if self._event in [0, 1]:
self._current_target_x = picking_position[0]
self._current_target_y = picking_position[1]
self._h0 = picking_position[2]
interpolated_xy = self._get_interpolated_xy(
placing_position[0], placing_position[1], self._current_target_x, self._current_target_y
)
target_height = self._get_target_hs(placing_position[2])
position_target = np.array(
[
interpolated_xy[0] + end_effector_offset[0],
interpolated_xy[1] + end_effector_offset[1],
target_height + end_effector_offset[2],
]
)
if end_effector_orientation is None:
end_effector_orientation = euler_angles_to_quat(np.array([0, np.pi, 0]))
target_joint_positions = self._cspace_controller.forward(
target_end_effector_position=position_target, target_end_effector_orientation=end_effector_orientation
)
self._t += self._events_dt[self._event]
if self._t >= 1.0:
self._event += 1
self._t = 0
return target_joint_positions
def _get_interpolated_xy(self, target_x, target_y, current_x, current_y):
alpha = self._get_alpha()
xy_target = (1 - alpha) * np.array([current_x, current_y]) + alpha * np.array([target_x, target_y])
return xy_target
def _get_alpha(self):
if self._event < 5:
return 0
elif self._event == 5:
return self._mix_sin(self._t)
elif self._event in [6, 7, 8]:
return 1.0
elif self._event == 9:
return 1
else:
raise ValueError()
def _get_target_hs(self, target_height):
if self._event == 0:
h = self._h1
elif self._event == 1:
a = self._mix_sin(max(0, self._t))
h = self._combine_convex(self._h1, self._h0, a)
elif self._event == 3:
h = self._h0
elif self._event == 4:
a = self._mix_sin(max(0, self._t))
h = self._combine_convex(self._h0, self._h1, a)
elif self._event == 5:
h = self._h1
elif self._event == 6:
h = self._combine_convex(self._h1, target_height, self._mix_sin(self._t))
elif self._event == 7:
h = target_height
elif self._event == 8:
h = self._combine_convex(target_height, self._h1, self._mix_sin(self._t))
elif self._event == 9:
h = self._h1
else:
raise ValueError()
return h
def _mix_sin(self, t):
return 0.5 * (1 - np.cos(t * np.pi))
def _combine_convex(self, a, b, alpha):
return (1 - alpha) * a + alpha * b
def reset(
self,
end_effector_initial_height: typing.Optional[float] = None,
events_dt: typing.Optional[typing.List[float]] = None,
) -> None:
"""Resets the state machine to start from the first phase/ event
Args:
end_effector_initial_height (typing.Optional[float], optional): end effector initial picking height to start from. If not defined, set to 0.3 meters. Defaults to None.
events_dt (typing.Optional[typing.List[float]], optional): Dt of each phase/ event step. 10 phases dt has to be defined. Defaults to None.
Raises:
Exception: events dt need to be list or numpy array
Exception: events dt need have length of 10
"""
BaseController.reset(self)
self._cspace_controller.reset()
self._event = 0
self._t = 0
if end_effector_initial_height is not None:
self._h1 = end_effector_initial_height
self._pause = False
if events_dt is not None:
self._events_dt = events_dt
if not isinstance(self._events_dt, np.ndarray) and not isinstance(self._events_dt, list):
raise Exception("event velocities need to be list or numpy array")
elif isinstance(self._events_dt, np.ndarray):
self._events_dt = self._events_dt.tolist()
if len(self._events_dt) > 10:
raise Exception("events dt length must be less than 10")
return
def is_done(self) -> bool:
"""
Returns:
bool: True if the state machine reached the last phase. Otherwise False.
"""
if self._event >= len(self._events_dt):
return True
else:
return False
def pause(self) -> None:
"""Pauses the state machine's time and phase."""
self._pause = True
return
def resume(self) -> None:
"""Resumes the state machine's time and phase."""
self._pause = False
return
结合写好的task和cotroller
from omni.isaac.examples.base_sample import BaseSample
from omni.isaac.franka.tasks import PickPlace
from omni.isaac.franka.controllers import PickPlaceController
class HelloWorld(BaseSample):
def __init__(self) -> None:
super().__init__()
return
def setup_scene(self):
world = self.get_world()
# We add the task to the world here
world.add_task(PickPlace(name="awesome_task"))
return
async def setup_post_load(self):
#是一个协程方法,用于在场景加载后设置必要的组件。它检索世界实例,获取任务参数(如机器人和立方体的名称),初始化一个带有机器人夹具和关节细节的PickPlaceController,然后添加一个物理回调,在每个模拟步骤中被调用。最后,它异步启动模拟。
self._world = self.get_world()
# The world already called the setup_scene from the task so
# we can retrieve the task objects
# Each defined task in the robot extensions
# has set_params and get_params to allow for changing tasks during
# simulation, {"task_param_name": "value": [value], "modifiable": [True/ False]}
task_params = self._world.get_task("awesome_task").get_params()
self._franka = self._world.scene.get_object(task_params["robot_name"]["value"])
self._cube_name = task_params["cube_name"]["value"]
self._controller = PickPlaceController(
name="pick_place_controller",
gripper=self._franka.gripper,
robot_articulation=self._franka,
)
self._world.add_physics_callback("sim_step", callback_fn=self.physics_step)
await self._world.play_async()
return
async def setup_post_reset(self):
#另一个协程方法,旨在在模拟被重置后重置控制器状态,然后再次异步启动模拟。
self._controller.reset()
await self._world.play_async()
return
def physics_step(self, step_size):
# Gets all the tasks observations
current_observations = self._world.get_observations()
actions = self._controller.forward(
picking_position=current_observations[self._cube_name]["position"],
placing_position=current_observations[self._cube_name]["target_position"],
current_joint_positions=current_observations[self._franka.name]["joint_positions"],
)
self._franka.apply_action(actions)
if self._controller.is_done():
self._world.pause()
return
标签:None,cube,target,记录,self,._,isaac,position,sim
From: https://www.cnblogs.com/FrostyForest/p/18066517