Python强化学习PPO算法实现及个人解读(第3部分)

在前面定义好模型和经验回放后,就需要定义智能体了

三、定义智能体

import torch
from torch.distributions import Categorical


class Agent:
    def __init__(self, cfg) -> None:

        self.gamma = cfg.gamma
        self.device = torch.device(cfg.device)
        self.actor = ActorSoftmax(cfg.n_states, cfg.n_actions, hidden_dim=cfg.actor_hidden_dim).to(self.device)
        self.critic = Critic(cfg.n_states, 1, hidden_dim=cfg.critic_hidden_dim).to(self.device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=cfg.actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=cfg.critic_lr)
        self.memory = PGReplay()
        self.k_epochs = cfg.k_epochs  # update policy for K epochs
        self.eps_clip = cfg.eps_clip  # clip parameter for PPO
        self.entropy_coefficient = cfg.entropy_coefficient  # entropy coefficient
        self.sample_count = 0
        self.update_freq = cfg.update_freq

    def sample_action(self, state):
        self.sample_count += 1
        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
        probs = self.actor(state)
        dist = Categorical(probs)
        action = dist.sample()
        self.log_probs = dist.log_prob(action).detach()
        return action.detach().cpu().numpy().item()

    @torch.no_grad()
    def predict_action(self, state):
        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)
        probs = self.actor(state)
        dist = Categorical(probs)
        action = dist.sample()
        return action.detach().cpu().numpy().item()

    def update(self):
        # update policy every n steps
        if self.sample_count % self.update_freq != 0:
            return
        # print("update policy")
        old_states, old_actions, old_log_probs, old_rewards, old_dones = self.memory.sample()
        # convert to tensor
        import numpy as np
        old_states = torch.tensor(np.array(old_states), device=self.device, dtype=torch.float32)
        old_actions = torch.tensor(np.array(old_actions), device=self.device, dtype=torch.float32)
        old_log_probs = torch.tensor(old_log_probs, device=self.device, dtype=torch.float32)
        # monte carlo estimate of state rewards
        returns = []
        discounted_sum = 0
        for reward, done in zip(reversed(old_rewards), reversed(old_dones)):
            if done:
                discounted_sum = 0
            discounted_sum = reward + (self.gamma * discounted_sum)
            returns.insert(0, discounted_sum)
        # Normalizing the rewards:
        returns = torch.tensor(returns, device=self.device, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)  # 1e-5 to avoid division by zero
        for _ in range(self.k_epochs):
            # compute advantage
            values = self.critic(old_states)  # detach to avoid backprop through the critic
            advantage = returns - values.detach()
            # get action probabilities
            probs = self.actor(old_states)
            dist = Categorical(probs)
            # get new action probabilities
            new_probs = dist.log_prob(old_actions)
            # compute ratio (pi_theta / pi_theta__old):
            ratio = torch.exp(new_probs - old_log_probs)  # old_log_probs must be detached
            # compute surrogate loss
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantage
            # compute actor loss
            actor_loss = -torch.min(surr1, surr2).mean() + self.entropy_coefficient * dist.entropy().mean()
            # compute critic loss
            critic_loss = (returns - values).pow(2).mean()
            # take gradient step
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()
        self.memory.clear()

我们可以将代码分为两段进行分析,初始化以及采样动作为一部分,更新策略为另一部分。

1.第一部分

首先我们要知道cfg是一个参数,可以使得Agent类在初始化时能够接受外部传入的配置信息,从而方便地对 Agent 进行配置和定制。而Categorical分布,用于在离散空间中进行采样。Agent是基于Actor-Critic架构和PPO算法的强化学习智能体。

先初始化,cfg作为输入。并设置折扣因子gamma,用于计算未来奖励的折现值。Device是根据配置参数设置Agent的设备,可以是CPU或GPU。

接着初始化Actor神经网络,输入状态维度为cfg.n_states,输出动作维度为cfg.n_actions,并设置隐藏层维度为cfg.actor_hidden_dim。再初始化Critic神经网络,输入状态维度为cfg.n_states,输出维度为1(值函数),设置隐藏层维度为cfg.critic_hidden_dim。

然后初始化两个网络的优化器,使用Adam优化算法,对网络的参数进行优化,通过self.actor.parameters()获取 ActorSoftmax 模型中所有可学习的参数,返回一个参数迭代器。学习率为cfg.actor_lr。学习并优化完后就可以传给self.actor_optimizer。

(至于Adam优化算法,可以查看这篇文章:【pytorch优化器】Adam优化算法详解)

接着初始化经验回放缓冲区PGReplay,用于存储Agent与环境的交互数据。cfg.k_epochs:设置每次更新策略的迭代次数(即在每次更新策略时要执行的优化步骤的次数)。设置PPO算法中的clip参数,用于限制策略更新的幅度。cfg.entropy_coef为设置熵正则项的系数,用于增加策略的探索性(该值越大,策略越爱探索)。self.sample_count = 0是为了初始化采样计数器,用于控制策略更新的频率。最后设置策略更新的频率。

