Python强化学习PPO算法实现及个人解读(第3部分)
在前面定义好模型和经验回放后,就需要定义智能体了
三、定义智能体
import torch
from torch.distributions import Categorical
class Agent:
def __init__(self, cfg) -> None:
self.gamma = cfg.gamma
self.device = torch.device(cfg.device)
self.actor = ActorSoftmax(cfg.n_states, cfg.n_actions, hidden_dim=cfg.actor_hidden_dim).to(self.device)
self.critic = Critic(cfg.n_states, 1, hidden_dim=cfg.critic_hidden_dim).to(self.device)
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=cfg.actor_lr)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=cfg.critic_lr)
self.memory = PGReplay()
self.k_epochs = cfg.k_epochs # update policy for K epochs
self.eps_clip = cfg.eps_clip # clip parameter for PPO
self.entropy_coefficient = cfg.entropy_coefficient # entropy coefficient
self.sample_count = 0
self.update_freq = cfg.update_freq
def sample_action(self, state):
self.sample_count += 1
state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
probs = self.actor(state)
dist = Categorical(probs)
action = dist.sample()
self.log_probs = dist.log_prob(action).detach()
return action.detach().cpu().numpy().item()
@torch.no_grad()
def predict_action(self, state):
state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
probs = self.actor(state)
dist = Categorical(probs)
action = dist.sample()
return action.detach().cpu().numpy().item()
def update(self):
# update policy every n steps
if self.sample_count % self.update_freq != 0:
return
# print("update policy")
old_states, old_actions, old_log_probs, old_rewards, old_dones = self.memory.sample()
# convert to tensor
import numpy as np
old_states = torch.tensor(np.array(old_states), device=self.device, dtype=torch.float32)
old_actions = torch.tensor(np.array(old_actions), device=self.device, dtype=torch.float32)
old_log_probs = torch.tensor(old_log_probs, device=self.device, dtype=torch.float32)
# monte carlo estimate of state rewards
returns = []
discounted_sum = 0
for reward, done in zip(reversed(old_rewards), reversed(old_dones)):
if done:
discounted_sum = 0
discounted_sum = reward + (self.gamma * discounted_sum)
returns.insert(0, discounted_sum)
# Normalizing the rewards:
returns = torch.tensor(returns, device=self.device, dtype=torch.float32)
returns = (returns - returns.mean()) / (returns.std() + 1e-5) # 1e-5 to avoid division by zero
for _ in range(self.k_epochs):
# compute advantage
values = self.critic(old_states) # detach to avoid backprop through the critic
advantage = returns - values.detach()
# get action probabilities
probs = self.actor(old_states)
dist = Categorical(probs)
# get new action probabilities
new_probs = dist.log_prob(old_actions)
# compute ratio (pi_theta / pi_theta__old):
ratio = torch.exp(new_probs - old_log_probs) # old_log_probs must be detached
# compute surrogate loss
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantage
# compute actor loss
actor_loss = -torch.min(surr1, surr2).mean() + self.entropy_coefficient * dist.entropy().mean()
# compute critic loss
critic_loss = (returns - values).pow(2).mean()
# take gradient step
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
actor_loss.backward()
critic_loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.step()
self.memory.clear()
我们可以将代码分为两段进行分析,初始化以及采样动作为一部分,更新策略为另一部分。
1.第一部分
首先我们要知道cfg是一个参数,可以使得Agent类在初始化时能够接受外部传入的配置信息,从而方便地对 Agent 进行配置和定制。而Categorical分布,用于在离散空间中进行采样。Agent是基于Actor-Critic架构和PPO算法的强化学习智能体。
先初始化,cfg作为输入。并设置折扣因子gamma,用于计算未来奖励的折现值。Device是根据配置参数设置Agent的设备,可以是CPU或GPU。
接着初始化Actor神经网络,输入状态维度为cfg.n_states,输出动作维度为cfg.n_actions,并设置隐藏层维度为cfg.actor_hidden_dim。再初始化Critic神经网络,输入状态维度为cfg.n_states,输出维度为1(值函数),设置隐藏层维度为cfg.critic_hidden_dim。
然后初始化两个网络的优化器,使用Adam优化算法,对网络的参数进行优化,通过self.actor.parameters()获取 ActorSoftmax 模型中所有可学习的参数,返回一个参数迭代器。学习率为cfg.actor_lr。学习并优化完后就可以传给self.actor_optimizer。
(至于Adam优化算法,可以查看这篇文章:【pytorch优化器】Adam优化算法详解)
接着初始化经验回放缓冲区PGReplay,用于存储Agent与环境的交互数据。cfg.k_epochs:设置每次更新策略的迭代次数(即在每次更新策略时要执行的优化步骤的次数)。设置PPO算法中的clip参数,用于限制策略更新的幅度。cfg.entropy_coef为设置熵正则项的系数,用于增加策略的探索性(该值越大,策略越爱探索)。self.sample_count = 0是为了初始化采样计数器,用于控制策略更新的频率。最后设置策略更新的频率。
之后就要开始定义采样动作的方法,根据当前状态state采样一个动作。每调用一次sample_action方法,采样计数器加1。
Tensor这个就很重要,他是将输入的状态数据变成神经网络可以接受的形式(这个形式就是,将状态转化为张量,并增加一个维度),不可或缺。之后通过Actor网络计算状态state对应的动作概率。以这个概率为参数作离散概率分布(Categorical)。这样就能顺理从概率分布中采样得到一个动作。接着dist.log_prob(action)计算当前采样动作的对数概率,并使用detach()方法将其从计算图中分离出来,进而阻止梯度传播。将采样得到的动作转换为NumPy数组(就是那个numpy()),并返回动作的值(就是那个item())。
@torch.no_grad():装饰器,表示以下函数不会进行梯度计算。
第二部分是为了定义预测动作的方法,根据当前状态state预测一个动作。步骤都差不多,只不过不用算当前采样动作的对数概率。最后返回动作的值。
注意:
(1)尽管直接使用 self.device = cfg.device 也可以工作,但通过将设备信息转换为 torch.device 对象,可以提高代码的稳健性、灵活性和规范性。
(2).to(self.device) 的作用是将创建的 ActorSoftmax 对象移动到指定的设备上,也就是让演员网络和评论家网络在GPU上运行。
(3)torch.tensor()是一个通用的创建张量的函数,可根据输入数据自动确定数据类型。只有转化为张量,才可以进入神经网络计算。
(4)detach()的作用是将这个计算出的对数概率值从计算图中分离出来,使其不参与后续的梯度计算。这样做是为了避免对数概率的梯度影响到之后的网络参数更新。保存这个对数概率值的作用在于,在进行策略梯度算法的更新时,需要使用这个对数概率值乘以相应的奖励信号进行梯度计算,以促使网络优化以更可能选择当前采样的动作。
(5)初始化里最后的update_freq参数,是更新频率,如果设置的是10,也就是在10步之后才会进行策略更新,而其中一步就是指执行一次动作。
2.第二部分
首先检查是否需要更新策略,只有在self.sample_count为self.update_freq的整数倍时才会执行策略更新操作,避免过于频繁地进行策略更新,从而控制策略更新的频率。
然后从经验回放缓冲区中抽样获取之前保存的状态、动作、对数概率、奖励和完成标志。用tensor进行转变,然后用蒙特卡洛计算每个状态的回报值 returns:并根据每个时间步的奖励和完成标志(done)计算折扣累积奖励(discount_sum)。还需要对回报值进行归一化处理:计算均值(returns.mean())和标准差(returns.std()),加一个极小数是为了防止分母为0。
执行策略更新的循环,在每次更新中执行以下步骤:
这段代码主要实现的是基于PPO算法的策略网络更新过程,包括采样经验、计算回报值、计算优势函数、计算损失函数以及更新网络参数等步骤。
注意:
(1)PPO算法的相关知识可以在蘑菇书的第五章学习到相关的内容。也可以参考这篇文章:【强化学习PPO算法】
上图是PPO算法与其他算法进行比较,可以看出整体表现还是不错的。
作者:沈念辰