原文地址:https://isaac-sim.github.io/IsaacLab/source/tutorials/03_envs/create_direct_rl_env.html
从环境中获取信息(观察)
获取joint(铰链)信息:位置和速度
joint会被包含在Articulation(关节)中,一个Articulation可能会包含1个或多个的joint对象,可以通过Articulation.find_joints()
方法获得joint在当前Articulation中的索引(index)数据。
find_joints
的返回值是这样的:tuple[list[joint索引], list[joint名字]]
find_joints的函数声明如下:
def find_joints( self, name_keys: str | Sequence[str], joint_subset: list[str] | None = None, preserve_order: bool = False ) -> tuple[list[int], list[str]]
在Articulation内部有一个属性私有变量_data: ArticulationData
,该变量通过方法def data(self) -> ArticulationData
获取,在ArticulationData
中存放着几个关节重要的数据:位置ArticulationData._joint_pos
,速度ArticulationData._joint_vel
,加速度ArticulationData._joint_acc
在ArticulationData
有几个@property
装饰器函数,用于获取上述的三个属性,这样可以用过属性名的方式直接访问到这些数据。
下面介绍下这三个方法的返回值:
joint_pos
返回torch.Size([num_instances, num_joints])
joint_vel
返回torch.Size([num_instances, num_joints])
joint_acc
返回torch.Size([num_instances, num_joints])
是时候讲解下DirectRLEnv(gym.Env)._get_observations(self) -> VecEnvObs
方法了,该方法带有@abstractmethod
被定义成抽象方法,所以我们在继承DirectRLEnv
类后必须在自己的类中实现_get_observations
方法。
我们在_get_observations
方法中计算并返回观测值,这会用到上面提到的ArticulationData
以及如何通过joint
索引从中获取实际数据。
在cartpole_env.py
的代码中有如下实现:
def _get_observations(self) -> torch.Dict[str, torch.Tensor | torch.Dict[str, torch.Tensor]]:
obs = torch.cat(
(
self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
),
dim=-1,
)
observations = {"policy": obs}
return observations
上述代码中的_pole_dof_idx
里边存放的是杆子的joint
对应的索引数据,_cart_dof_idx
存放的是小车的joint
对应的索引数据,这里介绍下获取杆子位置的代码,获取杆子速度和小车位置和速度的代码都一样。
self.joint_pos
就的返回数据类型是:torch.Size([num_instances, num_joints])
形状的张量,所以self.joint_pos[:, self._pole_dof_idx[0]]
的意思就是从self.joint_pos
中获取索引为self._pole_dof_idx[0]
的所有杆子的位置信息
通过在_get_observations
函数中增加了print
函数我把数据打印出来
print("[INFO]: _pole_dof_idx -> ", self._pole_dof_idx)
print("[INFO]: joint_pos -> ", self.joint_pos)
print("[INFO]: pole_joint_pos -> ", self.joint_pos[:, self._pole_dof_idx[0]])
打印出的数据如下:
[INFO]: _pole_dof_idx -> [1]
[INFO]: joint_pos -> tensor([[-0.0995, -0.0243],
[-0.5815, 0.0256],
[-0.5531, 0.4727],
...,
[ 0.4905, -0.7841],
[-0.4129, 0.4739],
[ 0.4791, -0.8703]], device='cuda:0')
[INFO]: pole_joint_pos -> tensor([-0.0243, 0.0256, 0.4727, ..., -0.7841, 0.4739, -0.8703],
device='cuda:0')
由于self._pole_dof_idx[0] == 1
,所以第二列的数据存储的就是杆子的位置数据了。位于代码中用了self._pole_dof_idx[0]
,因为self._pole_dof_idx
的表中只存储了一个joint
的索引值,也就是当前杆子对应的joint
索引值。
如果大家感兴趣的话可以把小车的数据也打印出来看下在joint_pos
中的第一列数据是否是小车的位置信息。
Pytorch补充:
torch.unsqueeze(_input_, _dim_)
函数
用来将_input_
的数据增加一个维度,以打印信息的pole_joint_pos
数据为例,当dim=1
时,一维数组会变成二维张量,如下所示:
tensor([[-0.0243], [0.0256], [0.4727], ..., [-0.7841], [0.4739], [-0.8703]],
device='cuda:0')
数据变成了torch.Size([N, 1])
,也就是N行一列的数据
torch.cat(_tensors_, _dim=0_, _*_, _out=None_) → [Tensor]
函数
用来按照指定维度拼接多个张量,在本例中torch.cat
的dim=-1
,所以按照张量的最后一个维度进行拼接
最终_get_observations
中的obs
变量存储这一个torch.Size([N, 4])
形状的张量数据:
杆子位置 | 杆子速度 | 小车位置 | 小车速度 |
---|---|---|---|
… | … | … | … |
奖励函数 _get_rewards
分析
存活奖励rew_alive
在类class DirectRLEnv(gym.Env)
中有以下变量
self.reset_terminated = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool)
当重置时该变量被设置为true, 否则为false
在计算存活奖励:rew_alive
时的代码如下:rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
将torch.bool
强转为torch.float
类型,当重置发生时rew_scale_alive * (1.0 - 1.0)
,所以重置时的存活奖励就是0
终止奖励(惩罚)rew_termination
当代理不稳定或处于不安全的状态时触发,另外如果代理能够长时间稳定运行也会希望终止回合并开始新的回合,这样代理可以学会从不同的起始配置启动
所以终止分两种:
- 时间限制条件
- 终止条件
计算终止奖励的代码如下:
rew_termination = rew_scale_terminated * reset_terminated.float()
速度 & 位置 范围限制奖励
当小车与倒立摆的速度与位置在范围内时能够获得的奖励
相关的奖励有三个,看下面的代码:
rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos), dim=-1)
rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel), dim=-1)
rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel), dim=-1)
Pytorch补充:
torch.sum
函数:
返回输入张量中所有元素的总和,dim=-1
表示的是最后一个维度
torch.square
函数:
计算所有元素的平方值,并返回新值的张量
torch.abs函数:
计算所有元素的绝对值,并返回新值的张量
设计终止条件_get_dones
在超时或者超出范围时我们需要重置环境,在DirectRLEnv
中有一个抽象方法用于配置终止条件,这个方法返回两个Tensor
组成的Tuple
。Tuple
中的第一个Tensor
存储了终止条件,第二个Tensor
存储了超时信息,每个张量的形状为:torch.Size([num_envs])
@abstractmethod
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]
在cartpole_env.py
中的实现如下:
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
time_out = self.episode_length_buf >= self.max_episode_length - 1
out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
return out_of_bounds, time_out
超时条件(time_out)计算
将当前所有环境的episode缓冲长度(dtype=torch.long)与最大允许的episode长度进行比较,并将结果存储到一个形状为torch.Size([num_envs])
,数据类型为torch.bool
的张量中。
下面是DirectRLEnv.max_episode_length
的计算方法:
@property
def max_episode_length_s(self) -> float:
"""Maximum episode length in seconds."""
return self.cfg.episode_length_s
@property
def max_episode_length(self):
"""The maximum episode length in steps adjusted from s."""
return math.ceil(self.max_episode_length_s / (self.cfg.sim.dt * self.cfg.decimation))
self.cfg.sim.dt
:在SimulationCfg.dt
中定义,是物理时间步长(秒),self.cfg.sim.dt * self.cfg.decimation
用来计算控制动作而执行频率。- self.cfg.episode_length_s:episode的最大长度(秒)
- 所以
math.ceil(self.max_episode_length_s / (self.cfg.sim.dt * self.cfg.decimation))
是计算出最大的物理时间步数并向上取整
小车和杆子的活动界限计算
torch.abs(self.joint_pos[:, self._cart_dof_idx])
:获取小车位置信息的绝对值,返回的张量形状为:torch.Size([N, 1])
- 然后判断小车位置的绝对值是否大于
self.cfg.max_cart_pos
值,这时候会返回一个形状为:torch.Size([N, 1])
且dtype=bool
的张量 - 最后通过torch.any函数设置
dim=1
测试所有列的是否为True
,并返回形状为torch.Size([N])
的张量