之后就要开始定义采样动作的方法,根据当前状态state采样一个动作。每调用一次sample_action方法,采样计数器加1。

Tensor这个就很重要,他是将输入的状态数据变成神经网络可以接受的形式(这个形式就是,将状态转化为张量,并增加一个维度),不可或缺。之后通过Actor网络计算状态state对应的动作概率。以这个概率为参数作离散概率分布(Categorical)。这样就能顺理从概率分布中采样得到一个动作。接着dist.log_prob(action)计算当前采样动作的对数概率,并使用detach()方法将其从计算图中分离出来,进而阻止梯度传播。将采样得到的动作转换为NumPy数组(就是那个numpy()),并返回动作的值(就是那个item())。

@torch.no_grad():装饰器,表示以下函数不会进行梯度计算。

第二部分是为了定义预测动作的方法,根据当前状态state预测一个动作。步骤都差不多,只不过不用算当前采样动作的对数概率。最后返回动作的值。

注意:

(1)尽管直接使用 self.device = cfg.device 也可以工作,但通过将设备信息转换为 torch.device 对象,可以提高代码的稳健性、灵活性和规范性。

(2).to(self.device) 的作用是将创建的 ActorSoftmax 对象移动到指定的设备上,也就是让演员网络和评论家网络在GPU上运行。

(3)torch.tensor()是一个通用的创建张量的函数,可根据输入数据自动确定数据类型。只有转化为张量,才可以进入神经网络计算

(4)detach()的作用是将这个计算出的对数概率值从计算图中分离出来,使其不参与后续的梯度计算。这样做是为了避免对数概率的梯度影响到之后的网络参数更新。保存这个对数概率值的作用在于,在进行策略梯度算法的更新时,需要使用这个对数概率值乘以相应的奖励信号进行梯度计算,以促使网络优化以更可能选择当前采样的动作。

(5)初始化里最后的update_freq参数,是更新频率,如果设置的是10,也就是在10步之后才会进行策略更新,而其中一步就是指执行一次动作。

2.第二部分

首先检查是否需要更新策略,只有在self.sample_count为self.update_freq的整数倍时才会执行策略更新操作,避免过于频繁地进行策略更新,从而控制策略更新的频率。

然后从经验回放缓冲区中抽样获取之前保存的状态、动作、对数概率、奖励和完成标志。用tensor进行转变,然后用蒙特卡洛计算每个状态的回报值 returns:并根据每个时间步的奖励和完成标志(done)计算折扣累积奖励(discount_sum)。还需要对回报值进行归一化处理:计算均值(returns.mean())和标准差(returns.std()),加一个极小数是为了防止分母为0。

执行策略更新的循环,在每次更新中执行以下步骤:

  • 计算优势函数:先计算Critic网络对应的状态值函数 values(需要使用.detach()避免反向传播影响 Critic 网络)。将计算得到的回报值returns减去评论家网络预测的值函数values.detach(),得到每个状态对应的优势值。优势值表示当前状态相对于基准线(评论家网络的值函数预测)的优劣程度,有助于指导策略网络的更新方向。
  • 获取 Actor 网络输出的动作概率分布并计算对数动作概率 new_probs(这里实际上是计算选择旧动作的对数概率,便于后期损失函数的计算)。
  • 计算新旧策略比率 ratio 以及两个损失值 surr1 和 surr2(PPO公式里的)。
  • 计算Actor的损失函数actor_loss和 Critic 的损失函数 critic_loss。其中-torch.min(surr1, surr2).mean():是取上一步中计算得到的较小代理损失值的负平均值。这部分构成了策略网络的主要损失,指导着策略网络参数的更新方向,后面加的是熵正则化项,用于提高探索性。而critic_loss是计算的均方差,pow(2)是指给前面的(真实值-预测值)加上平方,之后再取平均,所以是均方差。
  • 开始进行策略更新:先清空 Actor 和 Critic 网络的梯度,避免梯度累计。然后进行反向传播backward(),计算梯度。接着利用优化器根据计算得到的梯度更新策略网络和评论家网络的参数。
  • 最后:清空经验回放缓存,准备迎接下一轮的训练数据。
  • 这段代码主要实现的是基于PPO算法的策略网络更新过程,包括采样经验、计算回报值、计算优势函数、计算损失函数以及更新网络参数等步骤。

    注意:

    (1)PPO算法的相关知识可以在蘑菇书的第五章学习到相关的内容。也可以参考这篇文章:【强化学习PPO算法】

    上图是PPO算法与其他算法进行比较,可以看出整体表现还是不错的。

    作者:沈念辰

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python强化学习PPO算法实现及个人解读(第3部分)

    发表回复